CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
rapid7

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: rapid7/metasploit-framework
Path: blob/master/test/lib/module_test.rb
Views: 11766
1
require 'rex/stopwatch'
2
3
module Msf
4
module ModuleTest
  # Counters for the current run. +tests+ is incremented for every example
  # that is started, including ones that later fail or are skipped.
  attr_accessor :tests
  attr_accessor :failures
  attr_accessor :skipped

  # Raised by #skip to abandon the current example (or a whole test method).
  # It derives from ::Exception, so a bare `rescue` (which only catches
  # StandardError) inside test code will not catch it.
  class SkipTestError < ::Exception
  end

  # Resets the counters, chains to the including class's initializer and
  # registers the optional +TestName+ datastore option.
  #
  # @param info [Hash] passed through to +super+ unchanged
  def initialize(info = {})
    @tests = 0
    @failures = 0
    @skipped = 0
    super

    register_options(
      [
        OptString.new("TestName", [false, "Run a specified test method name.", nil]),
      ], self.class
    )
  end

  # Runs either the single method named by datastore['TestName'] or every
  # public method on this object whose name starts with +test_+.
  #
  # A SkipTestError escaping a test method (i.e. raised outside any #it
  # block) is counted as one skipped test for the whole method.
  #
  # @return [Array<Symbol>] the method names that were considered
  def run_all_tests
    requested = datastore['TestName']
    test_methods =
      if requested.present?
        [requested.to_sym]
      else
        methods.select { |name| name.to_s.start_with?('test_') }
      end

    test_methods.each do |test_method|
      begin
        unless respond_to?(test_method)
          print_error("test method #{test_method} not found")
          next
        end
        send(test_method)
      rescue SkipTestError => e
        # If the entire def is skipped, increment tests and skip count
        @tests += 1
        @skipped += 1
        print_status("SKIPPED: def #{test_method} (#{e.message})")
      end
    end
  end

  # Aborts the current example or test method by raising SkipTestError.
  #
  # @param msg [String] reason shown in the SKIPPED line
  # @raise [SkipTestError] always
  def skip(msg = "No reason given")
    raise SkipTestError, msg
  end

  # Runs one example. The example passes when the block returns a truthy
  # value, fails when it returns a falsy value or raises, and is skipped
  # when it raises SkipTestError. While the block runs, output printed
  # through the wrapped print methods below is prefixed with +msg+.
  #
  # @param msg [String] description of the example
  # @yieldreturn [Object] truthy for pass, falsy for failure
  # @return [void]
  def it(msg = "", &block)
    @current_it_msg = msg
    @tests += 1
    begin
      result = block.call
      unless result
        @failures += 1
        # `error` is not defined in this module; presumably it is provided
        # by the class this module is mixed into -- TODO confirm.
        print_error("FAILED: #{error}") if error
        # Clear the prefix first so the description is not printed twice.
        @current_it_msg = nil
        print_error("FAILED: #{msg}")
        return
      end
    rescue SkipTestError => e
      @skipped += 1
      @current_it_msg = nil
      print_status("SKIPPED: #{msg} (#{e.message})")
      return
    rescue ::Exception => e
      # Anything else raised by the example is recorded as a failure rather
      # than aborting the remaining tests.
      @failures += 1
      print_error("FAILED: #{msg}")
      print_error("Exception: #{e.class}: #{e}")
      dlog("Exception in testing - #{msg}")
      dlog("Call stack: #{e.backtrace.join("\n")}")
      return
    ensure
      @current_it_msg = nil
    end

    print_good(msg.to_s)
  end

  # Announces an example that is not implemented yet. The block, if given,
  # is never called and no counter is changed.
  #
  # @param msg [String] description of the pending example
  def pending(msg = "", &block)
    print_status("PENDING: #{msg}")
  end

  # Skipped examples also increment @tests, so they are subtracted here;
  # otherwise every skip would be reported as a pass.
  #
  # @return [Integer] The number of tests that have passed
  def passed
    @tests - @failures - @skipped
  end

  # When printing to console, additionally prepend the current test name.
  # +msg+ defaults to an empty string so these wrappers can be called with
  # no arguments; this assumes the wrapped methods use the same default.
  %i[
    print
    print_line
    print_status
    print_good
    print_warning
    print_error
    print_bad
  ].each do |method|
    define_method(method) do |msg = ''|
      super(@current_it_msg ? "[#{@current_it_msg}] #{msg}" : msg)
    end
  end
