Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/hub/local_hub_connection.coffee
1496 views
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
###
7
NOTE/ATTENTION!
8
9
A "local hub" is exactly the same thing as a "project". I just used to call
10
them "local hubs" a very long time ago.
11
12
###
13
14
15
{PROJECT_HUB_HEARTBEAT_INTERVAL_S} = require('@cocalc/util/heartbeat')
16
17
# Connection to a Project (="local hub", for historical reasons only.)
18
19
async = require('async')
20
{callback2} = require('@cocalc/util/async-utils')
21
22
uuid = require('uuid')
23
winston = require('./logger').getLogger('local-hub-connection')
24
underscore = require('underscore')
25
26
message = require('@cocalc/util/message')
27
misc_node = require('@cocalc/backend/misc_node')
28
{connectToLockedSocket} = require("@cocalc/backend/tcp/locked-socket")
29
misc = require('@cocalc/util/misc')
30
{defaults, required} = misc
31
32
blobs = require('./blobs')
33
34
# How long (in seconds) a BLOB -- e.g., a file that dynamically appears as output
# in a worksheet -- is kept before being discarded.  If the worksheet is saved
# (e.g., by a user's autosave), then the BLOB is kept indefinitely.
BLOB_TTL_S = 24 * 60 * 60   # one day

# Per-project debug logging (used by LocalHub's dbg method) is enabled unless the
# SMC_TEST environment variable is set; in that case DEBUG is left undefined (falsy).
DEBUG = true unless process.env.SMC_TEST
41
42
# Open a new authenticated connection to a project's local hub.
#
# opts:
#   port, host       -- where the project's hub is listening
#   secret_token     -- passed to connectToLockedSocket as the connection token
#   timeout          -- passed through to connectToLockedSocket (default 10)
#   cb(err, socket)  -- on success, socket has been enhanced via misc_node.enable_mesg
#                       so it can send/receive json and blob messages.
#
# cb is invoked exactly once, and outside the try block: an exception thrown by
# cb itself must not be caught here and reported through cb a second time.
connect_to_a_local_hub = (opts) ->
    opts = defaults opts,
        port         : required
        host         : required
        secret_token : required
        timeout      : 10
        cb           : required

    socket = undefined
    try
        socket = await connectToLockedSocket({port:opts.port, host:opts.host, token:opts.secret_token, timeout:opts.timeout})
        misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
    catch err
        # If we connected but could not set the socket up, don't leak it.
        socket?.destroy()
        opts.cb(err)
        return
    opts.cb(undefined, socket)
57
# Cache of LocalHub instances keyed by project_id; new_local_hub creates at most
# one instance per project_id and reuses it afterwards.
_local_hub_cache = {}

# Return the cached LocalHub for project_id, creating (and caching) it on first use.
# database and projectControl are only used when a new instance is created.
# Throws an Error if project_id is null or undefined.
exports.new_local_hub = (project_id, database, projectControl) ->
    if not project_id?
        # Throw a real Error (not a bare string) so callers get a stack trace.
        throw Error("project_id must be specified (it is undefined)")
    hub = _local_hub_cache[project_id]
    if hub?
        winston.debug("new_local_hub('#{project_id}') -- using cached version")
    else
        winston.debug("new_local_hub('#{project_id}') -- creating new one")
        hub = _local_hub_cache[project_id] = new LocalHub(project_id, database, projectControl)
    return hub
69
70
# Make sure a control connection to the project exists (creating the LocalHub via
# new_local_hub if needed), log the outcome, and then call cb?(err).
exports.connect_to_project = (project_id, database, projectControl, cb) ->
    local_hub = exports.new_local_hub(project_id, database, projectControl)
    local_hub.local_hub_socket (err) ->
        log_line = if err
            "connect_to_project: error ensuring connection to #{project_id} -- #{err}"
        else
            "connect_to_project: successfully ensured connection to #{project_id}"
        winston.debug(log_line)
        cb?(err)
78
79
# Remove the project's LocalHub from the cache and, if one existed, call its
# free_resources method.  Always returns undefined.
exports.disconnect_from_project = (project_id) ->
    local_hub = _local_hub_cache[project_id]
    delete _local_hub_cache[project_id]
    if local_hub?
        local_hub.free_resources()
    return
