Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/lib/rex/services/local_relay.rb
Views: 11623
# -*- coding: binary -*-1require 'thread'2require 'rex/socket'3module Rex4module Services56###7#8# This service acts as a local TCP relay whereby clients can connect to a9# local listener that forwards to an arbitrary remote endpoint. Interaction10# with the remote endpoint socket requires that it implement the11# Rex::IO::Stream interface.12#13###14class LocalRelay1516include Rex::Service1718###19#20# This module is used to extend streams such that they can be associated21# with a relay context and the other side of the stream.22#23###24module Stream2526#27# This method is called when the other side has data that has been read28# in.29#30def on_other_data(data)31if relay.on_other_data_proc32relay.on_other_data_proc.call(relay, self, data)33else34put(data)35end36end3738attr_accessor :relay39attr_accessor :other_stream40end4142###43#44# This module is used to extend stream servers such that they can be45# associated with a relay context.46#47###48module StreamServer4950#51# This method is called when the stream server receives a local52# connection such that the remote half can be allocated. The return53# value of the callback should be a Stream instance.54#55def on_local_connection(relay, lfd)56if relay.on_local_connection_proc57relay.on_local_connection_proc.call(relay, lfd)58end59end6061attr_accessor :relay62end6364###65#66# This class acts as an instance of a given local relay.67#68###69class Relay7071def initialize(name, listener, opts = {})72self.name = name73self.listener = listener74self.opts = opts75self.on_local_connection_proc = opts['OnLocalConnection']76self.on_conn_close_proc = opts['OnConnectionClose']77self.on_other_data_proc = opts['OnOtherData']78if (not $dispatcher['rex'])79register_log_source('rex', $dispatcher['core'], get_log_level('core'))80end81end8283def shutdown84begin85listener.shutdown if listener86rescue ::Exception87end88end8990def close91begin92listener.close if listener93rescue ::Exception94end95listener = nil96end9798attr_reader :name, :listener, :opts99attr_accessor :on_local_connection_proc100attr_accessor :on_conn_close_proc101attr_accessor :on_other_data_proc102protected103attr_writer :name, :listener, :opts104105end106107###108#109# This class acts as an instance of a local relay handling a reverse connection110#111###112class ReverseRelay < Relay113114def initialize(name, channel, opts = {})115116self.name = name117self.listener = nil118self.opts = opts119self.on_local_connection_proc = opts['OnLocalConnection']120self.on_conn_close_proc = opts['OnConnectionClose']121self.on_other_data_proc = opts['OnOtherData']122self.channel = channel123124if !$dispatcher['rex']125register_log_source('rex', $dispatcher['core'], get_log_level('core'))126end127end128129def shutdown130# don't need to do anything here, it's only "close" we care about131end132133def close134self.channel.close if self.channel135self.channel = nil136end137138attr_reader :channel139140protected141attr_writer :channel142143end144145#146# Initializes the local tcp relay monitor.147#148def initialize149self.relays = Hash.new150self.rfds = Array.new151self.rev_chans = Array.new152self.relay_thread = nil153self.relay_mutex = Mutex.new154end155156##157#158# Service interface implementors159#160##161162#163# Returns the hardcore alias for the local relay service.164#165def self.hardcore_alias(*args)166"__#{args}"167end168169#170# Returns the alias for this service.171#172def alias173super || "Local Relay"174end175176#177# Starts the thread that monitors the local relays.178#179def start180if (!self.relay_thread)181self.relay_thread = Rex::ThreadFactory.spawn("LocalRelay", false) {182begin183monitor_relays184rescue ::Exception => e185elog("Error in #{self} monitor_relays", 'rex', error: e)186end187}188end189end190191#192# Stops the thread that monitors the local relays and destroys all193# listeners, both local and remote.194#195def stop196if (self.relay_thread)197self.relay_thread.kill198self.relay_thread = nil199end200201self.relay_mutex.synchronize {202self.relays.delete_if { |k, v|203v.shutdown204v.close205true206}207}208209# make sure we kill off active sockets when we shut down210while self.rfds.length > 0211close_relay_conn(self.rfds.shift) rescue nil212end213214# we can safely clear the channels array because all of the215# reverse relays were closed down216self.rev_chans.clear217self.relays.clear218end219220#221# Start a new active listener on the victim ready for reverse connections.222#223def start_reverse_tcp_relay(channel, opts = {})224opts['__RelayType'] = 'tcp'225opts['Reverse'] = true226227name = "Reverse-#{opts['LocalPort']}"228229relay = ReverseRelay.new(name, channel, opts)230231# dirty hack to get "relay" support?232channel.extend(StreamServer)233channel.relay = relay234235self.relay_mutex.synchronize {236self.relays[name] = relay237self.rev_chans << channel238}239relay240end241242#243# Stop an active reverse port forward.244#245def stop_reverse_tcp_relay(rport)246stop_relay("Reverse-#{rport}")247end248249#250# Starts a local TCP relay.251#252def start_tcp_relay(lport, opts = {})253# Make sure our options are valid254if ((opts['PeerHost'] == nil or opts['PeerPort'] == nil) and (opts['Stream'] != true))255raise ArgumentError, "Missing peer host or peer port.", caller256end257258listener = Rex::Socket.create_tcp_server(259'LocalHost' => opts['LocalHost'],260'LocalPort' => lport)261262_, lhost, lport = listener.getlocalname()263opts['LocalHost'] = lhost264opts['LocalPort'] = lport265opts['__RelayType'] = 'tcp'266267start_relay(listener, lport.to_s + opts['LocalHost'], opts)268end269270#271# Starts a local relay on the supplied local port. This listener will call272# the supplied callback procedures when various events occur.273#274def start_relay(stream_server, name, opts = {})275# Create a Relay instance with the local stream and remote stream276relay = Relay.new(name, stream_server, opts)277278# Extend the stream_server so that we can associate it with this relay279stream_server.extend(StreamServer)280stream_server.relay = relay281282# Add the stream associations the appropriate lists and hashes283self.relay_mutex.synchronize {284self.relays[name] = relay285286self.rfds << stream_server287}288relay289end290291#292# Stops relaying on a given local port.293#294def stop_tcp_relay(lport, lhost = nil)295stop_relay(lport.to_s + (lhost || '0.0.0.0'))296end297298#299# Stops a relay with a given name.300#301def stop_relay(name)302rv = false303304self.relay_mutex.synchronize {305relay = self.relays[name]306307if relay308close_relay(relay)309rv = true310end311}312313rv314end315316#317# Enumerate each TCP relay318#319def each_tcp_relay(&block)320self.relays.each_pair { |name, relay|321next if (relay.opts['__RelayType'] != 'tcp')322323yield(324relay.opts['LocalHost'] || '0.0.0.0',325relay.opts['LocalPort'],326relay.opts['PeerHost'],327relay.opts['PeerPort'],328relay.opts)329}330end331332protected333334attr_accessor :relays, :relay_thread, :relay_mutex335attr_accessor :rfds, :rev_chans336337#338# Closes an cleans up a specific relay339#340def close_relay(relay)341342if relay.kind_of?(ReverseRelay)343self.rev_chans.delete(relay.channel)344else345self.rfds.delete(relay.listener)346end347348self.relays.delete(relay.name)349350begin351relay.shutdown352relay.close353rescue IOError354end355end356357#358# Closes a specific relay connection without tearing down the actual relay359# itself.360#361def close_relay_conn(fd)362relay = fd.relay363ofd = fd.other_stream364365self.rfds.delete(fd)366367begin368if relay.on_conn_close_proc369relay.on_conn_close_proc.call(fd)370end371372fd.shutdown373fd.close374rescue IOError375end376377if ofd378self.rfds.delete(ofd)379380begin381if (relay.on_conn_close_proc)382relay.on_conn_close_proc.call(ofd)383end384385ofd.shutdown386ofd.close387rescue IOError388end389end390end391392#393# Attempt to accept a new reverse connection on the given reverse394# relay handle.395#396def accept_reverse_relay(rrfd)397398rfd = rrfd.accept_nonblock399400return unless rfd401402lfd = Rex::Socket::Tcp.create(403'PeerHost' => rrfd.relay.opts['PeerHost'],404'PeerPort' => rrfd.relay.opts['PeerPort'],405'Timeout' => 5406)407408rfd.extend(Stream)409lfd.extend(Stream)410411rfd.relay = rrfd.relay412lfd.relay = rrfd.relay413414self.rfds << lfd415self.rfds << rfd416417rfd.other_stream = lfd418lfd.other_stream = rfd419end420421#422# Accepts a client connection on a local relay.423#424def accept_relay_conn(srvfd)425relay = srvfd.relay426427begin428dlog("Accepting relay client connection...", 'rex', LEV_3)429430# Accept the child connection431lfd = srvfd.accept432dlog("Got left side of relay: #{lfd}", 'rex', LEV_3)433434# Call the relay's on_local_connection method which should return a435# remote connection on success436rfd = srvfd.on_local_connection(relay, lfd)437438dlog("Got right side of relay: #{rfd}", 'rex', LEV_3)439rescue440wlog("Failed to get remote half of local connection on relay #{relay.name}: #{$!}", 'rex')441lfd.close442return443end444445# If we have both sides, then we rock. Extend the instances, associate446# them with the relay, associate them with each other, and add them to447# the list of polling file descriptors448if lfd && rfd449lfd.extend(Stream)450rfd.extend(Stream)451452lfd.relay = relay453rfd.relay = relay454455lfd.other_stream = rfd456rfd.other_stream = lfd457458self.rfds << lfd459self.rfds << rfd460else461# Otherwise, we don't have both sides, we'll close them.462close_relay_conn(lfd)463end464end465466#467# Monitors the relays for data and passes it in both directions.468#469def monitor_relays470begin471# Helps with latency472Thread.current.priority = 2473474# See if we have any new connections on the existing reverse port475# forward relays476rev_chans.each do |rrfd|477accept_reverse_relay(rrfd)478end479480# Poll all the streams...481begin482socks = Rex::ThreadSafe.select(rfds, nil, nil, 0.25)483rescue StreamClosedError => e484dlog("monitor_relays: closing stream #{e.stream}", 'rex', LEV_3)485486# Close the relay connection that is associated with the stream487# closed error488if e.stream.kind_of?(Stream)489close_relay_conn(e.stream)490end491492dlog("monitor_relays: closed stream #{e.stream}", 'rex', LEV_3)493494next495rescue => e496elog("Error in #{self} monitor_relays select:", 'rex', error: e)497return498end499500# If socks is nil, go again.501next unless socks502503# Process read-ready file descriptors, if any.504socks[0].each { |rfd|505506# If this file descriptor is a server, accept the connection507if (rfd.kind_of?(StreamServer))508accept_relay_conn(rfd)509else510# Otherwise, it's a relay connection, read data from one side511# and write it to the other512begin513# Pass the data onto the other fd, most likely writing it.514data = rfd.sysread(65536)515rfd.other_stream.on_other_data(data)516# If we catch an error, close the connection517rescue ::Exception => e518elog("Error in #{self} monitor_relays read", 'rex', error: e)519close_relay_conn(rfd)520end521end522523} if (socks[0])524525end while true526end527528end529530end531end532533534535