end
103
104
module ModuleTest::PostTest
  include ModuleTest

  # Entry point: announces the target session, resets the counters, runs
  # every selected test through #run_all_tests and prints a summary line.
  # The summary goes through print_error when at least one test failed and
  # through print_status otherwise.
  def run
    print_status("Running against session #{datastore["SESSION"]}")
    print_status("Session type is #{session.type} and platform is #{session.platform}")

    # Start from a clean slate before the tests are run.
    @tests = @failures = @skipped = 0

    _, duration = Rex::Stopwatch.elapsed_time { run_all_tests }
    vprint_status("Testing complete in #{duration.round(2)} seconds")

    summary = "Passed: #{passed}; Failed: #{@failures}; Skipped: #{@skipped}"
    reporter = @failures > 0 ? :print_error : :print_status
    send(reporter, summary)
  end
end
127
128
module ModuleTest::PostTestFileSystem
  # Chains to the including class's initializer, registers the options that
  # control test file/directory naming, and creates the stack used by
  # #push_test_directory / #pop_test_directory.
  #
  # @param info [Hash] passed through to +super+ unchanged
  def initialize(info = {})
    super

    register_options(
      [
        OptBool.new("AddEntropy", [false, "Add entropy token to file and directory names.", true]),
        OptString.new('BaseDirectoryName', [true, 'Directory name to create', 'meterpreter-test-dir']),
        OptString.new("BaseFileName", [true, "File/dir base name", "meterpreter-test"]),
      ], self.class
    )

    @directory_stack = []
  end

  # Remembers the session's current working directory, then creates a test
  # directory under the session's temp directory and changes into it.
  #
  # @raise [RuntimeError] if no usable temp directory can be found
  def push_test_directory
    @directory_stack.push(_file_system.pwd)

    # Find the temp directory.
    #
    # NOTE(review): per the original TODO, a single safe-navigation
    # one-liner covering every platform "caused issue with random jobs", so
    # the platform split is kept. Only the unix branch is made nil-safe
    # here: an unset variable (nil) now falls through to the /tmp fallback
    # below instead of raising NoMethodError on nil.strip.
    if session.platform == 'unix'
      tmp = _file_system.get_env("TMP")&.strip&.presence || _file_system.get_env("TMPDIR")&.strip&.presence
    else
      tmp = _file_system.get_env("TMP") || _file_system.get_env("TMPDIR")
    end

    # mettle fallback
    tmp = '/tmp' if tmp.nil? && _file_system.directory?('/tmp')
    raise "Could not find tmp directory" if tmp.nil? || !_file_system.directory?(tmp)

    vprint_status("Setup: changing working directory to tmp: #{tmp}")
    _file_system.cd(tmp)

    vprint_status("Setup: Creating clean directory")

    # Optional suffix of eight distinct random lowercase letters, e.g.
    # "-qzkhbwra".
    entropy_value = datastore["AddEntropy"] ? '-' + ('a'..'z').to_a.sample(8).join : ""
    clean_test_directory = datastore['BaseDirectoryName'] + entropy_value
    _file_system.mkdir(clean_test_directory)
    _file_system.cd(clean_test_directory)

    vprint_status("Setup: Now in #{_file_system.pwd}")
  end

  # Changes back to the directory saved by the matching #push_test_directory
  # call. Does nothing when the stack is empty.
  def pop_test_directory
    previous_directory = @directory_stack.pop
    return if previous_directory.nil?

    vprint_status("Cleanup: changing working directory back to #{previous_directory}")
    _file_system.cd(previous_directory)
  end

  # Private PostFile wrapper to ensure we don't clobber the test module's namespace with the Msf::Post::File mixin methods
  class FileSystem
    include Msf::Post::File

    # @param mod [Object] the test module; it must respond to #session and
    #   #vprint_status
    def initialize(mod)
      @mod = mod
      @session = mod.session
    end

    private

    # Forwards verbose output to the owning test module.
    def vprint_status(s)
      @mod.vprint_status(s)
    end

    # Intentionally empty. Presumably this overrides a cleanup-registration
    # hook called by Msf::Post::File -- TODO confirm.
    def register_dir_for_cleanup(path)
    end

    attr_reader :session
  end

  # Builds a new FileSystem wrapper bound to this module on every call.
  #
  # @return [FileSystem]
  def _file_system
    FileSystem.new(self)
  end
end
210
end
211
212