84
85
# Return a new array holding every cached LocalHub (null/undefined entries skipped).
exports.all_local_hubs = () ->
    return (local_hub for _, local_hub of _local_hub_cache when local_hub?)
91
92
# The loaded server settings; undefined until init_server_settings has finished,
# so readers must tolerate it being unset.
server_settings = undefined

# Memoized promise for the load started by init_server_settings.  Every caller
# (e.g., each LocalHub constructed before the first load completes) shares this
# single load, so only one 'change' listener is ever registered.
server_settings_init = undefined

# Load the server settings once.  After loading, and on every later change to the
# settings table, ask every cached LocalHub to restart its project if the
# project's version is too old.  If the load fails, the error is logged (instead
# of becoming an unhandled rejection) and the memo is cleared so a later call can
# retry.  Returns the memoized promise.
init_server_settings = () ->
    if not server_settings_init?
        load = () ->
            settings = await require('./servers/server-settings').default()
            server_settings = settings
            update = () ->
                winston.debug("local_hub_connection (version might have changed) -- checking on clients")
                for x in exports.all_local_hubs()
                    x.restart_if_version_too_old()
                return
            update()
            server_settings.table.on('change', update)
            return
        server_settings_init = load().catch((err) ->
            winston.debug("init_server_settings: failed to load server settings -- #{err}")
            server_settings_init = undefined
            return
        )
    return server_settings_init
