Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/lib/rex/post/meterpreter/packet_dispatcher.rb
Views: 11784
# -*- coding: binary -*-12require 'rex/post/meterpreter/command_mapper'3require 'rex/post/meterpreter/packet_response_waiter'4require 'rex/exceptions'5require 'pathname'67module Rex8module Post9module Meterpreter1011###12#13# Exception thrown when a request fails.14#15###16class RequestError < ArgumentError17def initialize(command_id, einfo, ecode=nil)18command_name = Rex::Post::Meterpreter::CommandMapper.get_command_name(command_id)1920@method = command_name || "##{command_id}"21@result = einfo22@code = ecode || einfo23end2425def to_s26"#{@method}: Operation failed: #{@result}"27end2829# The method that failed.30attr_reader :method3132# The error result that occurred, typically a windows error message.33attr_reader :result3435# The error result that occurred, typically a windows error code.36attr_reader :code37end3839###40#41# Handles packet transmission, reception, and correlation,42# and processing43#44###45module PacketDispatcher4647# Default time, in seconds, to wait for a response after sending a packet48PACKET_TIMEOUT = 6004950# Number of seconds to wait without getting any packets before we try to51# send a liveness check. A minute should be generous even on the highest52# latency networks53#54# @see #keepalive55PING_TIME = 605657# This mutex is used to lock out new commands during an58# active migration. Unused if this is a passive dispatcher59attr_accessor :comm_mutex6061# The guid that identifies an active Meterpreter session62attr_accessor :session_guid6364# This contains the key material used for TLV encryption65attr_accessor :tlv_enc_key6667# Passive Dispatching68#69# @return [Rex::ServiceManager]70# @return [nil] if this is not a passive dispatcher71attr_accessor :passive_service7273# @return [Array]74attr_accessor :send_queue7576# @return [Array]77attr_accessor :recv_queue7879def initialize_passive_dispatcher80self.send_queue = []81self.recv_queue = []82self.waiters = []83self.alive = true84end8586def shutdown_passive_dispatcher87self.alive = false88self.send_queue = []89self.recv_queue = []90self.waiters = []91end9293def on_passive_request(cli, req)94begin95self.last_checkin = ::Time.now96resp = send_queue.shift97cli.send_response(resp)98rescue => e99send_queue.unshift(resp) if resp100elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)101end102end103104##105#106# Transmission107#108##109110#111# Sends a packet without waiting for a response.112#113def send_packet(packet, opts={})114if self.pivot_session115opts[:session_guid] = self.session_guid116opts[:tlv_enc_key] = self.tlv_enc_key117return self.pivot_session.send_packet(packet, opts)118end119120if opts[:completion_routine]121add_response_waiter(packet, opts[:completion_routine], opts[:completion_param])122end123124session_guid = self.session_guid125tlv_enc_key = self.tlv_enc_key126127# if a session guid is provided, use all the details provided128if opts[:session_guid]129session_guid = opts[:session_guid]130tlv_enc_key = opts[:tlv_enc_key]131end132133log_packet(packet, :send)134135bytes = 0136raw = packet.to_r(session_guid, tlv_enc_key)137err = nil138139# Short-circuit send when using a passive dispatcher140if self.passive_service141send_queue.push(raw)142return raw.size # Lie!143end144145if raw146self.comm_mutex.synchronize do147begin148bytes = self.sock.write(raw)149rescue ::Exception => e150err = e151end152end153154155if bytes.to_i == 0156# Mark the session itself as dead157self.alive = false158159# Reraise the error to the top-level caller160raise err if err161end162end163164return bytes165end166167#168# Sends a packet and waits for a timeout for the given time interval.169#170# @param packet [Packet] request to send171# @param timeout [Integer,nil] seconds to wait for response, or nil to ignore the172# response and return immediately173# @return (see #send_packet_wait_response)174def send_request(packet, timeout = self.response_timeout)175response = send_packet_wait_response(packet, timeout)176177if timeout.nil?178return nil179elsif response.nil?180raise Rex::TimeoutError.new("Send timed out")181elsif (response.result != 0)182einfo = lookup_error(response.result)183e = RequestError.new(packet.method, einfo, response.result)184185e.set_backtrace(caller)186187raise e188end189190return response191end192193#194# Transmits a packet and waits for a response.195#196# @param packet [Packet] the request packet to send197# @param timeout [Integer,nil] number of seconds to wait, or nil to wait198# forever199def send_packet_wait_response(packet, timeout)200if packet.type == PACKET_TYPE_REQUEST && commands.present?201# XXX: Remove this condition once the payloads gem has had another major version bump from 2.x to 3.x and202# rapid7/metasploit-payloads#451 has been landed to correct the `enumextcmd` behavior on Windows. Until then, skip203# proactive validation of Windows core commands. This is not the only instance of this workaround.204windows_core = base_platform == 'windows' && (packet.method - (packet.method % COMMAND_ID_RANGE)) == Rex::Post::Meterpreter::ClientCore.extension_id205206unless windows_core || commands.include?(packet.method)207if (ext_name = Rex::Post::Meterpreter::ExtensionMapper.get_extension_name(packet.method))208unless ext.aliases.include?(ext_name)209raise RequestError.new(packet.method, "The command requires the #{ext_name} extension to be loaded")210end211end212raise RequestError.new(packet.method, "The command is not supported by this Meterpreter type (#{session_type})")213end214end215216# First, add the waiter association for the supplied packet217waiter = add_response_waiter(packet)218219bytes_written = send_packet(packet)220221# Transmit the packet222if (bytes_written.to_i <= 0)223# Remove the waiter if we failed to send the packet.224remove_response_waiter(waiter)225return nil226end227228if not timeout229return nil230end231232# Wait for the supplied time interval233response = waiter.wait(timeout)234235# Remove the waiter from the list of waiters in case it wasn't236# removed. This happens if the waiter timed out above.237remove_response_waiter(waiter)238239# wire in the UUID for this, as it should be part of every response240# packet241if response && !self.payload_uuid242uuid = response.get_tlv_value(TLV_TYPE_UUID)243self.payload_uuid = Msf::Payload::UUID.new({:raw => uuid}) if uuid244end245246# Return the response packet, if any247return response248end249250# Send a ping to the server.251#252# Our 'ping' is a check for eof on channel id 0. This method has no side253# effects and always returns an answer (regardless of the existence of chan254# 0), which is all that's needed for a liveness check. The answer itself is255# unimportant and is ignored.256#257# @return [void]258def keepalive259if @ping_sent260if ::Time.now.to_i - last_checkin.to_i > PING_TIME*2261dlog("No response to ping, session #{self.sid} is dead", LEV_3)262self.alive = false263end264else265pkt = Packet.create_request(COMMAND_ID_CORE_CHANNEL_EOF)266pkt.add_tlv(TLV_TYPE_CHANNEL_ID, 0)267add_response_waiter(pkt, Proc.new { @ping_sent = false })268send_packet(pkt)269@ping_sent = true270end271end272273##274#275# Reception276#277##278279def pivot_keepalive_start280return unless self.send_keepalives281self.receiver_thread = Rex::ThreadFactory.spawn("PivotKeepalive", false) do282while self.alive283begin284Rex::sleep(PING_TIME)285keepalive286rescue ::Exception => e287dlog("Exception caught in pivot keepalive: #{e.class}: #{e}", 'meterpreter', LEV_1)288dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)289self.alive = false290break291end292end293end294end295296#297# Monitors the PacketDispatcher's sock for data in its own298# thread context and parsers all inbound packets.299#300def monitor_socket301302# Skip if we are using a passive dispatcher303return if self.passive_service304305# redirect to pivot keepalive if we're a pivot session306return pivot_keepalive_start if self.pivot_session307308self.comm_mutex = ::Mutex.new309310self.waiters = []311312# This queue is where the new incoming packets end up313@new_packet_queue = ::Queue.new314# This is where we put packets that aren't new, but haven't315# yet been handled.316@incomplete_queue = ::Queue.new317@ping_sent = false318319# Spawn a thread for receiving packets320self.receiver_thread = Rex::ThreadFactory.spawn("MeterpreterReceiver", false) do321while (self.alive)322begin323rv = Rex::ThreadSafe.select([ self.sock.fd ], nil, nil, PING_TIME)324if rv325packet = receive_packet326# Always enqueue the new packets onto the new packet queue327@new_packet_queue << decrypt_inbound_packet(packet) if packet328elsif self.send_keepalives && @new_packet_queue.empty?329keepalive330end331rescue ::Exception => e332dlog("Exception caught in monitor_socket: #{e.class}: #{e}", 'meterpreter', LEV_1)333dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)334self.alive = false335break336end337end338end339340# Spawn a new thread that monitors the socket341self.dispatcher_thread = Rex::ThreadFactory.spawn("MeterpreterDispatcher", false) do342begin343while (self.alive)344# This is where we'll store incomplete packets on345# THIS iteration346incomplete = []347# The backlog is the full list of packets that aims348# to be handled this iteration349backlog = []350351# If we have any left over packets from the previous352# iteration, we need to prioritise them over the new353# packets. If we don't do this, then we end up in354# situations where data on channels can be processed355# out of order. We don't do a blocking wait here via356# the .pop method because we don't want to block, we357# just want to dump the queue.358while @incomplete_queue.length > 0359backlog << @incomplete_queue.pop360end361362# If the backlog is empty, we don't have old/stale363# packets hanging around, so perform a blocking wait364# for the next packet365backlog << @new_packet_queue.pop if backlog.length == 0366# At this point we either received a packet off the wire367# or we had a backlog to process. In either case, we368# perform a non-blocking queue dump to fill the backlog369# with every packet we have.370while @new_packet_queue.length > 0371backlog << @new_packet_queue.pop372end373374#375# Prioritize message processing here376# 1. Close should always be processed at the end377# 2. Command responses always before channel data378#379380tmp_command = []381tmp_channel = []382tmp_close = []383backlog.each do |pkt|384if(pkt.response?)385tmp_command << pkt386next387end388if(pkt.method == COMMAND_ID_CORE_CHANNEL_CLOSE)389tmp_close << pkt390next391end392tmp_channel << pkt393end394395backlog = []396backlog.push(*tmp_command)397backlog.push(*tmp_channel)398backlog.push(*tmp_close)399400#401# Process the message queue402#403backlog.each do |pkt|404405begin406unless dispatch_inbound_packet(pkt)407# Keep Packets in the receive queue until a handler is registered408# for them. Packets will live in the receive queue for up to409# PACKET_TIMEOUT seconds, after which they will be dropped.410#411# A common reason why there would not immediately be a handler for412# a received Packet is in channels, where a connection may413# open and receive data before anything has asked to read.414if (::Time.now.to_i - pkt.created_at.to_i < PACKET_TIMEOUT)415incomplete << pkt416end417end418419rescue ::Exception => e420dlog("Dispatching exception with packet #{pkt}: #{e} #{e.backtrace}", 'meterpreter', LEV_1)421end422end423424# If the backlog and incomplete arrays are the same, it means425# dispatch_inbound_packet wasn't able to handle any of the426# packets. When that's the case, we can get into a situation427# where @new_packet_queue is not empty and, since nothing else bounds this428# loop, we spin CPU trying to handle packets that can't be429# handled. Sleep here to treat that situation as though the430# queue is empty.431if (backlog.length > 0 && backlog.length == incomplete.length)432::IO.select(nil, nil, nil, 0.10)433end434435# If we have any packets that weren't handled, they go back436# on the incomplete queue so that they're prioritised over437# new packets that are coming in off the wire.438dlog("Requeuing #{incomplete.length} packet(s)", 'meterpreter', LEV_1) if incomplete.length > 0439while incomplete.length > 0440@incomplete_queue << incomplete.shift441end442443# If the old queue of packets gets too big...444if(@incomplete_queue.length > 100)445removed = []446# Drop a bunch of them.447(1..25).each {448removed << @incomplete_queue.pop449}450dlog("Backlog has grown to over 100 in monitor_socket, dropping older packets: #{removed.map{|x| x.inspect}.join(" - ")}", 'meterpreter', LEV_1)451end452end453rescue ::Exception => e454dlog("Exception caught in monitor_socket dispatcher: #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)455ensure456self.receiver_thread.kill if self.receiver_thread457end458end459end460461462#463# Parses data from the dispatcher's sock and returns a Packet context464# once a full packet has been received.465#466def receive_packet467packet = parser.recv(self.sock)468if packet469packet.parse_header!470if self.session_guid == NULL_GUID471self.session_guid = packet.session_guid.dup472end473end474packet475end476477#478# Stop the monitor479#480def monitor_stop481if self.receiver_thread482self.receiver_thread.kill483self.receiver_thread.join484self.receiver_thread = nil485end486487if self.dispatcher_thread488self.dispatcher_thread.kill489self.dispatcher_thread.join490self.dispatcher_thread = nil491end492end493494##495#496# Waiter registration497#498##499500#501# Adds a waiter association with the supplied request packet.502#503def add_response_waiter(request, completion_routine = nil, completion_param = nil)504if self.pivot_session505return self.pivot_session.add_response_waiter(request, completion_routine, completion_param)506end507508waiter = PacketResponseWaiter.new(request.rid, completion_routine, completion_param)509510self.waiters << waiter511512return waiter513end514515#516# Notifies a whomever is waiting for a the supplied response,517# if anyone.518#519def notify_response_waiter(response)520if self.pivot_session521return self.pivot_session.notify_response_waiter(response)522end523524handled = false525self.waiters.each() { |waiter|526if (waiter.waiting_for?(response))527waiter.notify(response)528remove_response_waiter(waiter)529handled = true530break531end532}533return handled534end535536#537# Removes a waiter from the list of waiters.538#539def remove_response_waiter(waiter)540if self.pivot_session541self.pivot_session.remove_response_waiter(waiter)542else543self.waiters.delete(waiter)544end545end546547##548#549# Dispatching550#551##552553#554# Initializes the inbound handlers.555#556def initialize_inbound_handlers557@inbound_handlers = []558end559560#561# Decrypt the given packet with the appropriate key depending on562# if this session is a pivot session or not.563#564def decrypt_inbound_packet(packet)565pivot_session = self.find_pivot_session(packet.session_guid)566tlv_enc_key = self.tlv_enc_key567tlv_enc_key = pivot_session.pivoted_session.tlv_enc_key if pivot_session568packet.from_r(tlv_enc_key)569packet570end571572#573# Dispatches and processes an inbound packet. If the packet is a574# response that has an associated waiter, the waiter is notified.575# Otherwise, the packet is passed onto any registered dispatch576# handlers until one returns success.577#578def dispatch_inbound_packet(packet)579handled = false580581log_packet(packet, :recv)582583# Update our last reply time584self.last_checkin = ::Time.now585586pivot_session = self.find_pivot_session(packet.session_guid)587pivot_session.pivoted_session.last_checkin = self.last_checkin if pivot_session588589# If the packet is a response, try to notify any potential590# waiters591if packet.response? && notify_response_waiter(packet)592return true593end594595# Enumerate all of the inbound packet handlers until one handles596# the packet597@inbound_handlers.each { |handler|598599handled = nil600begin601602if packet.response?603handled = handler.response_handler(self, packet)604else605handled = handler.request_handler(self, packet)606end607608rescue ::Exception => e609dlog("Exception caught in dispatch_inbound_packet: handler=#{handler} #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)610return true611end612613if (handled)614break615end616}617return handled618end619620#621# Registers an inbound packet handler that implements the622# InboundPacketHandler interface.623#624def register_inbound_handler(handler)625@inbound_handlers << handler626end627628#629# Deregisters a previously registered inbound packet handler.630#631def deregister_inbound_handler(handler)632@inbound_handlers.delete(handler)633end634635def initialize_tlv_logging(opt)636self.tlv_logging_error_occured = false637self.tlv_log_file = nil638self.tlv_log_file_path = nil639self.tlv_log_output = :none640641if opt.casecmp?('console') || opt.casecmp?('true')642self.tlv_log_output = :console643elsif opt.start_with?('file:')644self.tlv_log_output = :file645self.tlv_log_file_path = opt.split('file:').last646end647end648649protected650651attr_accessor :receiver_thread # :nodoc:652attr_accessor :dispatcher_thread # :nodoc:653attr_accessor :waiters # :nodoc:654655attr_accessor :tlv_log_output # :nodoc:656attr_accessor :tlv_log_file # :nodoc:657attr_accessor :tlv_log_file_path # :nodoc:658attr_accessor :tlv_logging_error_occured # :nodoc:659660def shutdown_tlv_logging661self.tlv_log_output = :none662self.tlv_log_file.close unless self.tlv_log_file.nil?663self.tlv_log_file = nil664self.tlv_log_file_path = nil665end666667def log_packet(packet, packet_type)668# if we previously failed to log, return669return if self.tlv_logging_error_occured || self.tlv_log_output == :none670671if self.tlv_log_output == :console672log_packet_to_console(packet, packet_type)673elsif self.tlv_log_output == :file674log_packet_to_file(packet, packet_type)675end676end677678def log_packet_to_console(packet, packet_type)679if packet_type == :send680print "\n%redSEND%clr: #{packet.inspect}\n"681elsif packet_type == :recv682print "\n%bluRECV%clr: #{packet.inspect}\n"683end684end685686def log_packet_to_file(packet, packet_type)687pathname = ::Pathname.new(self.tlv_log_file_path.split('file:').last)688689begin690if self.tlv_log_file.nil? || self.tlv_log_file.path != pathname.to_s691self.tlv_log_file.close unless self.tlv_log_file.nil?692693self.tlv_log_file = ::File.open(pathname, 'a+')694end695696if packet_type == :recv697self.tlv_log_file.puts("\nRECV: #{packet.inspect}\n")698elsif packet_type == :send699self.tlv_log_file.puts("\nSEND: #{packet.inspect}\n")700end701rescue ::StandardError => e702self.tlv_logging_error_occured = true703print_error "Failed writing to TLV Log File: #{pathname} with error: #{e.message}. Turning off logging for this session: #{self.inspect}..."704elog(e)705shutdown_tlv_logging706return707end708end709end710711module HttpPacketDispatcher712def initialize_passive_dispatcher713super714715# Ensure that there is only one leading and trailing slash on the URI716resource_uri = "/" + self.conn_id.to_s.gsub(/(^\/|\/$)/, '') + "/"717self.passive_service = self.passive_dispatcher718self.passive_service.remove_resource(resource_uri)719self.passive_service.add_resource(resource_uri,720'Proc' => Proc.new { |cli, req| on_passive_request(cli, req) },721'VirtualDirectory' => true722)723724# Add a reference count to the handler725self.passive_service.ref726end727728def shutdown_passive_dispatcher729if self.passive_service730# Ensure that there is only one leading and trailing slash on the URI731resource_uri = "/" + self.conn_id.to_s.gsub(/(^\/|\/$)/, '') + "/"732self.passive_service.remove_resource(resource_uri) if self.passive_service733734self.passive_service.deref735self.passive_service = nil736end737super738end739740def on_passive_request(cli, req)741742begin743744resp = Rex::Proto::Http::Response.new(200, "OK")745resp['Content-Type'] = 'application/octet-stream'746resp['Connection'] = 'close'747748self.last_checkin = ::Time.now749750if req.method == 'GET'751rpkt = send_queue.shift752resp.body = rpkt || ''753begin754cli.send_response(resp)755rescue ::Exception => e756send_queue.unshift(rpkt) if rpkt757elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)758end759else760resp.body = ""761if req.body and req.body.length > 0762packet = Packet.new(0)763packet.add_raw(req.body)764packet.parse_header!765packet = decrypt_inbound_packet(packet)766dispatch_inbound_packet(packet)767end768cli.send_response(resp)769end770771rescue ::Exception => e772elog("Exception handling request: #{cli.inspect} #{req.inspect}", error: e)773end774end775776end777778end; end; end779780781