# Path: blob/master/src/packages/hub/local_hub_connection.coffee
# 1496 views
#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
NOTE/ATTENTION!

A "local hub" is exactly the same thing as a "project".  I just used to call
them "local hubs" a very long time ago.

###


{PROJECT_HUB_HEARTBEAT_INTERVAL_S} = require('@cocalc/util/heartbeat')

# Connection to a Project (="local hub", for historical reasons only.)

async   = require('async')
{callback2} = require('@cocalc/util/async-utils')

uuid    = require('uuid')
winston = require('./logger').getLogger('local-hub-connection')
underscore = require('underscore')

message = require('@cocalc/util/message')
misc_node = require('@cocalc/backend/misc_node')
{connectToLockedSocket} = require("@cocalc/backend/tcp/locked-socket")
misc    = require('@cocalc/util/misc')
{defaults, required} = misc

blobs = require('./blobs')

# Blobs (e.g., files dynamically appearing as output in worksheets) are kept for this
# many seconds before being discarded.  If the worksheet is saved (e.g., by a user's autosave),
# then the BLOB is saved indefinitely.
BLOB_TTL_S = 60*60*24     # 1 day

if not process.env.SMC_TEST
    DEBUG = true

# Open a locked TCP socket to a project and enable message framing on it.
# Calls opts.cb(err, socket).
connect_to_a_local_hub = (opts) ->    # opts.cb(err, socket)
    opts = defaults opts,
        port         : required
        host         : required
        secret_token : required
        timeout      : 10
        cb           : required

    try
        socket = await connectToLockedSocket({port:opts.port, host:opts.host, token:opts.secret_token, timeout:opts.timeout})
        misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
        opts.cb(undefined, socket)
    catch err
        opts.cb(err)

# Cache of LocalHub objects, keyed by project_id.
_local_hub_cache = {}

# Return the LocalHub for the given project, creating and caching it if needed.
# Throws (a string) if project_id is not given.
exports.new_local_hub = (project_id, database, projectControl) ->
    if not project_id?
        throw "project_id must be specified (it is undefined)"
    H = _local_hub_cache[project_id]
    if H?
        winston.debug("new_local_hub('#{project_id}') -- using cached version")
    else
        winston.debug("new_local_hub('#{project_id}') -- creating new one")
        H = new LocalHub(project_id, database, projectControl)
        _local_hub_cache[project_id] = H
    return H

# Ensure there is a control socket to the given project; cb?(err) when done.
exports.connect_to_project = (project_id, database, projectControl, cb) ->
    hub = exports.new_local_hub(project_id, database, projectControl)
    hub.local_hub_socket (err) ->
        if err
            winston.debug("connect_to_project: error ensuring connection to #{project_id} -- #{err}")
        else
            winston.debug("connect_to_project: successfully ensured connection to #{project_id}")
        cb?(err)

# Drop the cached LocalHub for this project (if any) and free its resources.
exports.disconnect_from_project = (project_id) ->
    H = _local_hub_cache[project_id]
    delete _local_hub_cache[project_id]
    H?.free_resources()
    return

# List of all currently cached LocalHub objects.
exports.all_local_hubs = () ->
    v = []
    for k, h of _local_hub_cache
        if h?
            v.push(h)
    return v

server_settings = undefined

# Load the server settings and re-check project versions whenever they change.
init_server_settings = () ->
    server_settings = await require('./servers/server-settings').default()
    update = () ->
        winston.debug("local_hub_connection (version might have changed) -- checking on clients")
        for x in exports.all_local_hubs()
            x.restart_if_version_too_old()
    update()
    server_settings.table.on('change', update)

