CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
rapid7

CoCalc provides the best real-time collaborative environment for Jupyter Notebooks, LaTeX documents, and SageMath, scalable from individual users to large groups and classes!

GitHub Repository: rapid7/metasploit-framework
Path: blob/master/lib/rex/post/meterpreter/packet_dispatcher.rb
Views: 1904
1
# -*- coding: binary -*-
2
3
require 'rex/post/meterpreter/command_mapper'
4
require 'rex/post/meterpreter/packet_response_waiter'
5
require 'rex/exceptions'
6
require 'pathname'
7
8
module Rex
9
module Post
10
module Meterpreter
11
12
###
#
# Raised when a Meterpreter request comes back with a failure result.
#
###
class RequestError < ArgumentError
  # The method that failed: the mapped command name, or "#<command_id>"
  # when the command id has no known name.
  attr_reader :method

  # The error result that occurred, typically a windows error message.
  attr_reader :result

  # The error result that occurred, typically a windows error code.
  # Falls back to the error info when no explicit code was supplied.
  attr_reader :code

  # @param command_id the id of the command that failed; looked up through
  #   Rex::Post::Meterpreter::CommandMapper.get_command_name
  # @param einfo the error information describing the failure
  # @param ecode the error code; defaults to +einfo+ when omitted
  def initialize(command_id, einfo, ecode=nil)
    resolved_name = Rex::Post::Meterpreter::CommandMapper.get_command_name(command_id)

    @method = resolved_name || "##{command_id}"
    @result = einfo
    @code = ecode || einfo
  end

  # Human readable description of the failure.
  def to_s
    "#{@method}: Operation failed: #{@result}"
  end
end
39
40
###
41
#
42
# Handles packet transmission, reception, correlation,
# and processing.
44
#
45
###
46
module PacketDispatcher
47
48
# Default number of seconds to wait for a response once a packet was sent
PACKET_TIMEOUT = 600

# Number of seconds without any inbound packet before a liveness check
# is attempted. A minute should be generous even on the highest latency
# networks.
#
# @see #keepalive
PING_TIME = 60

# Mutex used to lock out new commands during an active migration.
# Unused if this is a passive dispatcher.
attr_accessor :comm_mutex

# The guid that identifies an active Meterpreter session
attr_accessor :session_guid

# The key material used for TLV encryption
attr_accessor :tlv_enc_key

# Passive Dispatching
#
# @return [Rex::ServiceManager]
# @return [nil] if this is not a passive dispatcher
attr_accessor :passive_service

# Outbound raw packets queued while running as a passive dispatcher
#
# @return [Array]
attr_accessor :send_queue

# @return [Array]
attr_accessor :recv_queue
79
80
# Prepares the state used by a passive dispatcher: empty send/receive
# queues, an empty waiter list, and the session flagged as alive.
def initialize_passive_dispatcher
  self.send_queue, self.recv_queue, self.waiters = [], [], []
  self.alive = true
end
86
87
# Tears down passive dispatcher state: flags the session as no longer
# alive and discards anything left in the queues and the waiter list.
def shutdown_passive_dispatcher
  self.alive = false
  self.send_queue, self.recv_queue, self.waiters = [], [], []
end
93
94
# Answers a passive reader request with the next queued outbound item.
#
# Records the check-in time, takes the head of the send queue and hands it
# to the client. If anything fails, the item (when there was one) is put
# back at the head of the queue and the error is logged.
#
# @param cli the client connection; must respond to +send_response+
# @param req the request that triggered this call (unused here)
def on_passive_request(cli, req)
  self.last_checkin = ::Time.now
  queued = send_queue.shift
  cli.send_response(queued)
rescue => e
  send_queue.unshift(queued) if queued
  elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)
