Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/client.ts
1452 views
1
import {
2
type Message as ConatMessage,
3
type Client,
4
type MessageData,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { type ConatSocketClient } from "@cocalc/conat/socket";
8
import { EventIterator } from "@cocalc/util/event-iterator";
9
import type {
10
StorageOptions,
11
Configuration,
12
SetOperation,
13
DeleteOperation,
14
StoredMessage,
15
PartialInventory,
16
} from "./storage";
17
export { StoredMessage, StorageOptions };
18
import { persistSubject, type User } from "./util";
19
import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";
20
import { refCacheSync } from "@cocalc/util/refcache";
21
import { EventEmitter } from "events";
22
import { getLogger } from "@cocalc/conat/client";
23
import { delay } from "awaiting";
24
25
// Module-wide logger; every message from this file is tagged "persist:client".
const logger = getLogger("persist:client");

/** One changefeed notification: a batch of set and/or delete operations. */
export type ChangefeedEvent = (SetOperation | DeleteOperation)[];

/** Async iterator that yields ChangefeedEvent batches as they are emitted. */
export type Changefeed = EventIterator<ChangefeedEvent>;

// const paths = new Set<string>();

// Only the *type* of the client class is exported; instances are obtained
// through the `stream` factory defined later in this file.
export { type PersistStreamClient };
34
/**
 * Client for one persistent stream, talking to the persist server over a
 * conat socket created from `client.socket.connect(persistSubject(user))`.
 *
 * Events emitted by this object:
 *   - "changefeed": a ChangefeedEvent (batch of set/delete operations)
 *   - "error": a ConatError, when the server sends a message that has headers
 *     but no data
 *   - "closed": once, when close() is called
 *
 * The socket is created with `reconnection: false`; reconnecting is handled
 * here by re-running init() one second after the socket disconnects or closes.
 */
class PersistStreamClient extends EventEmitter {
  public socket: ConatSocketClient;
  // every iterator handed out by changefeed(); closed in close()
  private changefeeds: any[] = [];
  private state: "ready" | "closed" = "ready";
  // largest seq of a non-delete update that has been emitted so far
  private lastSeq?: number;
  // true once the socket has dropped at least once, so that init() knows it
  // must fetch whatever was missed
  private reconnecting = false;
  // while true, live updates are buffered instead of emitted
  private gettingMissed = false;
  private changesWhenGettingMissed: ChangefeedEvent[] = [];

  constructor(
    private client: Client,
    private storage: StorageOptions,
    private user: User,
  ) {
    super();
    // paths.add(this.storage.path);
    logger.debug("constructor", this.storage);
    this.init();
  }

  /**
   * (Re)create the socket, announce which storage we want, and wire up the
   * socket event handlers.  Called from the constructor and again (via a
   * 1s timer) whenever the current socket disconnects or closes.
   */
  private init = () => {
    if (this.client.state == "closed") {
      this.close();
      return;
    }
    if (this.state == "closed") {
      return;
    }
    this.socket?.close();
    // console.log("making a socket connection to ", persistSubject(this.user));
    const socket = this.client.socket.connect(persistSubject(this.user), {
      desc: `persist: ${this.storage.path}`,
      reconnection: false,
    });
    this.socket = socket;
    logger.debug(
      "init",
      this.storage.path,
      "connecting to ",
      persistSubject(this.user),
    );
    // console.log(
    //   "persist -- create",
    //   this.storage.path,
    //   paths,
    //   "with id=",
    //   this.socket.id,
    // );
    socket.write({
      storage: this.storage,
      changefeed: this.changefeeds.length > 0,
    });

    // get any messages from the stream that we missed while offline.
    if (this.reconnecting) {
      // getMissed handles its own errors, so this promise never rejects.
      void this.getMissed();
    }

    // Whichever of "disconnected"/"closed" fires first tears down this
    // socket's listeners (so the other one cannot also fire) and schedules
    // a fresh init().  Nothing is scheduled once we have been closed.
    const reconnect = () => {
      socket.removeAllListeners();
      if (this.state == "closed") {
        return;
      }
      this.reconnecting = true;
      setTimeout(this.init, 1000);
    };
    socket.once("disconnected", reconnect);
    socket.once("closed", reconnect);

    socket.on("data", (updates, headers) => {
      if (updates == null) {
        if (headers != null) {
          // has to be an error
          this.emit(
            "error",
            new ConatError(headers?.error, { code: headers?.code }),
          );
          this.close();
        }
        // There is nothing to deliver in either case; in particular we must
        // not pass null on to changefeedEmit, which would throw.
        return;
      }
      if (this.gettingMissed) {
        this.changesWhenGettingMissed.push(updates);
      } else {
        this.changefeedEmit(updates);
      }
    });
  };

  /**
   * After a reconnect, fetch everything from lastSeq onward via "getAll"
   * and emit it.  Live updates that arrive meanwhile are buffered in
   * changesWhenGettingMissed and flushed at the end; changefeedEmit drops
   * anything whose seq was already delivered.
   *
   * This is best effort: on a server error, timeout or any thrown exception
   * it logs (for exceptions) and gives up.  It never rejects, because the
   * caller does not await it.
   */
  private getMissed = async () => {
    try {
      this.gettingMissed = true;
      this.changesWhenGettingMissed.length = 0;
      while (this.state == "ready") {
        try {
          await this.socket.waitUntilReady(90000);
          break;
        } catch {
          // timeout
          await delay(1000);
        }
      }
      // console.log("getMissed", {
      //   path: this.storage.path,
      //   lastSeq: this.lastSeq,
      //   changefeeds: this.changefeeds.length,
      // });
      if (this.state != "ready" || this.changefeeds.length == 0) {
        // closed while waiting, or nobody is listening for changes
        return;
      }
      // we are resuming after a disconnect when we had some data up to lastSeq.
      // let's grab anything we missed.
      const sub = await this.socket.requestMany(null, {
        headers: {
          cmd: "getAll",
          start_seq: this.lastSeq,
          timeout: 15000,
        } as any,
        timeout: 15000,
        maxWait: 15000,
      });
      for await (const { data: updates, headers } of sub) {
        if (headers?.error) {
          // give up
          return;
        }
        if (updates == null || this.socket.state == "closed") {
          // done
          return;
        }
        this.changefeedEmit(updates);
      }
    } catch (err) {
      // e.g., requestMany timed out or the socket went away mid-iteration.
      logger.debug("getMissed: failed", this.storage.path, err);
    } finally {
      this.gettingMissed = false;
      // take the buffer first so it is empty even if a listener throws
      const buffered = this.changesWhenGettingMissed.splice(0);
      for (const updates of buffered) {
        this.changefeedEmit(updates);
      }
    }
  };

  /**
   * Emit "changefeed" with the subset of `updates` that has not been
   * delivered yet.  Delete operations always pass; any other operation
   * passes only if its seq is larger than lastSeq, which is then advanced.
   * Nothing is emitted if the filtered batch is empty.
   */
  private changefeedEmit = (updates: ChangefeedEvent) => {
    const fresh = updates.filter((update) => {
      if (update.op == "delete") {
        return true;
      }
      if (update.seq > (this.lastSeq ?? 0)) {
        this.lastSeq = update.seq;
        return true;
      }
      return false;
    });
    if (fresh.length == 0) {
      return;
    }
    this.emit("changefeed", fresh);
  };

  /**
   * Permanently shut down: mark as closed, emit "closed", close every
   * changefeed iterator and close the socket.  Calling it again is a no-op.
   */
  close = () => {
    if (this.state == "closed") {
      return;
    }
    logger.debug("close", this.storage);
    // paths.delete(this.storage.path);
    // console.log("persist -- close", this.storage.path, paths);
    this.state = "closed";
    this.emit("closed");
    // Empty the list *before* iterating so that every iterator gets closed.
    for (const iter of this.changefeeds.splice(0)) {
      iter.close();
    }
    // The socket does not exist yet if we are closed from the first init().
    this.socket?.close();
  };

  // The changefeed is *guaranteed* to deliver every message
  // in the stream **exactly once and in order**, even if there
  // are disconnects, failovers, etc.  Dealing with dropped messages,
  // duplicates, etc., is NOT the responsibility of clients.
  //
  // Throws ConatError if the server answers the "changefeed" command
  // with an error header.
  changefeed = async (): Promise<Changefeed> => {
    // activate changefeed mode (so server publishes updates -- this is idempotent)
    const resp = await this.socket.request(null, {
      headers: {
        cmd: "changefeed",
      },
    });
    if (resp.headers?.error) {
      throw new ConatError(`${resp.headers?.error}`, {
        code: resp.headers?.code,
      });
    }
    // an iterator over any updates that are published.
    const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {
      map: (args) => args[0],
    });
    // remembered so that close() can close it
    this.changefeeds.push(iter);
    return iter;
  };

  /**
   * Send a "set" command.  The payload comes from messageData (raw,
   * encoding, headers); key, ttl, previousSeq, msgID and timeout are sent
   * as request headers.  Resolves to the response data ({seq, time});
   * throws ConatError if the response carries an error or code header.
   */
  set = async ({
    key,
    ttl,
    previousSeq,
    msgID,
    messageData,
    timeout,
  }: SetOptions & { timeout?: number }): Promise<{
    seq: number;
    time: number;
  }> => {
    return this.checkForError(
      await this.socket.request(null, {
        raw: messageData.raw,
        encoding: messageData.encoding,
        headers: {
          headers: messageData.headers,
          cmd: "set",
          key,
          ttl,
          previousSeq,
          msgID,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "setMany" command with `ops` as the request data.  Resolves to
   * the response data: one entry per op, either {seq, time} or
   * {error, code?}.  Throws ConatError if the response as a whole carries
   * an error or code header.
   */
  setMany = async (
    ops: SetOptions[],
    { timeout }: { timeout?: number } = {},
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    return this.checkForError(
      await this.socket.request(ops, {
        headers: {
          cmd: "setMany",
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "delete" command; seq, last_seq and all are forwarded as
   * headers to select what is deleted.  Resolves to the response data
   * ({seqs}); throws ConatError on an error/code header.
   */
  delete = async ({
    timeout,
    seq,
    last_seq,
    all,
  }: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "delete",
          seq,
          last_seq,
          all,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "config" command, forwarding the optional partial `config` as a
   * header.  Resolves to the Configuration in the response data; throws
   * ConatError on an error/code header.
   */
  config = async ({
    config,
    timeout,
  }: {
    config?: Partial<Configuration>;
    timeout?: number;
  } = {}): Promise<Configuration> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "config",
          config,
          timeout,
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Send an "inventory" command and resolve to the response data.
   * `timeout` is used as the request timeout only.
   * Throws ConatError on an error/code header.
   */
  inventory = async (timeout?: number): Promise<PartialInventory> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "inventory",
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "get" command for a single message, selected by exactly one of
   * `seq` or `key`.  Resolves to the whole response message, or undefined
   * if the response has no headers.  Throws ConatError on an error/code
   * header.
   */
  get = async ({
    seq,
    key,
    timeout,
  }: {
    timeout?: number;
  } & (
    | { seq: number; key?: undefined }
    | { key: string; seq?: undefined }
  )): Promise<ConatMessage | undefined> => {
    const resp = await this.socket.request(null, {
      headers: { cmd: "get", seq, key, timeout } as any,
      timeout,
    });
    this.checkForError(resp, true);
    if (resp.headers == null) {
      return undefined;
    }
    return resp;
  };

  // returns async iterator over arrays of stored messages
  //
  // Iteration ends when a response without data arrives or the socket is
  // closed; a response with an error header makes the iterator throw
  // ConatError.
  async *getAll({
    start_seq,
    end_seq,
    timeout,
    maxWait,
  }: {
    start_seq?: number;
    end_seq?: number;
    timeout?: number;
    maxWait?: number;
  } = {}): AsyncGenerator<StoredMessage[], void, unknown> {
    const sub = await this.socket.requestMany(null, {
      headers: {
        cmd: "getAll",
        start_seq,
        end_seq,
        timeout,
      } as any,
      timeout,
      maxWait,
    });
    for await (const { data, headers } of sub) {
      if (headers?.error) {
        throw new ConatError(`${headers.error}`, { code: headers.code });
      }
      if (data == null || this.socket.state == "closed") {
        // done
        return;
      }
      yield data;
    }
  }

  /**
   * Send a "keys" command and resolve to the response data (string[]).
   * Throws ConatError on an error/code header.
   */
  keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "keys", timeout } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "sqlite" command with the given statement and params as
   * headers, and resolve to the response data.
   * Throws ConatError on an error/code header.
   */
  sqlite = async ({
    timeout,
    statement,
    params,
  }: {
    timeout?: number;
    statement: string;
    params?: any[];
  }): Promise<any[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "sqlite",
          statement,
          params,
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Throw ConatError if `mesg.headers` has a truthy `error` or `code`;
   * otherwise return `mesg.data` (or nothing when noReturn is true).
   */
  private checkForError = (mesg, noReturn = false) => {
    if (mesg.headers != null) {
      const { error, code } = mesg.headers;
      if (error || code) {
        throw new ConatError(error ?? "error", { code });
      }
    }
    if (!noReturn) {
      return mesg.data;
    }
  };

  // id of the remote server we're connected to
  serverId = async () => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "serverId" },
      }),
    );
  };
}
433
434
/**
 * Arguments describing one write to the stream, as accepted by
 * PersistStreamClient.set and (in an array) by setMany.
 */
export interface SetOptions {
  // the message payload; set() sends its raw/encoding/headers parts
  messageData: MessageData;
  // the remaining fields are forwarded by set() as request headers of the
  // same name
  key?: string;
  ttl?: number;
  previousSeq?: number;
  msgID?: string;
  timeout?: number;
}
442
443
/** Arguments accepted by the `stream` factory in this file. */
interface Options {
  // connection used to open the socket to the persist server
  client: Client;
  // who is accessing persistent storage
  user: User;
  // what storage they are accessing
  storage: StorageOptions;
  // not read anywhere in this file -- presumably interpreted by
  // refCacheSync (TODO confirm)
  noCache?: boolean;
}
451
452
/**
 * Factory for PersistStreamClient instances, built with refCacheSync.
 * The cache key is the JSON encoding of [user, storage, client.id].
 */
export const stream = refCacheSync<Options, PersistStreamClient>({
  name: "persistent-stream-client",
  createKey: ({ user, storage, client }: Options) =>
    JSON.stringify([user, storage, client.id]),
  createObject: ({ client, user, storage }: Options) => {
    // avoid wasting server resources, etc., by always checking permissions client side first
    assertHasWritePermission({ user, storage });
    const persistClient = new PersistStreamClient(client, storage, user);
    return persistClient;
  },
});
463
464
// Whether assertHasWritePermission actually performs its check.
let permissionChecks = true;

/**
 * Turn off the client-side permission check for the rest of the process.
 *
 * Throws an Error unless the COCALC_TEST_MODE environment variable is set
 * to a non-empty value.
 */
export function disablePermissionCheck() {
  const inTestMode = Boolean(process.env.COCALC_TEST_MODE);
  if (!inTestMode) {
    throw Error("disabling permission check only allowed in test mode");
  }
  permissionChecks = false;
}
471
472
/**
 * Client-side check of the given user's write permission for storage.path,
 * delegated to assertHasWritePermission from ./auth using the user's persist
 * subject.  Does nothing once disablePermissionCheck() has been called.
 */
const assertHasWritePermission = ({ user, storage }) => {
  // The check is skipped only for unit testing, since otherwise it would
  // make clients slower and possibly increase server load.
  if (permissionChecks) {
    assertHasWritePermission0({
      subject: persistSubject(user),
      path: storage.path,
    });
  }
};
481
482