Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/client.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
client.ts -- A project viewed as a client for a hub.

For security reasons, a project does not initiate a TCP connection to a hub;
rather, hubs initiate TCP connections to projects:

 * MINUS: This makes various things more complicated, e.g., a project
   might not have any open connection to a hub, but still "want" to write
   something to the database; in such a case it is simply out of luck
   and must wait.

 * PLUS: Security is simpler since a hub initiates the connection to
   a project. A hub doesn't have to receive TCP connections and decide
   whether or not to trust what is on the other end of those connections.

That said, this architecture could change, and very little code would change
as a result.
*/
24
import EventEmitter from "node:events";
25
import fs from "node:fs";
26
import { join } from "node:path";
27
import { FileSystemClient } from "@cocalc/sync-client/lib/client-fs";
28
import { execute_code, uuidsha1 } from "@cocalc/backend/misc_node";
29
import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol";
30
import type { SyncDoc } from "@cocalc/sync/editor/generic/sync-doc";
31
import type { ProjectClient as ProjectClientInterface } from "@cocalc/sync/editor/generic/types";
32
import { SyncString } from "@cocalc/sync/editor/string/sync";
33
import * as synctable2 from "@cocalc/sync/table";
34
import { callback2 } from "@cocalc/util/async-utils";
35
import { PROJECT_HUB_HEARTBEAT_INTERVAL_S } from "@cocalc/util/heartbeat";
36
import * as message from "@cocalc/util/message";
37
import * as misc from "@cocalc/util/misc";
38
import type { CB } from "@cocalc/util/types/callback";
39
import type { ExecuteCodeOptionsWithCallback } from "@cocalc/util/types/execute-code";
40
import * as blobs from "./blobs";
41
import { json } from "./common";
42
import * as data from "./data";
43
import initJupyter from "./jupyter/init";
44
import * as kucalc from "./kucalc";
45
import { getLogger } from "./logger";
46
import * as sage_session from "./sage_session";
47
import synctable_conat from "@cocalc/project/conat/synctable";
48
import pubsub from "@cocalc/project/conat/pubsub";
49
import type { ConatSyncTableFunction } from "@cocalc/conat/sync/synctable";
50
import {
51
callConatService,
52
createConatService,
53
type CallConatServiceFunction,
54
type CreateConatServiceFunction,
55
} from "@cocalc/conat/service";
56
import { connectToConat } from "./conat/connection";
57
import { getSyncDoc } from "@cocalc/project/conat/open-files";
58
import { isDeleted } from "@cocalc/project/conat/listings";
59
60
// Logger shared by everything in this module.
const winston = getLogger("client");

// Home directory of the project, with a fallback when $HOME is unset.
const HOME: string = process.env.HOME ?? "/home/user";

// Verbose debug logging: always on inside KuCalc; otherwise it may be turned
// on later by initDEBUG().
let DEBUG: boolean = Boolean(kucalc.IN_KUCALC);
65
66
/**
 * Enable very verbose debug logging if the marker file `~/.smc-DEBUG` exists.
 *
 * Does nothing when DEBUG is already on. The existence check is asynchronous,
 * so DEBUG may become true some time after this function returns.
 */
export function initDEBUG() {
  if (!DEBUG) {
    // Easy way to enable debugging in any project anywhere.
    const markerFile = join(HOME, ".smc-DEBUG");
    fs.access(markerFile, (err) => {
      if (err != null) {
        // no file
        winston.info(
          "create this file to enable very verbose debugging:",
          markerFile,
        );
        return;
      }
      DEBUG = true;
      winston.info(`DEBUG = ${DEBUG}`);
    });
  }
}
86
87
// The single Client instance for this process; created lazily.
let client: Client | null = null;

/**
 * Create the process-wide Client on first call and return it; later calls
 * return the same instance.
 */
export function init(): Client {
  if (client == null) {
    client = new Client();
  }
  return client;
}

/**
 * Return the process-wide Client, creating it if necessary.
 * Throws if no client is available afterwards (should never happen).
 */
export function getClient(): Client {
  const current = client ?? init();
  if (current == null) {
    throw Error("BUG: Client not initialized!");
  }
  return current;
}
106
107
// Handler for a pending hub request; called with the hub's response message,
// which may be an error message of the form {event: "error", error}.
type HubCB = CB<any, { event: "error"; error?: string }>;