end
104
105
##
106
#
107
# Transmission
108
#
109
##
110
111
#
# Sends a packet without waiting for a response.
#
# When this session is reached through a pivot, the packet is handed to the
# pivot session together with this session's guid and TLV encryption key.
# When running as a passive dispatcher the raw packet is only queued; it is
# delivered later when the remote side polls.
#
# @param packet [Packet] the packet to send
# @param opts [Hash] optional settings; the hash passed in is never modified
# @option opts :completion_routine routine registered as a response waiter
#   for this packet
# @option opts :completion_param parameter registered with the routine
# @option opts :session_guid session guid to encode the packet with; when
#   given, +:tlv_enc_key+ from the same hash is used as the encryption key
# @option opts :tlv_enc_key see +:session_guid+
# @return the number of bytes written to the socket, the size of the raw
#   packet when it was queued for passive delivery, 0 when the packet
#   produced no raw data, or whatever the pivot session's send_packet returns
# @raise re-raises any exception raised while writing to the socket, after
#   the session has been marked as dead
#
def send_packet(packet, opts={})
  if self.pivot_session
    # Pass our identity along in a copy so the caller's hash stays untouched
    pivot_opts = opts.merge(session_guid: self.session_guid, tlv_enc_key: self.tlv_enc_key)
    return self.pivot_session.send_packet(packet, pivot_opts)
  end

  if opts[:completion_routine]
    add_response_waiter(packet, opts[:completion_routine], opts[:completion_param])
  end

  session_guid = self.session_guid
  tlv_enc_key = self.tlv_enc_key

  # if a session guid is provided, use all the details provided
  if opts[:session_guid]
    session_guid = opts[:session_guid]
    tlv_enc_key = opts[:tlv_enc_key]
  end

  log_packet(packet, :send)

  bytes = 0
  raw = packet.to_r(session_guid, tlv_enc_key)
  err = nil

  # Nothing to transmit: report zero bytes for passive and socket
  # dispatchers alike instead of queueing (and sizing) a nil
  return bytes unless raw

  # Short-circuit send when using a passive dispatcher
  if self.passive_service
    send_queue.push(raw)
    return raw.size # Lie!
  end

  self.comm_mutex.synchronize do
    begin
      bytes = self.sock.write(raw)
    rescue ::Exception => e
      # Remember the failure; it is re-raised once the session is marked dead
      err = e
    end
  end

  if bytes.to_i == 0
    # Mark the session itself as dead
    self.alive = false

    # Reraise the error to the top-level caller
    raise err if err
  end

  return bytes
end
167
168
#
# Sends a request and waits up to the given time interval for its response.
#
# @param packet [Packet] request to send
# @param timeout [Integer,nil] seconds to wait for response, or nil to ignore the
#   response and return immediately
# @return [nil] when +timeout+ is nil
# @return the response packet when its result is 0
# @raise [Rex::TimeoutError] when no response was obtained
# @raise [RequestError] when the response carries a non-zero result
def send_request(packet, timeout = self.response_timeout)
  response = send_packet_wait_response(packet, timeout)

  # The caller asked not to wait, so there is nothing to inspect
  return nil if timeout.nil?

  raise Rex::TimeoutError.new("Send timed out") if response.nil?

  if response.result != 0
    einfo = lookup_error(response.result)
    failure = RequestError.new(packet.method, einfo, response.result)

    # Report the failure as originating from our caller
    failure.set_backtrace(caller)

    raise failure
  end

  return response