class LocalHub # use the function "new_local_hub" above; do not construct this directly!
    constructor: (@project_id, @database, @projectControl) ->
        if not server_settings?  # module being used -- make sure server_settings is initialized
            init_server_settings()
        @_local_hub_socket_connecting = false
        @_sockets = {}  # key = session_uuid:client_id
        @_sockets_by_client_id = {}   #key = client_id, value = list of sockets for that client
        @call_callbacks = {}
        @path = '.'    # should deprecate - *is* used by some random code elsewhere in this file
        @dbg("getting deployed running project")

    # Start periodically sending heartbeat messages over the control socket
    # (no-op if already started).
    init_heartbeat: =>
        @dbg("init_heartbeat")
        if @_heartbeat_interval? # already running
            @dbg("init_heartbeat -- already running")
            return
        send_heartbeat = =>
            @dbg("init_heartbeat -- send")
            @_socket?.write_mesg('json', message.heartbeat())
        @_heartbeat_interval = setInterval(send_heartbeat, PROJECT_HUB_HEARTBEAT_INTERVAL_S*1000)

    # Stop sending heartbeats.
    delete_heartbeat: =>
        if @_heartbeat_interval?
            @dbg("delete_heartbeat")
            clearInterval(@_heartbeat_interval)
            delete @_heartbeat_interval

    # cb(err, project) -- the object returned by projectControl for this project.
    project: (cb) =>
        try
            cb(undefined, await @projectControl(@project_id))
        catch err
            cb(err)

    dbg: (m) =>
        ## only enable when debugging
        if DEBUG
            winston.debug("local_hub('#{@project_id}'): #{misc.to_json(m)}")

    # Free all resources for this project, then restart it; cb(err).
    restart: (cb) =>
        @dbg("restart")
        @free_resources()
        try
            await (await @projectControl(@project_id)).restart()
            cb()
        catch err
            cb(err)

    # cb(err, status)
    status: (cb) =>
        @dbg("status: get status of a project")
        try
            cb(undefined, await (await @projectControl(@project_id)).status())
        catch err
            cb(err)

    # cb(err, state)
    state: (cb) =>
        @dbg("state: get state of a project")
        try
            cb(undefined, await (await @projectControl(@project_id)).state())
        catch err
            cb(err)

    # Cancel changefeeds, stop the heartbeat, forget cached address/version info
    # and close every socket we hold.  Also used as the handler for the control
    # socket's 'end', 'close' and 'error' events.
    free_resources: () =>
        @dbg("free_resources")
        @query_cancel_all_changefeeds()
        @delete_heartbeat()
        delete @_ephemeral
        if @_ephemeral_timeout
            clearTimeout(@_ephemeral_timeout)
            delete @_ephemeral_timeout
        delete @address  # so we don't continue trying to use old address
        delete @_status
        delete @smc_version  # so when client next connects we ignore version checks until they tell us their version
        try
            @_socket?.end()
            winston.debug("free_resources: closed main local_hub socket")
        catch e
            winston.debug("free_resources: exception closing main _socket: #{e}")
        delete @_socket
        for k, s of @_sockets
            try
                s.end()
                winston.debug("free_resources: closed #{k}")
            catch e
                winston.debug("free_resources: exception closing a socket: #{e}")
        @_sockets = {}
        @_sockets_by_client_id = {}

    # Close (best effort) and forget every socket registered for the given client.
    free_resources_for_client_id: (client_id) =>
        v = @_sockets_by_client_id[client_id]
        if v?
            @dbg("free_resources_for_client_id(#{client_id}) -- #{v.length} sockets")
            for socket in v
                try
                    socket.end()
                    socket.destroy()
                catch e
                    # do nothing
            delete @_sockets_by_client_id[client_id]

    # async
    # Load the ephemeral_disk/ephemeral_state project settings from the database
    # and cache them on this object for 60s.
    init_ephemeral: () =>
        settings = await callback2(@database.get_project_settings, {project_id:@project_id})
        @_ephemeral = misc.copy_with(settings, ['ephemeral_disk', 'ephemeral_state'])
        @dbg("init_ephemeral -- #{JSON.stringify(@_ephemeral)}")
        # cache for 60s
        @_ephemeral_timeout = setTimeout((() => delete @_ephemeral), 60000)

    # async
    ephemeral_disk: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_disk

    # async
    ephemeral_state: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_state

    #
    # Project query support code
    #

    # Handle a 'query' message from the project: run it via @database.user_query
    # and send the result (or an error) back using write_mesg.  When mesg.changes
    # is set, the first response echoes mesg with the result in mesg.query and
    # later responses are sent with multi_response=true.
    mesg_query: (mesg, write_mesg) =>
        dbg = (m) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(m,200)}")
        dbg(misc.to_json(mesg))
        query = mesg.query
        if not query?
            write_mesg(message.error(error:"query must be defined"))
            return
        if await @ephemeral_state()
            @dbg("project has ephemeral state")
            write_mesg(message.error(error:"FATAL -- project has ephemeral state so no database queries are allowed"))
            return
        @dbg("project does NOT have ephemeral state")
        first = true
        if mesg.changes
            @_query_changefeeds ?= {}
            @_query_changefeeds[mesg.id] = true
        mesg_id = mesg.id
        @database.user_query
            project_id : @project_id
            query      : query
            options    : mesg.options
            changes    : if mesg.changes then mesg_id
            cb         : (err, result) =>
                if result?.action == 'close'
                    err = 'close'
                if err
                    dbg("project_query error: #{misc.to_json(err)}")
                    if @_query_changefeeds?[mesg_id]
                        delete @_query_changefeeds[mesg_id]
                    write_mesg(message.error(error:err))
                    if mesg.changes and not first
                        # also, assume changefeed got messed up, so cancel it.
                        @database.user_query_cancel_changefeed(id : mesg_id)
                else
                    #if Math.random() <= .3 # for testing -- force forgetting about changefeed with probability 10%.
                    #    delete @_query_changefeeds[mesg_id]
                    if mesg.changes and not first
                        resp = result
                        resp.id = mesg_id
                        resp.multi_response = true
                    else
                        first = false
                        resp = mesg
                        resp.query = result
                    write_mesg(resp)

    # Handle a 'query_cancel' message from the project.
    mesg_query_cancel: (mesg, write_mesg) =>
        if not @_query_changefeeds?
            # no changefeeds
            write_mesg(mesg)
        else
            @database.user_query_cancel_changefeed
                id : mesg.id
                cb : (err, resp) =>
                    if err
                        write_mesg(message.error(error:err))
                    else
                        mesg.resp = resp
                        write_mesg(mesg)
                        delete @_query_changefeeds?[mesg.id]

    # Cancel every changefeed this project has open, then call cb?().
    # Errors canceling individual feeds are only logged.
    query_cancel_all_changefeeds: (cb) =>
        # @_query_changefeeds is a plain object keyed by changefeed id, so it has
        # no .length -- count its keys instead.
        ids = if @_query_changefeeds? then misc.keys(@_query_changefeeds) else []
        delete @_query_changefeeds
        if ids.length == 0
            cb?(); return
        dbg = (m) => winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
        dbg("canceling #{ids.length} changefeeds")
        f = (id, cb) =>
            dbg("canceling id=#{id}")
            @database.user_query_cancel_changefeed
                id : id
                cb : (err) =>
                    if err
                        dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
                    else
                        dbg("FEED: canceled changefeed -- #{id}")
                    cb()
        async.map(ids, f, (err) => cb?(err))

    # async -- throws error if project doesn't have access to string with this id.
    check_syncdoc_access: (string_id) =>
        # NOTE: the parentheses matter -- "not typeof x == 'string' and ..." parses
        # as "(not typeof x) == 'string' and ...", which is always false.
        if not (typeof string_id == 'string' and string_id.length == 40)
            throw Error('string_id must be specified and valid')
        opts =
            query : "SELECT project_id FROM syncstrings"
            where : {"string_id = $::CHAR(40)" : string_id}
        results = await callback2(@database._query, opts)
        if results.rows.length != 1
            throw Error("no such syncdoc")
        if results.rows[0].project_id != @project_id
            throw Error("project does NOT have access to this syncdoc")
        return # everything is fine.

    #
    # end project query support code
    #

    # local hub just told us its version.  Record it.  Restart project if hub version too old.
    local_hub_version: (version) =>
        winston.debug("local_hub_version: version=#{version}")
        @smc_version = version
        @restart_if_version_too_old()

    # If our known version of the project is too old compared to the
    # current version_min_project in smcu-util/smc-version, then
    # we restart the project, which updates the code to the latest
    # version.  Only restarts the project if we have an open control
    # socket to it.
    # Please make damn sure to update the project code on the compute
    # server before updating the version, or the project will be
    # forced to restart and it won't help!
    restart_if_version_too_old: () =>
        if not server_settings?
            # Settings are loaded asynchronously and are not available yet;
            # init_server_settings re-checks all local hubs once they are.
            return
        if not @_socket?
            # not connected at all -- just return
            return
        if not @smc_version?
            # client hasn't told us their version yet
            return
        if server_settings.version.version_min_project <= @smc_version
            # the project is up to date
            return
        if @_restart_goal_version == server_settings.version.version_min_project
            # We already restarted the project in an attempt to update it to this version
            # and it didn't get updated.  Don't try again until @_restart_goal_version is cleared, since
            # we don't want to lock a user out of their project due to somebody forgetting
            # to update code on the compute server!  It could also be that the project just
            # didn't finish restarting.
            return

        winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{server_settings.version.version_min_project}")
        # record some stuff so that we don't keep trying to restart the project constantly
        ver = @_restart_goal_version = server_settings.version.version_min_project # version which we tried to get to
        f = () =>
            if @_restart_goal_version == ver
                delete @_restart_goal_version
        setTimeout(f, 15*60*1000)  # don't try again for at least 15 minutes.

        @dbg("restart_if_version_too_old -- restarting since #{server_settings.version.version_min_project} > #{@smc_version}")
        @restart (err) =>
            @dbg("restart_if_version_too_old -- done #{err}")

    # handle incoming JSON messages from the local_hub
    handle_mesg: (mesg, socket) =>
        @dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
        if mesg.client_id?
            # Should we worry about ensuring that message from this local hub are allowed to
            # send messages to this client?  NO.  For them to send a message, they would have to
            # know the client's id, which is a random uuid, assigned each time the user connects.
            # It obviously is known to the local hub -- but if the user has connected to the local
            # hub then they should be allowed to receive messages.
            # *DEPRECATED*
            return
        if mesg.event == 'version'
            @local_hub_version(mesg.version)
            return
        if mesg.id?
            f = @call_callbacks[mesg.id]
            if f?
                # response to something we sent via @call
                f(mesg)
            else
                winston.debug("handling call from local_hub")
                write_mesg = (resp) =>
                    resp.id = mesg.id
                    @local_hub_socket (err, sock) =>
                        if not err
                            sock.write_mesg('json', resp)
                switch mesg.event
                    when 'ping'
                        write_mesg(message.pong())
                    when 'query'
                        @mesg_query(mesg, write_mesg)
                    when 'query_cancel'
                        @mesg_query_cancel(mesg, write_mesg)
                    when 'file_written_to_project'
                        # ignore -- don't care; this is going away
                        return
                    when 'file_read_from_project'
                        # handle elsewhere by the code that requests the file
                        return
                    when 'error'
                        # ignore -- don't care since handler already gone.
                        return
                    else
                        write_mesg(message.error(error:"unknown event '#{mesg.event}'"))
            return

    # Save a blob received from the project to the database, then tell the
    # project whether that worked.
    handle_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            blob : required

        @dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")
        # Store blob in DB.
        blobs.save_blob
            uuid       : opts.uuid
            blob       : opts.blob
            project_id : @project_id
            ttl        : BLOB_TTL_S
            check      : true         # if malicious user tries to overwrite a blob with given sha1 hash, they get an error.
            database   : @database
            cb         : (err, ttl) =>
                if err
                    resp = message.save_blob(sha1:opts.uuid, error:err)
                    @dbg("handle_blob: error!  -- #{err}")
                else
                    resp = message.save_blob(sha1:opts.uuid, ttl:ttl)

                @local_hub_socket (err, socket) =>
                    if not err
                        socket.write_mesg('json', resp)

    # Connection to the remote local_hub daemon that we use for control.
    # cb(err, socket).  Requests made while a connection attempt is already in
    # progress are queued and all answered when that attempt finishes.
    local_hub_socket: (cb) =>
        if @_socket?
            #@dbg("local_hub_socket: re-using existing socket")
            cb(undefined, @_socket)
            return

        if @_local_hub_socket_connecting
            @_local_hub_socket_queue.push(cb)
            @dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
            return
        @_local_hub_socket_connecting = true
        @_local_hub_socket_queue = [cb]
        connecting_timer = undefined

        # Ends the connection attempt: fails anything still queued with 'timeout'
        # and clears the timer.  Also called after a completed attempt, when the
        # queue is already gone, just to clear the timer.
        cancel_connecting = () =>
            @_local_hub_socket_connecting = false
            if @_local_hub_socket_queue?
                @dbg("local_hub_socket: canceled due to timeout")
                for c in @_local_hub_socket_queue
                    c?('timeout')
                delete @_local_hub_socket_queue
            clearTimeout(connecting_timer)

        # If below fails for 20s for some reason, cancel everything to allow for future attempt.
        connecting_timer = setTimeout(cancel_connecting, 20000)

        @dbg("local_hub_socket: getting new socket")
        @new_socket (err, socket) =>
            if not @_local_hub_socket_queue?
                # already gave up.
                return
            @_local_hub_socket_connecting = false
            @dbg("local_hub_socket: new_socket returned #{err}")
            if err
                for c in @_local_hub_socket_queue
                    c?(err)
                delete @_local_hub_socket_queue
            else
                socket.on 'mesg', (type, mesg) =>
                    switch type
                        when 'blob'
                            @handle_blob(mesg)
                        when 'json'
                            @handle_mesg(mesg, socket)

                socket.on('end', @free_resources)
                socket.on('close', @free_resources)
                socket.on('error', @free_resources)

                # Send a hello message to the local hub, so it knows this is the control connection,
                # and not something else (e.g., a console).
                socket.write_mesg('json', {event:'hello'})

                for c in @_local_hub_socket_queue
                    c?(undefined, socket)
                delete @_local_hub_socket_queue

                @_socket = socket
                @init_heartbeat()  # start sending heartbeat over this socket

                # Finally, we wait a bit to see if the version gets sent from
                # the client.  If not, we set it to 0, which will cause a restart,
                # which will upgrade to a new version that sends versions.
                # TODO: This code can be deleted after all projects get restarted.
                check_version_received = () =>
                    if @_socket? and not @smc_version?
                        @smc_version = 0
                        @restart_if_version_too_old()
                setTimeout(check_version_received, 60*1000)

            cancel_connecting()

    # Get a new connection to the local_hub,
    # authenticated via the secret_token, and enhanced
    # to be able to send/receive json and blob messages.
    new_socket: (cb) =>     # cb(err, socket)
        @dbg("new_socket")
        # Try to connect using the currently known @address.
        f = (cb) =>
            if not @address?
                cb("no address")
                return
            if not @address.port?
                cb("no port")
                return
            if not @address.host?
                cb("no host")
                return
            if not @address.secret_token?
                cb("no secret_token")
                return
            connect_to_a_local_hub
                port         : @address.port
                host         : @address.ip ? @address.host  # prefer @address.ip if it exists (e.g., for cocalc-kubernetes); otherwise use host (which is where compute server is).
                secret_token : @address.secret_token
                cb           : cb
        socket = undefined
        async.series([
            (cb) =>
                if not @address?
                    @dbg("get address of a working local hub")
                    try
                        @address = await (await @projectControl(@project_id)).address()
                        cb()
                    catch err
                        cb(err)
                else
                    cb()
            (cb) =>
                @dbg("try to connect to local hub socket using last known address")
                f (err, _socket) =>
                    if not err
                        socket = _socket
                        cb()
                    else
                        # the known address did not work -- fetch a fresh one
                        @dbg("failed to get address of a working local hub -- #{err}")
                        try
                            @address = await (await @projectControl(@project_id)).address()
                            cb()
                        catch err
                            cb(err)
            (cb) =>
                if not socket?
                    @dbg("still don't have our connection -- try again")
                    f (err, _socket) =>
                        socket = _socket; cb(err)
                else
                    cb()
        ], (err) =>
            cb(err, socket)
        )

    remove_multi_response_listener: (id) =>
        delete @call_callbacks[id]

    # Send a message to the project over the control socket and optionally
    # listen for the response(s); see the option comments below.
    call: (opts) =>
        opts = defaults opts,
            mesg           : required
            timeout        : undefined  # NOTE: a nonzero timeout MUST be specified, or we will not even listen for a response from the local hub!  (Ensures leaking listeners won't happen.)
            multi_response : false   # if true, timeout ignored; call @remove_multi_response_listener(mesg.id) to remove
            cb             : undefined
        @dbg("call")
        if not opts.mesg.id?
            if opts.timeout or opts.multi_response   # opts.timeout being undefined or 0 both mean "don't do it"
                opts.mesg.id = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                @dbg("call: failed to get socket -- #{err}")
                opts.cb?(err)
                return
            @dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
            socket.write_mesg 'json', opts.mesg, (err) =>
                if err
                    @free_resources()   # at least next time it will get a new socket
                    opts.cb?(err)
                    return
                if opts.multi_response
                    @call_callbacks[opts.mesg.id] = opts.cb
                else if opts.timeout
                    # Listen to exactly one response, them remove the listener:
                    @call_callbacks[opts.mesg.id] = (resp) =>
                        delete @call_callbacks[opts.mesg.id]
                        # cb is optional, so guard the calls
                        if resp.event == 'error'
                            opts.cb?(resp.error)
                        else
                            opts.cb?(undefined, resp)
                # As mentioned above -- there's no else -- if not timeout then
                # we do not listen for a response.

    # Read a file from a project into memory on the hub.
    # I think this is used only by the API, but not by browser clients anymore.
    read_file: (opts) => # cb(err, content_of_file)
        {path, project_id, archive, cb} = defaults opts,
            path       : required
            project_id : required
            archive    : 'tar.bz2'   # for directories; if directory, then the output object "data" has data.archive=actual extension used.
            cb         : required
        @dbg("read_file '#{path}'")
        socket    = undefined
        id        = uuid.v4()
        data      = undefined
        data_uuid = undefined
        result_archive = undefined

        async.series([
            # Get a socket connection to the local_hub.
            (cb) =>
                @local_hub_socket (err, _socket) =>
                    if err
                        cb(err)
                    else
                        socket = _socket
                        cb()
            # Request the file and wait for the JSON reply that names the blob.
            (cb) =>
                socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
                socket.recv_mesg
                    type    : 'json'
                    id      : id
                    timeout : 60
                    cb      : (mesg) =>
                        switch mesg.event
                            when 'error'
                                cb(mesg.error)
                            when 'file_read_from_project'
                                data_uuid = mesg.data_uuid
                                result_archive = mesg.archive
                                cb()
                            else
                                cb("Unknown mesg event '#{mesg.event}'")
            # Receive the blob holding the file contents.
            (cb) =>
                socket.recv_mesg
                    type    : 'blob'
                    id      : data_uuid
                    timeout : 60
                    cb      : (_data) =>
                        # recv_mesg returns either a Buffer blob
                        # *or* a {event:'error', error:'the error'} object.
                        # Fortunately `new Buffer().event` is valid (and undefined).
                        if _data.event == 'error'
                            cb(_data.error)
                        else
                            data = _data
                            data.archive = result_archive
                            cb()
        ], (err) =>
            if err
                cb(err)
            else
                cb(undefined, data)
        )

    # Write a file to a project
    # I think this is used only by the API, but not by browser clients anymore.
    write_file: (opts) => # cb(err)
        {path, project_id, cb, data} = defaults opts,
            path       : required
            project_id : required
            data       : required   # what to write
            cb         : required
        @dbg("write_file '#{path}'")
        id        = uuid.v4()
        data_uuid = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                opts.cb(err)
                return
            mesg = message.write_file_to_project
                id         : id
                project_id : project_id
                path       : path
                data_uuid  : data_uuid
            socket.write_mesg('json', mesg)
            socket.write_mesg('blob', {uuid:data_uuid, blob:data})
            socket.recv_mesg
                type    : 'json'
                id      : id
                timeout : 10
                cb      : (mesg) =>
                    switch mesg.event
                        when 'file_written_to_project'
                            opts.cb()
                        when 'error'
                            opts.cb(mesg.error)
                        else
                            opts.cb("unexpected message type '#{mesg.event}'")