// Set by the Client constructor so a second instance can never be created.
let ALREADY_CREATED: boolean = false;
110
111
/**
 * A project viewed as a client of the hub(s).
 *
 * Hubs open connections *to* the project; each connection is registered via
 * active_socket(). Requests to a hub are sent with call() (over a given socket,
 * or one chosen at random), and responses are routed back to the waiting
 * callback by handle_mesg(), keyed by message id.
 *
 * Only one instance may be constructed per process (see init/getClient).
 */
export class Client extends EventEmitter implements ProjectClientInterface {
  public readonly project_id: string;
  // true once a hub socket has been registered; false after the last one ends
  private _connected: boolean;

  // response handlers for requests sent to a hub, keyed by message id
  private _hub_callbacks: {
    [key: string]: HubCB;
  };
  // hub sockets currently connected to us, keyed by socket id, together with
  // the response handlers of requests that were sent over each socket
  private _hub_client_sockets: {
    [id: string]: {
      socket: CoCalcSocket;
      callbacks?: { [id: string]: HubCB | CB<any, string> };
      activity: Date;
    };
  };
  // socket used for each changefeed query, keyed by the query message id
  private _changefeed_sockets: any;
  private _open_syncstrings?: { [key: string]: SyncString };

  // Returns a logging function scoped to `f`; it is a no-op unless DEBUG is on.
  dbg = (f: string) => {
    if (DEBUG && winston) {
      return (...m) => {
        return winston.debug(`Client.${f}`, ...m);
      };
    } else {
      return function (..._) {};
    }
  };

  private filesystemClient = new FileSystemClient();
  write_file = this.filesystemClient.write_file;
  path_read = this.filesystemClient.path_read;
  path_stat = this.filesystemClient.path_stat;
  file_size_async = this.filesystemClient.file_size_async;
  file_stat_async = this.filesystemClient.file_stat_async;
  watch_file = this.filesystemClient.watch_file;

  constructor() {
    super();
    if (ALREADY_CREATED) {
      throw Error("BUG: Client already created!");
    }
    ALREADY_CREATED = true;
    if (process.env.HOME != null) {
      // client assumes curdir is HOME
      process.chdir(process.env.HOME);
    }
    this.project_id = data.project_id;
    this.dbg("constructor")();
    this.setMaxListeners(300); // every open file/table/sync db listens for connect event, which adds up.
    // initialize caches
    this._hub_callbacks = {};
    this._hub_client_sockets = {};
    this._changefeed_sockets = {};
    this._connected = false;

    if (kucalc.IN_KUCALC) {
      kucalc.init(this);
    }

    misc.bind_methods(this);

    initJupyter();
  }

  public alert_message({
    type = "default",
    title,
    message,
  }: {
    type?: "default";
    title?: string;
    message: string;
    block?: boolean;
    timeout?: number; // time in seconds
  }): void {
    this.dbg("alert_message")(type, title, message);
  }

  // Close all open syncstrings (todo: more could be closed...).
  public close(): void {
    if (this._open_syncstrings != null) {
      // Iterate the values: the old `for…in` over an array of keys produced
      // array indices ("0", "1", …) and then closed undefined entries.
      for (const s of Object.values(this._open_syncstrings)) {
        s.close();
      }
      delete this._open_syncstrings;
    }
  }

  // account_id or project_id of this client
  public client_id(): string {
    return this.project_id;
  }

  public get_project_id(): string {
    return this.project_id;
  }

  // true since this client is a project
  public is_project(): boolean {
    return true;
  }

  public is_browser(): boolean {
    return false;
  }

  public is_compute_server(): boolean {
    return false;
  }

  // false since this client is not a user
  public is_user(): boolean {
    return false;
  }

  public is_signed_in(): boolean {
    return true;
  }

  public is_connected(): boolean {
    return this._connected;
  }

  // We trust the time on our own compute servers (unlike random user's browser).
  public server_time(): Date {
    return new Date();
  }