end
193
194
#
# Transmits a packet and waits for a response.
#
# Request packets are first checked against the commands known to be
# supported by the session (when that list is populated), so unsupported
# commands fail fast instead of being sent.
#
# @param packet [Packet] the request packet to send
# @param timeout [Integer,nil] number of seconds to wait for the response,
#   or nil to return immediately after sending without waiting
# @return the response handed back by the waiter, or nil when the packet
#   could not be sent or +timeout+ is nil
# @raise [RequestError] when the command is not supported by the session or
#   needs an extension that is not loaded
def send_packet_wait_response(packet, timeout)
  if packet.type == PACKET_TYPE_REQUEST && commands.present?
    # XXX: Remove this condition once the payloads gem has had another major version bump from 2.x to 3.x and
    # rapid7/metasploit-payloads#451 has been landed to correct the `enumextcmd` behavior on Windows. Until then, skip
    # proactive validation of Windows core commands. This is not the only instance of this workaround.
    windows_core = base_platform == 'windows' && (packet.method - (packet.method % COMMAND_ID_RANGE)) == Rex::Post::Meterpreter::ClientCore.extension_id

    unless windows_core || commands.include?(packet.method)
      if (ext_name = Rex::Post::Meterpreter::ExtensionMapper.get_extension_name(packet.method))
        unless ext.aliases.include?(ext_name)
          raise RequestError.new(packet.method, "The command requires the #{ext_name} extension to be loaded")
        end
      end
      raise RequestError.new(packet.method, "The command is not supported by this Meterpreter type (#{session_type})")
    end
  end

  # First, add the waiter association for the supplied packet
  waiter = add_response_waiter(packet)

  # Transmit the packet. If nothing was written, or the send raised, no
  # response can ever arrive, so the waiter must not stay registered.
  sent = false
  begin
    bytes_written = send_packet(packet)
    sent = bytes_written.to_i > 0
  ensure
    remove_response_waiter(waiter) unless sent
  end

  return nil unless sent

  # The caller does not want to wait. The waiter is left registered here,
  # exactly as before this method guarded against send failures.
  return nil unless timeout

  begin
    # Wait for the supplied time interval
    response = waiter.wait(timeout)
  ensure
    # Remove the waiter from the list of waiters in case it wasn't
    # removed. This happens if the waiter timed out above, and also
    # covers the wait being interrupted by an exception.
    remove_response_waiter(waiter)
  end

  # wire in the UUID for this, as it should be part of every response
  # packet
  if response && !self.payload_uuid
    uuid = response.get_tlv_value(TLV_TYPE_UUID)
    self.payload_uuid = Msf::Payload::UUID.new({:raw => uuid}) if uuid
  end

  # Return the response packet, if any
  return response
end
250
251
# Send a ping to the server.
#
# Our 'ping' is a check for eof on channel id 0. This method has no side
# effects and always returns an answer (regardless of the existence of chan
# 0), which is all that's needed for a liveness check. The answer itself is
# unimportant and is ignored.
#
# While a ping is outstanding no further ping is sent; instead the session
# is marked dead once more than twice PING_TIME seconds have passed since
# the last check-in.
#
# @return [void]
def keepalive
  if @ping_sent
    if ::Time.now.to_i - last_checkin.to_i > PING_TIME*2
      # Log with the same source as the other dispatcher messages; the
      # second argument is the log source, the third the level.
      dlog("No response to ping, session #{self.sid} is dead", 'meterpreter', LEV_3)
      self.alive = false
    end
  else
    pkt = Packet.create_request(COMMAND_ID_CORE_CHANNEL_EOF)
    pkt.add_tlv(TLV_TYPE_CHANNEL_ID, 0)
    add_response_waiter(pkt, Proc.new { @ping_sent = false })
    # Raise the flag before transmitting. The completion routine clears it,
    # and it may run (presumably on another thread, e.g. for pivoted
    # sessions -- confirm) before send_packet returns; setting the flag
    # afterwards would overwrite that and leave the ping looking outstanding.
    @ping_sent = true
    send_packet(pkt)
  end
end
273
274
##
275
#
276
# Reception
277
#
278
##
279
280
# Starts the keepalive loop used by pivoted sessions.
#
# Does nothing unless keepalives are enabled. Otherwise a "PivotKeepalive"
# thread is spawned (and stored as the receiver thread) that sleeps for
# PING_TIME seconds and then runs #keepalive for as long as the session is
# alive. Any exception ends the loop and marks the session as dead.
def pivot_keepalive_start
  if self.send_keepalives
    self.receiver_thread = Rex::ThreadFactory.spawn("PivotKeepalive", false) do
      begin
        while self.alive
          Rex::sleep(PING_TIME)
          keepalive
        end
      rescue ::Exception => e
        dlog("Exception caught in pivot keepalive: #{e.class}: #{e}", 'meterpreter', LEV_1)
        dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)
        self.alive = false
      end
    end
  end
