Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/frontend/conat/client.ts
1503 views
1
import { redux } from "@cocalc/frontend/app-framework";
2
import type { WebappClient } from "@cocalc/frontend/client/client";
3
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
4
import {
5
type ConatSyncTable,
6
ConatSyncTableFunction,
7
} from "@cocalc/conat/sync/synctable";
8
import { randomId, inboxPrefix } from "@cocalc/conat/names";
9
import { projectSubject } from "@cocalc/conat/names";
10
import { parseQueryWithOptions } from "@cocalc/sync/table/util";
11
import { type HubApi, initHubApi } from "@cocalc/conat/hub/api";
12
import { type ProjectApi, initProjectApi } from "@cocalc/conat/project/api";
13
import { isValidUUID } from "@cocalc/util/misc";
14
import { createOpenFiles, OpenFiles } from "@cocalc/conat/sync/open-files";
15
import { PubSub } from "@cocalc/conat/sync/pubsub";
16
import type { ChatOptions } from "@cocalc/util/types/llm";
17
import { dkv } from "@cocalc/conat/sync/dkv";
18
import { akv } from "@cocalc/conat/sync/akv";
19
import { astream } from "@cocalc/conat/sync/astream";
20
import { dko } from "@cocalc/conat/sync/dko";
21
import { dstream } from "@cocalc/conat/sync/dstream";
22
import { callConatService, createConatService } from "@cocalc/conat/service";
23
import type {
24
CallConatServiceFunction,
25
CreateConatServiceFunction,
26
} from "@cocalc/conat/service";
27
import { listingsClient } from "@cocalc/conat/service/listings";
28
import getTime, { getSkew, init as initTime } from "@cocalc/conat/time";
29
import { llm } from "@cocalc/conat/llm/client";
30
import { inventory } from "@cocalc/conat/sync/inventory";
31
import { EventEmitter } from "events";
32
import {
33
getClient as getClientWithState,
34
setConatClient,
35
type ClientWithState,
36
} from "@cocalc/conat/client";
37
import Cookies from "js-cookie";
38
import { ACCOUNT_ID_COOKIE } from "@cocalc/frontend/client/client";
39
import { info as refCacheInfo } from "@cocalc/util/refcache";
40
import { connect as connectToConat } from "@cocalc/conat/core/client";
41
import type { ConnectionStats } from "@cocalc/conat/core/types";
42
import { appBasePath } from "@cocalc/frontend/customize/app-base-path";
43
import { until } from "@cocalc/util/async-utils";
44
import { delay } from "awaiting";
45
import {
46
deleteRememberMe,
47
setRememberMe,
48
} from "@cocalc/frontend/misc/remember-me";
49
50
export interface ConatConnectionStatus {
51
state: "connected" | "disconnected";
52
reason: string;
53
details: any;
54
stats: ConnectionStats;
55
}
56
57
const DEFAULT_TIMEOUT = 15000;
58
59
const DEBUG = false;
60
61
export class ConatClient extends EventEmitter {
62
client: WebappClient;
63
public hub: HubApi;
64
public sessionId = randomId();
65
private openFilesCache: { [project_id: string]: OpenFiles } = {};
66
private clientWithState: ClientWithState;
67
private _conatClient: null | ReturnType<typeof connectToConat>;
68
public numConnectionAttempts = 0;
69
private automaticallyReconnect;
70
71
constructor(client: WebappClient) {
72
super();
73
this.setMaxListeners(100);
74
this.client = client;
75
this.hub = initHubApi(this.callHub);
76
this.initConatClient();
77
this.on("state", (state) => {
78
this.emit(state);
79
});
80
}
81
82
private setConnectionStatus = (status: Partial<ConatConnectionStatus>) => {
83
const actions = redux?.getActions("page");
84
const store = redux?.getStore("page");
85
if (actions == null || store == null) {
86
return;
87
}
88
const cur = store.get("conat")?.toJS();
89
actions.setState({ conat: { ...cur, ...status } } as any);
90
};
91
92
conat = () => {
93
if (this._conatClient == null) {
94
this.startStatsReporter();
95
const address = location.origin + appBasePath;
96
this._conatClient = connectToConat({
97
address,
98
inboxPrefix: inboxPrefix({ account_id: this.client.account_id }),
99
// it is necessary to manually managed reconnects due to a bugs
100
// in socketio that has stumped their devs
101
// -- https://github.com/socketio/socket.io/issues/5197
102
reconnection: false,
103
});
104
this._conatClient.on("connected", () => {
105
this.setConnectionStatus({
106
state: "connected",
107
reason: "",
108
details: "",
109
stats: this._conatClient?.stats,
110
});
111
this.client.emit("connected");
112
this.automaticallyReconnect = true;
113
});
114
this._conatClient.on("disconnected", (reason, details) => {
115
this.setConnectionStatus({
116
state: "disconnected",
117
reason,
118
details,
119
stats: this._conatClient?.stats,
120
});
121
this.client.emit("disconnected", "offline");
122
if (this.automaticallyReconnect) {
123
setTimeout(this.connect, 1000);
124
}
125
});
126
this._conatClient.conn.io.on("reconnect_attempt", (attempt) => {
127
this.numConnectionAttempts = attempt;
128
this.client.emit("connecting");
129
});
130
}
131
return this._conatClient!;
132
};
133
134
private permanentlyDisconnected = false;
135
permanentlyDisconnect = () => {
136
this.permanentlyDisconnected = true;
137
this.standby();
138
};
139
140
is_signed_in = (): boolean => {
141
return !!this._conatClient?.info?.user?.account_id;
142
};
143
144
is_connected = (): boolean => {
145
return !!this._conatClient?.conn?.connected;
146
};
147
148
private startStatsReporter = async () => {
149
while (true) {
150
if (this._conatClient != null) {
151
this.setConnectionStatus({ stats: this._conatClient?.stats });
152
}
153
await delay(5000);
154
}
155
};
156
157
private initConatClient = async () => {
158
setConatClient({
159
account_id: this.client.account_id,
160
conat: this.conat,
161
reconnect: async () => this.reconnect(),
162
getLogger: DEBUG
163
? (name) => {
164
return {
165
info: (...args) => console.info(name, ...args),
166
debug: (...args) => console.log(name, ...args),
167
warn: (...args) => console.warn(name, ...args),
168
silly: (...args) => console.log(name, ...args),
169
};
170
}
171
: undefined,
172
});
173
this.clientWithState = getClientWithState();
174
this.clientWithState.on("state", (state) => {
175
if (state != "closed") {
176
this.emit(state);
177
}
178
});
179
initTime();
180
const client = this.conat();
181
client.on("info", (info) => {
182
if (client.info?.user?.account_id) {
183
console.log("Connected as ", JSON.stringify(client.info?.user));
184
this.signedIn({
185
account_id: info.user.account_id,
186
hub: info.id ?? "",
187
});
188
const cookie = Cookies.get(ACCOUNT_ID_COOKIE);
189
if (cookie && cookie != client.info.user.account_id) {
190
// make sure account_id cookie is set to the actual account we're
191
// signed in as, then refresh since some things are going to be
192
// broken otherwise. To test this use dev tools and just change the account_id
193
// cookies value to something random.
194
Cookies.set(ACCOUNT_ID_COOKIE, client.info.user.account_id);
195
// and we're out of here:
196
location.reload();
197
}
198
} else {
199
console.log("Sign in failed -- ", client.info);
200
this.signInFailed(client.info?.user?.error ?? "Failed to sign in.");
201
this.client.alert_message({
202
type: "error",
203
message: "You must sign in.",
204
block: true,
205
});
206
this.standby();
207
}
208
});
209
};
210
211
public signedInMessage?: { account_id: string; hub: string };
212
private signedIn = (mesg: { account_id: string; hub: string }) => {
213
this.signedInMessage = mesg;
214
this.client.account_id = mesg.account_id;
215
setRememberMe(appBasePath);
216
this.client.emit("signed_in", mesg);
217
};
218
219
private signInFailed = (error) => {
220
deleteRememberMe(appBasePath);
221
this.client.emit("remember_me_failed", { error });
222
};
223
224
reconnect = () => {
225
this._conatClient?.conn.io.engine.close();
226
this.resume();
227
};
228
229
// if there is a connection, put it in standby
230
standby = () => {
231
// @ts-ignore
232
this.automaticallyReconnect = false;
233
this._conatClient?.disconnect();
234
};
235
236
// if there is a connection, resume it
237
resume = () => {
238
this.connect();
239
};
240
241
// keep trying until connected.
242
connect = reuseInFlight(async () => {
243
let attempts = 0;
244
await until(
245
async () => {
246
if (this.permanentlyDisconnected) {
247
console.log(
248
"Not connecting -- client is permanently disconnected and must refresh their browser",
249
);
250
return true;
251
}
252
if (this._conatClient == null) {
253
this.conat();
254
}
255
if (this._conatClient?.conn?.connected) {
256
return true;
257
}
258
this._conatClient?.disconnect();
259
await delay(750);
260
await waitForOnline();
261
attempts += 1;
262
console.log(
263
`Connecting to ${this._conatClient?.options.address}: attempts ${attempts}`,
264
);
265
this._conatClient?.conn.io.connect();
266
return false;
267
},
268
{ min: 3000, max: 15000 },
269
);
270
});
271
272
callConatService: CallConatServiceFunction = async (options) => {
273
return await callConatService(options);
274
};
275
276
createConatService: CreateConatServiceFunction = (options) => {
277
return createConatService(options);
278
};
279
280
projectWebsocketApi = async ({
281
project_id,
282
compute_server_id,
283
mesg,
284
timeout = DEFAULT_TIMEOUT,
285
}) => {
286
const cn = this.conat();
287
const subject = projectSubject({
288
project_id,
289
compute_server_id,
290
service: "browser-api",
291
});
292
const resp = await cn.request(subject, mesg, {
293
timeout,
294
waitForInterest: true,
295
});
296
return resp.data;
297
};
298
299
private callHub = async ({
300
service = "api",
301
name,
302
args = [],
303
timeout = DEFAULT_TIMEOUT,
304
}: {
305
service?: string;
306
name: string;
307
args: any[];
308
timeout?: number;
309
}) => {
310
const cn = this.conat();
311
const subject = `hub.account.${this.client.account_id}.${service}`;
312
try {
313
const data = { name, args };
314
const resp = await cn.request(subject, data, { timeout });
315
return resp.data;
316
} catch (err) {
317
err.message = `${err.message} - callHub: subject='${subject}', name='${name}', `;
318
throw err;
319
}
320
};
321
322
// Returns api for RPC calls to the project with typescript support!
323
// if compute_server_id is NOT given then:
324
// if path is given use compute server id for path (assuming mapping is loaded)
325
// if path is not given, use current project default
326
projectApi = ({
327
project_id,
328
compute_server_id,
329
path,
330
timeout = DEFAULT_TIMEOUT,
331
}: {
332
project_id: string;
333
path?: string;
334
compute_server_id?: number;
335
// IMPORTANT: this timeout is only AFTER user is connected.
336
timeout?: number;
337
}): ProjectApi => {
338
if (!isValidUUID(project_id)) {
339
throw Error(`project_id = '${project_id}' must be a valid uuid`);
340
}
341
if (compute_server_id == null) {
342
const actions = redux.getProjectActions(project_id);
343
if (path != null) {
344
compute_server_id =
345
actions.getComputeServerIdForFile({ path }) ??
346
actions.getComputeServerId();
347
} else {
348
compute_server_id = actions.getComputeServerId();
349
}
350
}
351
const callProjectApi = async ({ name, args }) => {
352
return await this.callProject({
353
project_id,
354
compute_server_id,
355
timeout,
356
service: "api",
357
name,
358
args,
359
});
360
};
361
return initProjectApi(callProjectApi);
362
};
363
364
private callProject = async ({
365
service = "api",
366
project_id,
367
compute_server_id,
368
name,
369
args = [],
370
timeout = DEFAULT_TIMEOUT,
371
}: {
372
service?: string;
373
project_id: string;
374
compute_server_id?: number;
375
name: string;
376
args: any[];
377
timeout?: number;
378
}) => {
379
const cn = this.conat();
380
const subject = projectSubject({ project_id, compute_server_id, service });
381
const resp = await cn.request(
382
subject,
383
{ name, args },
384
// we use waitForInterest because often the project hasn't
385
// quite fully started.
386
{ timeout, waitForInterest: true },
387
);
388
return resp.data;
389
};
390
391
synctable: ConatSyncTableFunction = async (
392
query0,
393
options?,
394
): Promise<ConatSyncTable> => {
395
const { query, table } = parseQueryWithOptions(query0, options);
396
if (options?.project_id != null && query[table][0]["project_id"] === null) {
397
query[table][0]["project_id"] = options.project_id;
398
}
399
return await this.conat().sync.synctable({
400
...options,
401
query,
402
account_id: this.client.account_id,
403
});
404
};
405
406
primus = ({
407
project_id,
408
compute_server_id = 0,
409
channel,
410
}: {
411
project_id: string;
412
compute_server_id?: number;
413
channel?: string;
414
}) => {
415
let subject = projectSubject({
416
project_id,
417
compute_server_id,
418
service: "primus",
419
});
420
if (channel) {
421
subject += "." + channel;
422
}
423
return this.conat().socket.connect(subject, {
424
desc: `primus-${channel ?? ""}`,
425
});
426
};
427
428
openFiles = reuseInFlight(async (project_id: string) => {
429
if (this.openFilesCache[project_id] == null) {
430
const openFiles = await createOpenFiles({
431
project_id,
432
});
433
this.openFilesCache[project_id] = openFiles;
434
openFiles.on("closed", () => {
435
delete this.openFilesCache[project_id];
436
});
437
openFiles.on("change", (entry) => {
438
if (entry.deleted?.deleted) {
439
setDeleted({
440
project_id,
441
path: entry.path,
442
deleted: entry.deleted.time,
443
});
444
} else {
445
setNotDeleted({ project_id, path: entry.path });
446
}
447
});
448
const recentlyDeletedPaths: any = {};
449
for (const { path, deleted } of openFiles.getAll()) {
450
if (deleted?.deleted) {
451
recentlyDeletedPaths[path] = deleted.time;
452
}
453
}
454
const store = redux.getProjectStore(project_id);
455
store.setState({ recentlyDeletedPaths });
456
}
457
return this.openFilesCache[project_id]!;
458
});
459
460
closeOpenFiles = (project_id) => {
461
this.openFilesCache[project_id]?.close();
462
};
463
464
pubsub = async ({
465
project_id,
466
path,
467
name,
468
}: {
469
project_id: string;
470
path?: string;
471
name: string;
472
}) => {
473
return new PubSub({ client: this.conat(), project_id, path, name });
474
};
475
476
// Evaluate an llm. This streams the result if stream is given an option,
477
// AND it also always returns the result.
478
llm = async (opts: ChatOptions): Promise<string> => {
479
return await llm({ account_id: this.client.account_id, ...opts });
480
};
481
482
dstream = dstream;
483
astream = astream;
484
dkv = dkv;
485
akv = akv;
486
dko = dko;
487
488
listings = async (opts: {
489
project_id: string;
490
compute_server_id?: number;
491
}) => {
492
return await listingsClient(opts);
493
};
494
495
getTime = (): number => {
496
return getTime();
497
};
498
499
getSkew = async (): Promise<number> => {
500
return await getSkew();
501
};
502
503
inventory = async (location: {
504
account_id?: string;
505
project_id?: string;
506
}) => {
507
const inv = await inventory(location);
508
// @ts-ignore
509
if (console.log_original != null) {
510
const ls_orig = inv.ls;
511
// @ts-ignore
512
inv.ls = (opts) => ls_orig({ ...opts, log: console.log_original });
513
}
514
return inv;
515
};
516
517
refCacheInfo = () => refCacheInfo();
518
}
519
520
function setDeleted({ project_id, path, deleted }) {
521
if (!redux.hasProjectStore(project_id)) {
522
return;
523
}
524
const actions = redux.getProjectActions(project_id);
525
actions.setRecentlyDeleted(path, deleted);
526
}
527
528
function setNotDeleted({ project_id, path }) {
529
if (!redux.hasProjectStore(project_id)) {
530
return;
531
}
532
const actions = redux.getProjectActions(project_id);
533
actions?.setRecentlyDeleted(path, 0);
534
}
535
536
async function waitForOnline(): Promise<void> {
537
if (navigator.onLine) return;
538
await new Promise<void>((resolve) => {
539
const handler = () => {
540
window.removeEventListener("online", handler);
541
resolve();
542
};
543
window.addEventListener("online", handler);
544
});
545
}
546
547