  // Declare that the given socket is active right now and can be used for
  // communication with some hub (the one the socket is connected to).
  // The first time a socket is seen, end/error/heartbeat handling is installed.
  public active_socket(socket: CoCalcSocket): void {
    const dbg = this.dbg(
      `active_socket(id=${socket.id},ip='${socket.remoteAddress}')`,
    );
    const existing = this._hub_client_sockets[socket.id];
    if (existing != null) {
      existing.activity = new Date();
      return;
    }
    dbg();
    this._hub_client_sockets[socket.id] = {
      socket,
      callbacks: {},
      activity: new Date(),
    };
    const x = this._hub_client_sockets[socket.id];

    let heartbeat_interval: ReturnType<typeof setInterval> | undefined =
      undefined;

    const socket_end = (): void => {
      if (heartbeat_interval == null) {
        // already destroyed it
        return;
      }
      dbg("ending socket");
      clearInterval(heartbeat_interval);
      heartbeat_interval = undefined;
      // Forget the socket first, so a callback that retries its request
      // cannot be handed this dying socket by get_hub_socket().
      delete this._hub_client_sockets[socket.id];
      const callbacks = x.callbacks;
      delete x.callbacks; // so additional trigger of end doesn't do anything
      if (callbacks != null) {
        for (const id in callbacks) {
          // This request can no longer be answered; drop its handler.
          delete this._hub_callbacks[id];
          // The handlers installed by call() expect a hub *message*: a bare
          // string would be reported to the caller as a successful response,
          // so deliver an error message instead.
          const cb = callbacks[id] as HubCB;
          try {
            cb?.({ event: "error", error: "socket closed" });
          } catch (err) {
            dbg(`WARNING: error in callback for closed socket -- ${err}`);
          }
        }
      }
      dbg(
        `number of active sockets now equals ${misc.len(
          this._hub_client_sockets,
        )}`,
      );
      if (misc.len(this._hub_client_sockets) === 0) {
        this._connected = false;
        dbg("lost all active sockets");
        this.emit("disconnected");
      }
      socket.end();
      socket.destroy();
    };

    socket.on("end", socket_end);
    socket.on("error", socket_end);

    // End the socket if no heartbeat was received within 1.5 heartbeat periods.
    const check_heartbeat = (): void => {
      if (
        socket.heartbeat == null ||
        Date.now() - socket.heartbeat.getTime() >=
          1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000
      ) {
        dbg("heartbeat failed");
        socket_end();
      } else {
        dbg("heartbeat -- socket is working");
      }
    };

    heartbeat_interval = setInterval(
      check_heartbeat,
      1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000,
    );

    // This socket was just registered, so at least one socket is active.
    dbg("CONNECTED!");
    this._connected = true;
    this.emit("connected");
  }

  // Handle a mesg coming back from some hub. If we have a callback we call it
  // for the given message, then return true. Otherwise, return
  // false, meaning something else should try to handle this message.
  public handle_mesg(mesg, socket) {
    const dbg = this.dbg(`handle_mesg(${misc.trunc_middle(json(mesg), 512)})`);
    const f = this._hub_callbacks[mesg.id];
    if (f == null) {
      dbg("no callback");
      return false;
    }
    dbg("calling callback");
    if (!mesg.multi_response) {
      delete this._hub_callbacks[mesg.id];
      // The socket may already have ended and been removed.
      delete this._hub_client_sockets[socket.id]?.callbacks?.[mesg.id];
    }
    try {
      f(mesg);
    } catch (err) {
      dbg(`WARNING: error handling message from client. -- ${err}`);
    }
    return true;
  }

  // Get a socket connection to the hub from one in our cache; choose one at random.
  // There is obviously no guarantee to get the same hub if you call this twice!
  // Returns undefined if there are currently no connections from any hub to us
  // (in which case, the project must wait).
  public get_hub_socket() {
    const socket_ids = misc.keys(this._hub_client_sockets);
    this.dbg("get_hub_socket")(
      `there are ${socket_ids.length} sockets -- ${JSON.stringify(socket_ids)}`,
    );
    if (socket_ids.length === 0) {
      return;
    }
    return this._hub_client_sockets[misc.random_choice(socket_ids)].socket;
  }