101
102
class LocalHub # use the function "new_local_hub" above; do not construct this directly!
    # The hub-side handle on one project (historically called a "local hub").
    #
    # project_id     -- the project this instance talks to
    # database       -- database object; this class calls its user_query,
    #                   user_query_cancel_changefeed, get_project_settings and _query
    # projectControl -- async function of project_id whose result provides
    #                   restart(), status(), state() and address()
    constructor: (@project_id, @database, @projectControl) ->
        if not server_settings?  # module being used -- make sure server_settings is initialized
            init_server_settings()
        @_local_hub_socket_connecting = false
        @_sockets = {}               # key = session_uuid:client_id
        @_sockets_by_client_id = {}  # key = client_id, value = list of sockets for that client
        @call_callbacks = {}         # key = message id, value = function handling messages with that id
        @path = '.'  # should deprecate - *is* used by some random code elsewhere in this file
        @dbg("getting deployed running project")

    # Start writing a heartbeat message to the control socket every
    # PROJECT_HUB_HEARTBEAT_INTERVAL_S seconds.  No-op if already running.
    init_heartbeat: =>
        @dbg("init_heartbeat")
        if @_heartbeat_interval?  # already running
            @dbg("init_heartbeat -- already running")
            return
        send_heartbeat = =>
            @dbg("init_heartbeat -- send")
            @_socket?.write_mesg('json', message.heartbeat())
        @_heartbeat_interval = setInterval(send_heartbeat, PROJECT_HUB_HEARTBEAT_INTERVAL_S*1000)

    # Stop the heartbeat timer, if one is running.
    delete_heartbeat: =>
        if @_heartbeat_interval?
            @dbg("delete_heartbeat")
            clearInterval(@_heartbeat_interval)
            delete @_heartbeat_interval

    # cb(err, project): project is whatever @projectControl(@project_id) resolves to.
    # cb is called outside the try so an exception thrown by cb is not reported
    # back through cb a second time.
    project: (cb) =>
        try
            project = await @projectControl(@project_id)
        catch err
            cb(err)
            return
        cb(undefined, project)

    # Log m (JSON encoded), tagged with the project id -- only when DEBUG is set.
    dbg: (m) =>
        ## only enable when debugging
        if DEBUG
            winston.debug("local_hub('#{@project_id}'): #{misc.to_json(m)}")

    # Free all connection resources, then restart the project; cb(err).
    restart: (cb) =>
        @dbg("restart")
        @free_resources()
        try
            await (await @projectControl(@project_id)).restart()
        catch err
            cb(err)
            return
        cb()

    # cb(err, status) with the result of the project's status().
    status: (cb) =>
        @dbg("status: get status of a project")
        try
            status = await (await @projectControl(@project_id)).status()
        catch err
            cb(err)
            return
        cb(undefined, status)

    # cb(err, state) with the result of the project's state().
    state: (cb) =>
        @dbg("state: get state of a project")
        try
            state = await (await @projectControl(@project_id)).state()
        catch err
            cb(err)
            return
        cb(undefined, state)

    # Tear down everything tied to the current connection to the project.  This
    # cancels changefeeds, stops the heartbeat and drops the cached ephemeral
    # settings.  It forgets the address, status and version, and closes the
    # control socket and all other recorded sockets.  Safe to call repeatedly.
    free_resources: () =>
        @dbg("free_resources")
        @query_cancel_all_changefeeds()
        @delete_heartbeat()
        delete @_ephemeral
        if @_ephemeral_timeout
            clearTimeout(@_ephemeral_timeout)
            delete @_ephemeral_timeout
        delete @address      # so we don't continue trying to use old address
        delete @_status
        delete @smc_version  # so when client next connects we ignore version checks until they tell us their version
        try
            @_socket?.end()
            winston.debug("free_resources: closed main local_hub socket")
        catch e
            winston.debug("free_resources: exception closing main _socket: #{e}")
        delete @_socket
        for k, s of @_sockets
            try
                s.end()
                winston.debug("free_resources: closed #{k}")
            catch e
                winston.debug("free_resources: exception closing a socket: #{e}")
        @_sockets = {}
        @_sockets_by_client_id = {}

    # Close (end + destroy) and forget every socket recorded for client_id.
    # Exceptions while closing are deliberately ignored (best effort).
    free_resources_for_client_id: (client_id) =>
        v = @_sockets_by_client_id[client_id]
        if v?
            @dbg("free_resources_for_client_id(#{client_id}) -- #{v.length} sockets")
            for socket in v
                try
                    socket.end()
                    socket.destroy()
                catch e
                    # do nothing
            delete @_sockets_by_client_id[client_id]

    # async -- load this project's ephemeral_disk / ephemeral_state settings from
    # the database into @_ephemeral, and forget them again after 60 seconds.
    init_ephemeral: () =>
        settings = await callback2(@database.get_project_settings, {project_id:@project_id})
        @_ephemeral = misc.copy_with(settings, ['ephemeral_disk', 'ephemeral_state'])
        @dbg("init_ephemeral -- #{JSON.stringify(@_ephemeral)}")
        # cache for 60s; replace (rather than leak) an expiry timer from an earlier call
        if @_ephemeral_timeout?
            clearTimeout(@_ephemeral_timeout)
        @_ephemeral_timeout = setTimeout((() => delete @_ephemeral), 60000)

    # async -- the project's ephemeral_disk setting (loaded on demand).
    ephemeral_disk: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_disk

    # async -- the project's ephemeral_state setting (loaded on demand).
    ephemeral_state: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_state

    #
    # Project query support code
    #

    # Handle a 'query' message from the project.  It runs mesg.query as a
    # user_query on the project's behalf and replies through write_mesg.  The
    # first result is sent back as mesg itself with mesg.query set to the result.
    # If mesg.changes is set, a changefeed keyed by mesg.id is created.  Each later
    # update is then sent as its own message with id = mesg.id and
    # multi_response = true.  Queries are refused for projects with ephemeral state.
    mesg_query: (mesg, write_mesg) =>
        dbg = (m) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(m,200)}")
        dbg(misc.to_json(mesg))
        query = mesg.query
        if not query?
            write_mesg(message.error(error:"query must be defined"))
            return
        try
            has_ephemeral_state = await @ephemeral_state()
        catch err
            # handle_mesg does not await this method: without this catch, a failure
            # here would be an unhandled rejection and the project would get no reply.
            dbg("error determining ephemeral state -- #{err}")
            write_mesg(message.error(error:"#{err}"))
            return
        if has_ephemeral_state
            @dbg("project has ephemeral state")
            write_mesg(message.error(error:"FATAL -- project has ephemeral state so no database queries are allowed"))
            return
        @dbg("project does NOT have ephemeral state")
        first = true
        if mesg.changes
            @_query_changefeeds ?= {}
            @_query_changefeeds[mesg.id] = true
        mesg_id = mesg.id
        @database.user_query
            project_id : @project_id
            query      : query
            options    : mesg.options
            changes    : if mesg.changes then mesg_id
            cb         : (err, result) =>
                if result?.action == 'close'
                    err = 'close'
                if err
                    dbg("project_query error: #{misc.to_json(err)}")
                    if @_query_changefeeds?[mesg_id]
                        delete @_query_changefeeds[mesg_id]
                    write_mesg(message.error(error:err))
                    if mesg.changes and not first
                        # also, assume changefeed got messed up, so cancel it.
                        @database.user_query_cancel_changefeed(id : mesg_id)
                else
                    if mesg.changes and not first
                        # a later changefeed update: its own message, tagged with the query id
                        resp = result
                        resp.id = mesg_id
                        resp.multi_response = true
                    else
                        # the initial result: reply with the original message, query filled in
                        first = false
                        resp = mesg
                        resp.query = result
                    write_mesg(resp)

    # Handle a 'query_cancel' message: cancel the changefeed with id mesg.id and
    # echo mesg back (with mesg.resp set on success), or reply with an error.
    mesg_query_cancel: (mesg, write_mesg) =>
        if not @_query_changefeeds?
            # no changefeeds
            write_mesg(mesg)
        else
            @database.user_query_cancel_changefeed
                id : mesg.id
                cb : (err, resp) =>
                    if err
                        write_mesg(message.error(error:err))
                    else
                        mesg.resp = resp
                        write_mesg(mesg)
                        if @_query_changefeeds?
                            delete @_query_changefeeds[mesg.id]

    # Cancel every changefeed this project created through mesg_query, then cb?(err).
    # Errors cancelling an individual feed are logged, not propagated.
    query_cancel_all_changefeeds: (cb) =>
        v = @_query_changefeeds
        delete @_query_changefeeds
        # @_query_changefeeds is a plain object (id -> true), so count its keys;
        # it has no .length.
        ids = if v? then misc.keys(v) else []
        if ids.length == 0
            cb?()
            return
        dbg = (m) => winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
        dbg("canceling #{ids.length} changefeeds")
        f = (id, cb) =>
            dbg("canceling id=#{id}")
            @database.user_query_cancel_changefeed
                id : id
                cb : (err) =>
                    if err
                        dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
                    else
                        dbg("FEED: canceled changefeed -- #{id}")
                    cb()
        async.map(ids, f, (err) => cb?(err))

    # async -- throws an Error unless string_id is a 40-character string that names
    # exactly one row of the syncstrings table whose project_id is this project.
    check_syncdoc_access: (string_id) =>
        # Validate the input.  (Written as `not typeof string_id == 'string'`, the
        # check parsed as `(not typeof string_id) == 'string'`, which is never true.)
        if typeof string_id != 'string' or string_id.length != 40
            throw Error('string_id must be specified and valid')
        opts =
            query : "SELECT project_id FROM syncstrings"
            where : {"string_id = $::CHAR(40)" : string_id}
        results = await callback2(@database._query, opts)
        if results.rows.length != 1
            throw Error("no such syncdoc")
        if results.rows[0].project_id != @project_id
            throw Error("project does NOT have access to this syncdoc")
        return  # everything is fine.

    #
    # end project query support code
    #

    # local hub just told us its version. Record it. Restart project if hub version too old.
    local_hub_version: (version) =>
        winston.debug("local_hub_version: version=#{version}")
        @smc_version = version
        @restart_if_version_too_old()

    # If our known version of the project is older than the current
    # server_settings.version.version_min_project, restart the project, which
    # updates the code to the latest version.  Only restarts the project if we
    # have an open control socket to it.
    # Please make damn sure to update the project code on the compute
    # server before updating the version, or the project will be
    # forced to restart and it won't help!
    restart_if_version_too_old: () =>
        if not @_socket?
            # not connected at all -- just return
            return
        if not @smc_version?
            # client hasn't told us their version yet
            return
        min_version = server_settings?.version?.version_min_project
        if not min_version?
            # server settings not loaded yet (or no minimum set) -- nothing to compare against
            return
        if min_version <= @smc_version
            # the project is up to date
            return
        if @_restart_goal_version == min_version
            # We already restarted the project in an attempt to update it to this version
            # and it didn't get updated. Don't try again until @_restart_goal_version is cleared, since
            # we don't want to lock a user out of their project due to somebody forgetting
            # to update code on the compute server! It could also be that the project just
            # didn't finish restarting.
            return

        winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{min_version}")
        # record some stuff so that we don't keep trying to restart the project constantly
        ver = @_restart_goal_version = min_version  # version which we tried to get to
        f = () =>
            if @_restart_goal_version == ver
                delete @_restart_goal_version
        setTimeout(f, 15*60*1000)  # don't try again for at least 15 minutes.

        @dbg("restart_if_version_too_old -- restarting since #{min_version} > #{@smc_version}")
        @restart (err) =>
            @dbg("restart_if_version_too_old -- done #{err}")

    # handle incoming JSON messages from the local_hub
    handle_mesg: (mesg, socket) =>
        @dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
        if mesg.client_id?
            # Should we worry about ensuring that message from this local hub are allowed to
            # send messages to this client? NO. For them to send a message, they would have to
            # know the client's id, which is a random uuid, assigned each time the user connects.
            # It obviously is known to the local hub -- but if the user has connected to the local
            # hub then they should be allowed to receive messages.
            # *DEPRECATED*
            return
        if mesg.event == 'version'
            @local_hub_version(mesg.version)
            return
        if mesg.id?
            f = @call_callbacks[mesg.id]
            if f?
                # a response to a message we sent via @call
                f(mesg)
            else
                # a request initiated by the project; reply with the same id
                winston.debug("handling call from local_hub")
                write_mesg = (resp) =>
                    resp.id = mesg.id
                    @local_hub_socket (err, sock) =>
                        if not err
                            sock.write_mesg('json', resp)
                switch mesg.event
                    when 'ping'
                        write_mesg(message.pong())
                    when 'query'
                        @mesg_query(mesg, write_mesg)
                    when 'query_cancel'
                        @mesg_query_cancel(mesg, write_mesg)
                    when 'file_written_to_project'
                        # ignore -- don't care; this is going away
                        return
                    when 'file_read_from_project'
                        # handle elsewhere by the code that requests the file
                        return
                    when 'error'
                        # ignore -- don't care since handler already gone.
                        return
                    else
                        write_mesg(message.error(error:"unknown event '#{mesg.event}'"))
        return

    # Store a blob sent by the project (with BLOB_TTL_S as ttl), then report the
    # outcome to the project as a save_blob message over the control socket.
    handle_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            blob : required

        @dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")
        # Store blob in DB.
        blobs.save_blob
            uuid       : opts.uuid
            blob       : opts.blob
            project_id : @project_id
            ttl        : BLOB_TTL_S
            check      : true  # if malicious user tries to overwrite a blob with given sha1 hash, they get an error.
            database   : @database
            cb         : (err, ttl) =>
                if err
                    resp = message.save_blob(sha1:opts.uuid, error:err)
                    @dbg("handle_blob: error! -- #{err}")
                else
                    resp = message.save_blob(sha1:opts.uuid, ttl:ttl)

                @local_hub_socket (err, socket) =>
                    if not err
                        socket.write_mesg('json', resp)

    # Connection to the remote local_hub daemon that we use for control: cb(err, socket).
    # While a connection attempt is in flight, further requests are queued and
    # answered by that attempt.  If the attempt takes more than 20s, every queued
    # request gets 'timeout' and a later call may start a fresh attempt.
    local_hub_socket: (cb) =>
        if @_socket?
            #@dbg("local_hub_socket: re-using existing socket")
            cb(undefined, @_socket)
            return

        if @_local_hub_socket_connecting
            @_local_hub_socket_queue.push(cb)
            @dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
            return

        @_local_hub_socket_connecting = true
        # Keep a private reference to this attempt's queue.  When new_socket finally
        # returns, we can then tell whether this attempt is still the current one.
        # A late result must never be handed to a newer attempt's waiters.
        queue = @_local_hub_socket_queue = [cb]
        connecting_timer = undefined

        # Finish this attempt.  Returns its waiters, or undefined if the attempt
        # was already given up on.
        take_waiters = () =>
            clearTimeout(connecting_timer)
            if @_local_hub_socket_queue != queue
                return undefined
            @_local_hub_socket_connecting = false
            delete @_local_hub_socket_queue
            return queue

        # If below fails for 20s for some reason, cancel everything to allow for future attempt.
        on_connect_timeout = =>
            waiters = take_waiters()
            return if not waiters?
            @dbg("local_hub_socket: canceled due to timeout")
            for c in waiters
                c?('timeout')
            return
        connecting_timer = setTimeout(on_connect_timeout, 20000)

        @dbg("local_hub_socket: getting new socket")
        @new_socket (err, socket) =>
            waiters = take_waiters()
            if not waiters?
                # Already gave up on this attempt; don't leak a connection nobody will use.
                socket?.destroy()
                return
            @dbg("local_hub_socket: new_socket returned #{err}")
            if err
                for c in waiters
                    c?(err)
                return

            socket.on 'mesg', (type, mesg) =>
                switch type
                    when 'blob'
                        @handle_blob(mesg)
                    when 'json'
                        @handle_mesg(mesg, socket)

            # Only tear down if this socket is still the control socket.  Late events
            # from a stale socket must not free a newer connection.
            on_socket_gone = =>
                if @_socket == socket
                    @free_resources()
            socket.on('end', on_socket_gone)
            socket.on('close', on_socket_gone)
            socket.on('error', on_socket_gone)

            # Send a hello message to the local hub, so it knows this is the control connection,
            # and not something else (e.g., a console).
            socket.write_mesg('json', {event:'hello'})

            # Record the socket *before* answering waiters.  A waiter that calls
            # local_hub_socket again then reuses it instead of opening another connection.
            @_socket = socket
            @init_heartbeat()  # start sending heartbeat over this socket
            for c in waiters
                c?(undefined, socket)

            # Finally, we wait a bit to see if the version gets sent from
            # the client. If not, we set it to 0, which will cause a restart,
            # which will upgrade to a new version that sends versions.
            # TODO: This code can be deleted after all projects get restarted.
            check_version_received = () =>
                if @_socket? and not @smc_version?
                    @smc_version = 0
                    @restart_if_version_too_old()
            setTimeout(check_version_received, 60*1000)

    # Get a new connection to the local_hub,
    # authenticated via the secret_token, and enhanced
    # to be able to send/receive json and blob messages.
    # Looks up @address via projectControl if unknown.  If connecting with the
    # known address fails, it refreshes @address and tries once more.
    new_socket: (cb) =>  # cb(err, socket)
        @dbg("new_socket")
        # Try to connect using the current @address.
        f = (cb) =>
            if not @address?
                cb("no address")
                return
            if not @address.port?
                cb("no port")
                return
            if not @address.host?
                cb("no host")
                return
            if not @address.secret_token?
                cb("no secret_token")
                return
            connect_to_a_local_hub
                port         : @address.port
                host         : @address.ip ? @address.host  # prefer @address.ip if it exists (e.g., for cocalc-kubernetes); otherwise use host (which is where compute server is).
                secret_token : @address.secret_token
                cb           : cb
        socket = undefined
        async.series([
            (cb) =>
                if not @address?
                    @dbg("get address of a working local hub")
                    try
                        @address = await (await @projectControl(@project_id)).address()
                        cb()
                    catch err
                        cb(err)
                else
                    cb()
            (cb) =>
                @dbg("try to connect to local hub socket using last known address")
                f (err, _socket) =>
                    if not err
                        socket = _socket
                        cb()
                    else
                        @dbg("failed to get address of a working local hub -- #{err}")
                        try
                            @address = await (await @projectControl(@project_id)).address()
                            cb()
                        catch err
                            cb(err)
            (cb) =>
                if not socket?
                    @dbg("still don't have our connection -- try again")
                    f (err, _socket) =>
                        socket = _socket; cb(err)
                else
                    cb()
        ], (err) =>
            cb(err, socket)
        )

    # Stop delivering messages with this id (see call with multi_response:true).
    remove_multi_response_listener: (id) =>
        delete @call_callbacks[id]

    # Send opts.mesg to the project over the control socket.
    #
    # opts:
    #   mesg           -- the message; an id (uuid.v4) is added if missing and a
    #                     response is expected (timeout or multi_response).
    #   timeout        -- a nonzero timeout MUST be given, or we do not listen for
    #                     a response at all.  NOTE(review): the value is never
    #                     used to expire the listener; if no response ever arrives,
    #                     the listener stays in @call_callbacks and cb is never called.
    #   multi_response -- if true, timeout ignored; every message with this id is
    #                     passed to cb(mesg).  Call @remove_multi_response_listener(mesg.id)
    #                     to remove.
    #   cb             -- cb(err) if the socket can't be obtained or the write fails.
    #                     For a single response: cb(resp.error) if resp.event is 'error',
    #                     else cb(undefined, resp).  With neither timeout nor
    #                     multi_response, cb is only called on error.
    call: (opts) =>
        opts = defaults opts,
            mesg           : required
            timeout        : undefined  # see the NOTE above -- only its truthiness matters here
            multi_response : false      # if true, timeout ignored; call @remove_multi_response_listener(mesg.id) to remove
            cb             : undefined
        @dbg("call")
        if not opts.mesg.id?
            if opts.timeout or opts.multi_response  # opts.timeout being undefined or 0 both mean "don't do it"
                opts.mesg.id = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                @dbg("call: failed to get socket -- #{err}")
                opts.cb?(err)
                return
            @dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
            socket.write_mesg 'json', opts.mesg, (err) =>
                if err
                    @free_resources()  # at least next time it will get a new socket
                    opts.cb?(err)
                    return
                if opts.multi_response
                    @call_callbacks[opts.mesg.id] = opts.cb
                else if opts.timeout
                    # Listen to exactly one response, then remove the listener.
                    # cb is optional, so guard the call -- this runs inside the
                    # socket's 'mesg' handler, where a TypeError would be uncaught.
                    @call_callbacks[opts.mesg.id] = (resp) =>
                        delete @call_callbacks[opts.mesg.id]
                        if resp.event == 'error'
                            opts.cb?(resp.error)
                        else
                            opts.cb?(undefined, resp)
                # As mentioned above -- there's no else -- if not timeout then
                # we do not listen for a response.

    # Read a file from a project into memory on the hub.
    # I think this is used only by the API, but not by browser clients anymore.
    # cb(err, data): data is the received blob.  data.archive is set to the
    # archive extension the project reported (used when path is a directory).
    read_file: (opts) =>  # cb(err, content_of_file)
        {path, project_id, archive, cb} = defaults opts,
            path       : required
            project_id : required
            archive    : 'tar.bz2'  # for directories; if directory, then the output object "data" has data.archive=actual extension used.
            cb         : required
        @dbg("read_file '#{path}'")
        socket = undefined
        id = uuid.v4()
        data = undefined
        data_uuid = undefined
        result_archive = undefined

        async.series([
            # Get a socket connection to the local_hub.
            (cb) =>
                @local_hub_socket (err, _socket) =>
                    if err
                        cb(err)
                    else
                        socket = _socket
                        cb()
            # Ask for the file; the reply names the uuid of the blob that carries the data.
            (cb) =>
                socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
                socket.recv_mesg
                    type    : 'json'
                    id      : id
                    timeout : 60
                    cb      : (mesg) =>
                        switch mesg.event
                            when 'error'
                                cb(mesg.error)
                            when 'file_read_from_project'
                                data_uuid = mesg.data_uuid
                                result_archive = mesg.archive
                                cb()
                            else
                                cb("Unknown mesg event '#{mesg.event}'")
            # Receive the blob itself.
            (cb) =>
                socket.recv_mesg
                    type    : 'blob'
                    id      : data_uuid
                    timeout : 60
                    cb      : (_data) =>
                        # recv_mesg returns either a Buffer blob
                        # *or* a {event:'error', error:'the error'} object.
                        # Fortunately `new Buffer().event` is valid (and undefined).
                        if _data.event == 'error'
                            cb(_data.error)
                        else
                            data = _data
                            data.archive = result_archive
                            cb()
        ], (err) =>
            if err
                cb(err)
            else
                cb(undefined, data)
        )

    # Write a file to a project: sends a write_file_to_project message followed by
    # a blob with the data, then waits (timeout 10) for the project's reply.
    # I think this is used only by the API, but not by browser clients anymore.
    write_file: (opts) =>  # cb(err)
        {path, project_id, cb, data} = defaults opts,
            path       : required
            project_id : required
            data       : required  # what to write
            cb         : required
        @dbg("write_file '#{path}'")
        id = uuid.v4()
        data_uuid = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                cb(err)
                return
            mesg = message.write_file_to_project
                id         : id
                project_id : project_id
                path       : path
                data_uuid  : data_uuid
            socket.write_mesg('json', mesg)
            socket.write_mesg('blob', {uuid:data_uuid, blob:data})
            socket.recv_mesg
                type    : 'json'
                id      : id
                timeout : 10
                cb      : (mesg) =>
                    switch mesg.event
                        when 'file_written_to_project'
                            cb()
                        when 'error'
                            cb(mesg.error)
                        else
                            cb("unexpected message type '#{mesg.event}'")
702
703