end
296
297
#
# Monitors the PacketDispatcher's sock for data in its own
# thread context and parses all inbound packets.
#
# Nothing is started for a passive dispatcher, and a pivoted session only
# gets the pivot keepalive loop. Otherwise two threads are spawned:
#
# * "MeterpreterReceiver" waits on the socket, decrypts every received
#   packet onto the new packet queue, and runs #keepalive when the socket
#   stayed quiet for PING_TIME seconds.
# * "MeterpreterDispatcher" drains the queues and passes the packets to
#   #dispatch_inbound_packet, re-queueing those nobody handled yet.
#
def monitor_socket
  # Skip if we are using a passive dispatcher
  return if self.passive_service

  # redirect to pivot keepalive if we're a pivot session
  return pivot_keepalive_start if self.pivot_session

  self.comm_mutex = ::Mutex.new
  self.waiters = []

  # This queue is where the new incoming packets end up
  @new_packet_queue = ::Queue.new
  # This is where we put packets that aren't new, but haven't
  # yet been handled.
  @incomplete_queue = ::Queue.new
  @ping_sent = false

  # Spawn a thread for receiving packets
  self.receiver_thread = Rex::ThreadFactory.spawn("MeterpreterReceiver", false) do
    begin
      while self.alive
        readable = Rex::ThreadSafe.select([ self.sock.fd ], nil, nil, PING_TIME)
        if readable
          inbound = receive_packet
          # Always enqueue the new packets onto the new packet queue
          @new_packet_queue << decrypt_inbound_packet(inbound) if inbound
        elsif self.send_keepalives && @new_packet_queue.empty?
          keepalive
        end
      end
    rescue ::Exception => e
      # Any failure ends the receive loop and marks the session dead
      dlog("Exception caught in monitor_socket: #{e.class}: #{e}", 'meterpreter', LEV_1)
      dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)
      self.alive = false
    end
  end

  # Spawn a new thread that dispatches the received packets
  self.dispatcher_thread = Rex::ThreadFactory.spawn("MeterpreterDispatcher", false) do
    begin
      while self.alive
        # Packets that could not be handled on THIS iteration
        unhandled = []
        # The full list of packets we aim to handle this iteration
        backlog = []

        # Left over packets from the previous iteration take priority
        # over new packets, otherwise data on channels can be processed
        # out of order. This only dumps the queue; it never blocks.
        backlog << @incomplete_queue.pop until @incomplete_queue.empty?

        # No old/stale packets hanging around, so perform a blocking
        # wait for the next packet
        backlog << @new_packet_queue.pop if backlog.empty?

        # Either a packet came off the wire or there was a backlog. In
        # both cases grab every other packet that is already waiting,
        # again without blocking.
        backlog << @new_packet_queue.pop until @new_packet_queue.empty?

        #
        # Prioritize message processing here
        # 1. Close should always be processed at the end
        # 2. Command responses always before channel data
        #
        responses, requests = backlog.partition { |pkt| pkt.response? }
        closes, channel_data = requests.partition { |pkt| pkt.method == COMMAND_ID_CORE_CHANNEL_CLOSE }
        backlog = responses + channel_data + closes

        #
        # Process the message queue
        #
        backlog.each do |pkt|
          begin
            next if dispatch_inbound_packet(pkt)

            # Keep Packets in the receive queue until a handler is registered
            # for them. Packets will live in the receive queue for up to
            # PACKET_TIMEOUT seconds, after which they will be dropped.
            #
            # A common reason why there would not immediately be a handler for
            # a received Packet is in channels, where a connection may
            # open and receive data before anything has asked to read.
            unhandled << pkt if ::Time.now.to_i - pkt.created_at.to_i < PACKET_TIMEOUT
          rescue ::Exception => e
            dlog("Dispatching exception with packet #{pkt}: #{e} #{e.backtrace}", 'meterpreter', LEV_1)
          end
        end

        # When nothing in the backlog could be handled, looping straight
        # away would just spin the CPU on packets that cannot be handled
        # yet. Sleep briefly, as though the queue were empty.
        if !backlog.empty? && backlog.length == unhandled.length
          ::IO.select(nil, nil, nil, 0.10)
        end

        # Unhandled packets go back on the incomplete queue so that they
        # are prioritised over new packets coming in off the wire.
        dlog("Requeuing #{unhandled.length} packet(s)", 'meterpreter', LEV_1) if unhandled.length > 0
        unhandled.each { |pkt| @incomplete_queue << pkt }

        # If the old queue of packets gets too big, drop a bunch of them.
        if @incomplete_queue.length > 100
          removed = Array.new(25) { @incomplete_queue.pop }
          dlog("Backlog has grown to over 100 in monitor_socket, dropping older packets: #{removed.map{|x| x.inspect}.join(" - ")}", 'meterpreter', LEV_1)
        end
      end
    rescue ::Exception => e
      dlog("Exception caught in monitor_socket dispatcher: #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)
    ensure
      # The receiver is of no use once dispatching has stopped
      self.receiver_thread.kill if self.receiver_thread
    end
  end
end
461
462
463
#
# Reads from the dispatcher's sock through the parser and returns the
# resulting Packet once a full packet has been received.
#
# The header of a received packet is parsed, and if this dispatcher's
# session guid is still NULL_GUID it is set to a copy of the packet's.
#
def receive_packet
  received = parser.recv(self.sock)
  return received unless received

  received.parse_header!
  self.session_guid = received.session_guid.dup if self.session_guid == NULL_GUID
  received
end
477
478
#
# Stop the monitor: kill and join the receiver thread and then the
# dispatcher thread (whichever of them exist), clearing each reference.
#
def monitor_stop
  if (receiver = self.receiver_thread)
    receiver.kill
    receiver.join
    self.receiver_thread = nil
  end

  if (dispatcher = self.dispatcher_thread)
    dispatcher.kill
    dispatcher.join
    self.dispatcher_thread = nil
  end
end
494
495
##
496
#
497
# Waiter registration
498
#
499
##
500
501
#
# Registers a waiter for the response to the supplied request packet.
#
# For a pivoted session the registration is delegated to the pivot session.
#
# @param request the request packet; its +rid+ identifies the response
# @param completion_routine optional routine handed to the waiter
# @param completion_param optional parameter handed to the waiter
# @return the newly registered PacketResponseWaiter, or whatever the pivot
#   session returned
#
def add_response_waiter(request, completion_routine = nil, completion_param = nil)
  pivot = self.pivot_session
  return pivot.add_response_waiter(request, completion_routine, completion_param) if pivot

  PacketResponseWaiter.new(request.rid, completion_routine, completion_param).tap do |waiter|
    self.waiters << waiter
  end
end
515
516
#
# Notifies whoever is waiting for the supplied response, if anyone.
#
# Only the first waiter that is waiting for the response is notified; it
# is then removed from the list of waiters. For a pivoted session the call
# is delegated to the pivot session.
#
# @param response the response packet that arrived
# @return true when a waiter was notified, false when nobody was waiting
#   (or whatever the pivot session returned)
#
def notify_response_waiter(response)
  if self.pivot_session
    return self.pivot_session.notify_response_waiter(response)
  end

  match = self.waiters.find { |candidate| candidate.waiting_for?(response) }
  return false unless match

  match.notify(response)
  remove_response_waiter(match)
  return true
end
536
537
#
# Removes a waiter from the list of waiters. For a pivoted session the
# removal is delegated to the pivot session, which holds the list.
#
def remove_response_waiter(waiter)
  return self.pivot_session.remove_response_waiter(waiter) if self.pivot_session

  self.waiters.delete(waiter)
end
547
548
##
549
#
550
# Dispatching
551
#
552
##
553
554
#
# Resets the list of inbound packet handlers to an empty list.
#
def initialize_inbound_handlers
  @inbound_handlers = Array.new
end
560
561
#
# Decrypts the given packet in place and returns it.
#
# The key is the TLV encryption key of the pivoted session when the
# packet's session guid belongs to a pivot session, and this dispatcher's
# own key otherwise.
#
def decrypt_inbound_packet(packet)
  pivot = self.find_pivot_session(packet.session_guid)
  key = pivot ? pivot.pivoted_session.tlv_enc_key : self.tlv_enc_key
  packet.from_r(key)
  packet
end
572
573
#
# Dispatches and processes an inbound packet. If the packet is a
# response that has an associated waiter, the waiter is notified.
# Otherwise, the packet is passed onto any registered dispatch
# handlers until one returns success.
#
# The check-in time of this dispatcher (and of the pivoted session the
# packet belongs to, if any) is refreshed on every call.
#
# @return true when a waiter took the packet or a handler raised (the
#   error is logged and the packet treated as handled); otherwise the
#   result of the last handler that was tried, or false without handlers
#
def dispatch_inbound_packet(packet)
  log_packet(packet, :recv)

  # Update our last reply time
  self.last_checkin = ::Time.now

  pivot = self.find_pivot_session(packet.session_guid)
  pivot.pivoted_session.last_checkin = self.last_checkin if pivot

  # If the packet is a response, try to notify any potential waiters
  return true if packet.response? && notify_response_waiter(packet)

  outcome = false

  # Try the inbound packet handlers in order until one handles the packet
  @inbound_handlers.each do |handler|
    outcome = nil

    begin
      outcome =
        if packet.response?
          handler.response_handler(self, packet)
        else
          handler.request_handler(self, packet)
        end
    rescue ::Exception => e
      dlog("Exception caught in dispatch_inbound_packet: handler=#{handler} #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)
      return true
    end

    break if outcome
  end

  return outcome
end
620
621
#
# Adds a handler implementing the InboundPacketHandler interface to the
# end of the list of inbound packet handlers.
#
def register_inbound_handler(handler)
  @inbound_handlers.push(handler)
end
628
629
#
# Takes a previously registered handler out of the list of inbound packet
# handlers. Unknown handlers are ignored.
#
def deregister_inbound_handler(handler)
  registered = @inbound_handlers
  registered.delete(handler)
end
635
636
# Configures TLV packet logging from an option value.
#
# All logging state is reset first. Then:
#
# * "console" or "true" (case-insensitive) logs packets to the console
# * "file:<path>" logs packets to <path>; only the leading "file:" is
#   stripped, so the path itself may contain "file:"
# * anything else, including nil and a "file:" without a path, leaves
#   logging turned off
#
# @param opt the option value; converted with +to_s+ before inspection
def initialize_tlv_logging(opt)
  self.tlv_logging_error_occured = false
  self.tlv_log_file = nil
  self.tlv_log_file_path = nil
  self.tlv_log_output = :none

  setting = opt.to_s

  if setting.casecmp?('console') || setting.casecmp?('true')
    self.tlv_log_output = :console
  elsif setting.start_with?('file:')
    path = setting.sub(/\Afile:/, '')

    # Without a path there is nothing to log to
    unless path.empty?
      self.tlv_log_output = :file
      self.tlv_log_file_path = path
    end
  end
end
649
650
protected

# Threads spawned by the monitor, and the registered response waiters
attr_accessor :receiver_thread, :dispatcher_thread, :waiters # :nodoc:

# TLV logging state: output mode, open log file and its path, and whether
# an earlier logging attempt failed
attr_accessor :tlv_log_output, :tlv_log_file, :tlv_log_file_path # :nodoc:
attr_accessor :tlv_logging_error_occured # :nodoc:
660
661
# Turns TLV logging off: switches the output to :none, closes the log file
# if one is open, and forgets both the file and its path.
def shutdown_tlv_logging
  self.tlv_log_output = :none

  open_log = self.tlv_log_file
  open_log.close unless open_log.nil?

  self.tlv_log_file = nil
  self.tlv_log_file_path = nil
end
667
668
# Logs a packet to the configured TLV log output.
#
# Nothing is logged when logging is off or when an earlier logging
# attempt already failed.
#
# @param packet the packet to log
# @param packet_type [Symbol] passed through to the console/file logger
def log_packet(packet, packet_type)
  # if we previously failed to log, return
  return if self.tlv_logging_error_occured || self.tlv_log_output == :none

  case self.tlv_log_output
  when :console
    log_packet_to_console(packet, packet_type)
  when :file
    log_packet_to_file(packet, packet_type)
  end
end
678
679
# Prints an inspected packet, tagged as SEND or RECV, through +print+.
# Any packet type other than :send or :recv prints nothing.
#
# @param packet the packet to print
# @param packet_type [Symbol] :send or :recv
def log_packet_to_console(packet, packet_type)
  case packet_type
  when :send
    print "\n%redSEND%clr: #{packet.inspect}\n"
  when :recv
    print "\n%bluRECV%clr: #{packet.inspect}\n"
  end
end
686
687
# Appends an inspected packet, tagged as SEND or RECV, to the TLV log file.
#
# The log file is (re)opened in append mode whenever none is open yet or
# the open one belongs to a different path. Only a leading "file:" is
# stripped from the configured path, so paths that contain "file:"
# elsewhere are used as they are. If opening or writing fails -- including
# when no usable path is configured -- the error is reported and TLV
# logging is switched off for this session.
#
# @param packet the packet to log
# @param packet_type [Symbol] :recv or :send; anything else writes nothing
def log_packet_to_file(packet, packet_type)
  # to_s keeps a missing path from raising here, outside the rescue below;
  # the resulting empty path fails on open and is reported like any other
  # logging error
  pathname = ::Pathname.new(self.tlv_log_file_path.to_s.sub(/\Afile:/, ''))

  begin
    if self.tlv_log_file.nil? || self.tlv_log_file.path != pathname.to_s
      self.tlv_log_file.close unless self.tlv_log_file.nil?

      self.tlv_log_file = ::File.open(pathname, 'a+')
    end

    if packet_type == :recv
      self.tlv_log_file.puts("\nRECV: #{packet.inspect}\n")
    elsif packet_type == :send
      self.tlv_log_file.puts("\nSEND: #{packet.inspect}\n")
    end
  rescue ::StandardError => e
    # Remember the failure so later packets are not logged any more
    self.tlv_logging_error_occured = true
    print_error "Failed writing to TLV Log File: #{pathname} with error: #{e.message}. Turning off logging for this session: #{self.inspect}..."
    elog(e)
    shutdown_tlv_logging
    return
  end
end
710
end
711
712
#
# Passive dispatcher that exchanges packets over HTTP: outbound packets are
# queued and handed out when the remote side issues a GET, inbound packets
# arrive in the body of any other request.
#
module HttpPacketDispatcher
  # Sets up the passive dispatcher state (via super) and registers this
  # session's resource URI with the passive dispatcher service, taking a
  # reference on the service.
  def initialize_passive_dispatcher
    super

    resource_uri = passive_resource_uri
    self.passive_service = self.passive_dispatcher
    # Drop any handler left over for this URI before adding ours
    self.passive_service.remove_resource(resource_uri)
    self.passive_service.add_resource(resource_uri,
      'Proc' => Proc.new { |cli, req| on_passive_request(cli, req) },
      'VirtualDirectory' => true
    )

    # Add a reference count to the handler
    self.passive_service.ref
  end

  # Unregisters the resource URI, releases the reference on the service (if
  # one is attached) and then clears the passive dispatcher state (via super).
  def shutdown_passive_dispatcher
    if self.passive_service
      self.passive_service.remove_resource(passive_resource_uri)

      self.passive_service.deref
      self.passive_service = nil
    end
    super
  end

  # Handles one HTTP request from the remote side.
  #
  # A GET is answered with the next queued outbound packet (or an empty
  # body); if sending fails the packet is put back at the head of the queue.
  # For any other method a non-empty request body is parsed, decrypted and
  # dispatched as an inbound packet, and an empty response is sent. Every
  # exception is logged rather than raised.
  #
  # @param cli the client connection; must respond to +send_response+
  # @param req the request; +method+ and +body+ are read
  def on_passive_request(cli, req)
    resp = Rex::Proto::Http::Response.new(200, "OK")
    resp['Content-Type'] = 'application/octet-stream'
    resp['Connection'] = 'close'

    self.last_checkin = ::Time.now

    if req.method == 'GET'
      rpkt = send_queue.shift
      resp.body = rpkt || ''
      begin
        cli.send_response(resp)
      rescue ::Exception => e
        # Keep the packet for the next poll
        send_queue.unshift(rpkt) if rpkt
        elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)
      end
    else
      resp.body = ""
      if req.body && req.body.length > 0
        packet = Packet.new(0)
        packet.add_raw(req.body)
        packet.parse_header!
        packet = decrypt_inbound_packet(packet)
        dispatch_inbound_packet(packet)
      end
      cli.send_response(resp)
    end
  rescue ::Exception => e
    elog("Exception handling request: #{cli.inspect} #{req.inspect}", error: e)
  end

  private

  # Resource URI of this session: the connection id with one leading and
  # one trailing slash removed (if present), wrapped in single slashes.
  # Anchored to the whole string rather than to individual lines.
  def passive_resource_uri
    "/" + self.conn_id.to_s.gsub(%r{\A/|/\z}, '') + "/"
  end
end
778
779
end; end; end
780
781