  // Send a message to some hub server and await a response (if cb defined).
  public call(opts: {
    message: any;
    timeout?: number; // timeout in seconds; if specified call will error out after this much time
    socket?: CoCalcSocket; // if specified, use this socket
    cb?: CB<any, string>; // awaits response if given
  }) {
    const dbg = this.dbg(`call(message=${json(opts.message)})`);
    dbg();
    // set socket to best one if no socket specified
    const socket = opts.socket ?? (opts.socket = this.get_hub_socket());
    if (socket == null) {
      dbg("no sockets");
      // currently, due to the security model, there's no way out of this; that will change...
      opts.cb?.("no hubs currently connected to this project");
      return;
    }
    if (opts.cb != null) {
      if (opts.message.id == null) {
        opts.message.id = misc.uuid();
      }
      const id = opts.message.id;
      let timer: ReturnType<typeof setTimeout> | undefined;
      if (opts.timeout) {
        dbg("configure timeout");
        timer = setTimeout(() => {
          dbg("failed");
          timer = undefined;
          // forget the request everywhere it was registered
          delete this._hub_callbacks[id];
          delete this._hub_client_sockets[socket.id]?.callbacks?.[id];
          opts.cb?.(`timeout after ${opts.timeout}s`);
          delete opts.cb; // a late response must not call cb again
        }, opts.timeout * 1000);
      }
      const cb = (this._hub_callbacks[id] = (resp) => {
        if (timer != null) {
          clearTimeout(timer);
          timer = undefined;
        }
        if (resp?.event === "error") {
          opts.cb?.(resp.error ? resp.error : "error");
        } else {
          opts.cb?.(undefined, resp);
        }
      });
      // The socket may have been passed in by the caller and already ended.
      const callbacks = this._hub_client_sockets[socket.id]?.callbacks;
      if (callbacks != null) {
        callbacks[id] = cb;
      }
    }
    // Finally, send the message
    return socket.write_mesg("json", opts.message);
  }

  // Do a project_query
  public query({
    query,
    options,
    changes,
    //standby = false, // **IGNORED**
    timeout = 30,
    cb,
  }: {
    query: any; // a query (see schema.js)
    options?: { [key: string]: any }[]; // options to the query, e.g., [{limit:5}] )
    changes?: boolean; // whether or not to create a changefeed
    //standby: boolean; // **IGNORED**
    timeout?: number; // how long in *seconds* wait for initial result from hub database call (default 30)
    cb: CB<any, string>;
  }) {
    if (options != null && !misc.is_array(options)) {
      throw Error("options must be an array");
    }
    const mesg = message.query({
      id: misc.uuid(),
      query,
      options,
      changes,
      multi_response: changes,
    });
    const socket = this.get_hub_socket();
    if (socket == null) {
      // It will try later when one is available...
      cb("no hub socket available");
      return;
    }
    if (changes) {
      // Record socket for this changefeed in _changefeed_sockets
      this._changefeed_sockets[mesg.id] = socket;
      // CRITICAL: On error or end, send an end error to the synctable, so that it will
      // attempt to reconnect (and also stop writing to the socket).
      // This is important, since for project clients
      // the disconnected event is only emitted when *all* connections from
      // hubs to the local_hub end. If two connections s1 and s2 are open,
      // and s1 is used for a sync table, and s1 closes (e.g., hub1 is restarted),
      // then s2 is still open and no 'disconnected' event is emitted. Nonetheless,
      // it's important for the project to consider the synctable broken and
      // try to reconnect it, which in this case it would do using s2.
      socket.on("error", () => {
        cb("socket-end");
      });
      socket.on("end", () => {
        cb("socket-end");
      });
    }
    return this.call({
      message: mesg,
      timeout,
      socket,
      cb,
    });
  }

  // Cancel an outstanding changefeed query.
  private _query_cancel(opts: { id: string; cb?: CB }) {
    const socket = this._changefeed_sockets[opts.id];
    if (socket == null) {
      // nothing to do
      return opts.cb?.();
    }
    // The changefeed is going away, so stop tracking its socket.
    delete this._changefeed_sockets[opts.id];
    return this.call({
      message: message.query_cancel({ id: opts.id }),
      timeout: 30,
      socket,
      cb: opts.cb,
    });
  }

  // ASYNC version
  public async query_cancel(id) {
    return await callback2(this._query_cancel, { id });
  }

  public sync_table(query, options?: any, throttle_changes = undefined) {
    return synctable2.synctable(query, options, this, throttle_changes);
  }

  conat = () => connectToConat();

  synctable_conat: ConatSyncTableFunction = async (query, options?) => {
    return await synctable_conat(query, options);
  };

  pubsub_conat = async ({ path, name }: { path?: string; name: string }) => {
    return await pubsub({ path, name });
  };

  callConatService: CallConatServiceFunction = async (options) => {
    return await callConatService(options);
  };

  createConatService: CreateConatServiceFunction = (options) => {
    return createConatService({
      ...options,
      project_id: this.project_id,
    });
  };

  // WARNING: making two of the exact same sync_string or sync_db will definitely
  // lead to corruption!

  // Get the synchronized doc with the given path. Returns undefined
  // if currently no such sync-doc.
  syncdoc = ({ path }: { path: string }): SyncDoc | undefined => {
    return getSyncDoc(path);
  };

  // mode: sub-sequence of 'rwxf' (case-insensitive; other characters are ignored).
  // cb(err): err is set if any requested access fails, and is empty if all access is OK.
  // See Node's fs.access and "File access constants".
  public path_access(opts: { path: string; mode: string; cb: CB }): void {
    // fs.F_OK/R_OK/W_OK/X_OK are deprecated aliases; use fs.constants.
    const flags: { [c: string]: number } = {
      f: fs.constants.F_OK,
      r: fs.constants.R_OK,
      w: fs.constants.W_OK,
      x: fs.constants.X_OK,
    };
    let access = 0;
    for (const c of opts.mode) {
      access |= flags[c.toLowerCase()] ?? 0;
    }
    return fs.access(opts.path, access, opts.cb);
  }

  // Calls cb(undefined, exists), where `exists` is true if the path exists.
  // The error argument is always empty, to keep the API consistent.
  public path_exists(opts: { path: string; cb: CB }) {
    const dbg = this.dbg(`checking if path (='${opts.path}') exists`);
    dbg();
    // fs.exists is deprecated; fs.access with F_OK is the recommended check.
    return fs.access(opts.path, fs.constants.F_OK, (err) => {
      const exists = err == null;
      dbg(`returned ${exists}`);
      opts.cb(undefined, exists);
    });
  }

  // Size of file in bytes (divide by 1000 for K, by 10^6 for MB.)
  public file_size(opts: { filename: string; cb: CB }): void {
    this.path_stat({
      path: opts.filename,
      cb: (err, stat) => {
        opts.cb(err, stat?.size);
      },
    });
  }

  // execute a command using the shell or a subprocess -- see docs for execute_code in misc_node.
  public shell(opts: ExecuteCodeOptionsWithCallback): void {
    execute_code(opts);
  }

  // return new sage session -- the code that actually calls this is in the @cocalc/sync package
  // in "packages/sync/editor/generic/evaluator.ts"
  public sage_session({
    path,
  }: {
    path: string; // the path to the *worksheet* file
  }): sage_session.SageSessionType {
    return sage_session.sage_session({ path, client: this });
  }

  // Save a blob to the central db blobstore.
  // The sha1 is optional.
  public save_blob({
    blob,
    sha1,
    uuid: optsUUID,
    cb,
  }: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given then uuid must be derived from sha1 hash
    cb?: (err: string | undefined, resp?: any) => void;
  }) {
    const uuid = optsUUID ?? uuidsha1(blob, sha1);
    const dbg = this.dbg(`save_blob(uuid='${uuid}')`);
    const hub = this.get_hub_socket();
    if (hub == null) {
      dbg("fail -- no global hubs");
      cb?.(
        "no global hubs are connected to the local hub, so nowhere to send file",
      );
      return;
    }
    dbg("sending blob mesg");
    hub.write_mesg("blob", { uuid, blob });
    dbg("waiting for response");
    blobs.receive_save_blob_message({
      sha1: uuid,
      cb: (resp): void => {
        if (resp?.error) {
          dbg(`fail -- '${resp.error}'`);
          cb?.(resp.error, resp);
        } else {
          dbg("success");
          cb?.(undefined, resp);
        }
      },
    });
  }

  public get_blob(opts: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given is uuid derived from sha1
    cb?: (err: string) => void; // (err, resp)
  }) {
    const dbg = this.dbg("get_blob");
    dbg(opts.sha1);
    opts.cb?.("get_blob: not implemented");
  }

  // no-op; assumed async api
  touch_project(_project_id: string, _compute_server_id?: number) {}

  // Return true if the file was explicitly deleted.
  // Returns unknown if don't know
  // Returns false if definitely not.
  public is_deleted(
    filename: string,
    _project_id: string,
  ): boolean | undefined {
    return isDeleted(filename);
  }

  public async set_deleted(
    _filename: string,
    _project_id?: string,
  ): Promise<void> {
    // DEPRECATED -- only logs. (Previously the logger was built but never called.)
    this.dbg("set_deleted: DEPRECATED")();
  }
}
651
652