Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dstream.ts
1452 views
1
/*
2
Eventually Consistent Distributed Message Stream
3
4
DEVELOPMENT:
5
6
7
# in node -- note the package directory!!
8
~/cocalc/src/packages/backend node
9
10
> s = await require("@cocalc/backend/conat/sync").dstream({name:'test'});
11
> s = await require("@cocalc/backend/conat/sync").dstream({project_id:cc.current().project_id,name:'foo'});0
12
13
See the guide for dkv, since it's very similar, especially for use in a browser.
14
*/
15
16
import { EventEmitter } from "events";
17
import {
18
CoreStream,
19
type RawMsg,
20
type ChangeEvent,
21
type PublishOptions,
22
} from "./core-stream";
23
import { randomId } from "@cocalc/conat/names";
24
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
25
import { isNumericString } from "@cocalc/util/misc";
26
import refCache from "@cocalc/util/refcache";
27
import {
28
type Client,
29
type Headers,
30
ConatError,
31
} from "@cocalc/conat/core/client";
32
import jsonStableStringify from "json-stable-stringify";
33
import type { JSONValue } from "@cocalc/util/types";
34
import { Configuration } from "./core-stream";
35
import { conat } from "@cocalc/conat/client";
36
import { delay, map as awaitMap } from "awaiting";
37
import { asyncThrottle, until } from "@cocalc/util/async-utils";
38
import {
39
inventory,
40
type Inventory,
41
INVENTORY_UPDATE_INTERVAL,
42
} from "./inventory";
43
import { getLogger } from "@cocalc/conat/client";
44
45
// Logger shared by everything in this module, obtained under the name "sync:dstream".
const logger = getLogger("sync:dstream");
46
47
/**
 * Options accepted by the DStream constructor and by the cached `dstream()`
 * factory. The whole object is also handed to the underlying CoreStream.
 */
export interface DStreamOptions {
  /** What it's called by us. Required; part of the cache key. */
  name: string;
  /** Part of the cache key; also used to look up the inventory. */
  account_id?: string;
  /** Part of the cache key; also used to look up the inventory. */
  project_id?: string;
  /** Stream configuration (forwarded to CoreStream with the other options). */
  config?: Partial<Configuration>;
  /** Only load historic messages starting at the given seq number. */
  start_seq?: number;
  /** Description recorded alongside this stream when the inventory is updated. */
  desc?: JSONValue;

  /** Required by the DStream constructor; `dstream()` fills it in when omitted. */
  client?: Client;
  /** When true, publish() does not automatically call save(). */
  noAutosave?: boolean;
  /** Not read in this file -- presumably interpreted by CoreStream (TODO confirm). */
  ephemeral?: boolean;

  /** Not read in this file -- presumably interpreted by refCache (TODO confirm). */
  noCache?: boolean;
  /** When true, this stream never updates the inventory. */
  noInventory?: boolean;
}
64
65
export class DStream<T = any> extends EventEmitter {
66
// Name of the stream (copied from the options in the constructor).
public readonly name: string;
// Underlying core stream; close() deletes it, which is what isClosed() tests.
private stream: CoreStream;
// The core stream's own `messages` and `raw` arrays (shared, not copied).
private messages: T[];
private raw: RawMsg[];
// When true, publish() does not trigger save().
private noAutosave: boolean;
// TODO: using Map for these will be better because we use .length a bunch, which is O(n) instead of O(1).
// Messages published locally but not yet saved, keyed by random message id.
private local: Record<string, T> = {};
// Per-message publish options, keyed by the same ids as `local`.
private publishOptions: Record<string, { headers?: Headers }> = {};
// Messages that were saved (so have a seq) but were not yet in `raw` at save
// time, keyed by seq.
private saved: Record<number, T> = {};
// Options this stream was constructed with; deleted by close().
private opts: DStreamOptions;
78
79
/**
 * Build a DStream on top of a new CoreStream created from the same options.
 *
 * The constructor returns a Proxy around the instance so that numeric
 * property access (e.g. `s[3]`) is routed to `get(3)`; every other property
 * (including symbol-keyed ones) is read from the instance itself.
 *
 * @throws Error if `opts.client` is not set.
 */
constructor(opts: DStreamOptions) {
  super();
  logger.debug("constructor", opts.name);
  if (opts.client == null) {
    throw Error("client must be specified");
  }
  this.opts = opts;
  this.noAutosave = !!opts.noAutosave;
  this.name = opts.name;
  this.stream = new CoreStream(opts);
  // share the arrays maintained by the core stream rather than copying them
  this.messages = this.stream.messages;
  this.raw = this.stream.raw;
  return new Proxy(this, {
    get(target, prop) {
      if (typeof prop == "string" && isNumericString(prop)) {
        return target.get(parseInt(prop));
      }
      // Do NOT stringify the key: String(symbol) is e.g. "Symbol(foo)", which
      // would make every symbol-keyed lookup miss. Reflect.get with no explicit
      // receiver behaves like target[prop] (getters run with this === target).
      return Reflect.get(target, prop);
    },
  });
}
99
100
// Set by init() so that a second call can be rejected.
private initialized = false;

/**
 * Subscribe to the core stream's "change" and "reset" events, initialize the
 * core stream, then emit "connected".
 *
 * @throws Error if called more than once, or if the stream is already closed.
 */
init = async () => {
  if (this.initialized) {
    throw Error("init can only be called once");
  }
  this.initialized = true;
  if (this.isClosed()) {
    throw Error("closed");
  }
  this.stream.on("change", this.handleChange);
  this.stream.on("reset", () => {
    // Drop all pending state. publishOptions is keyed by the same ids as
    // local, so it must be cleared too or its entries are orphaned forever.
    this.local = {};
    this.publishOptions = {};
    this.saved = {};
  });
  await this.stream.init();
  this.emit("connected");
};
117
118
// Listener for the core stream's "change" event: drops our pending copies of
// the message, then re-emits "change" (and "stable" if nothing is pending).
private handleChange = (event: ChangeEvent<T>) => {
  const { mesg, raw, msgID } = event;
  const seq = raw?.seq;
  if (seq !== undefined) {
    // the message is now in the core stream, so our saved copy is redundant
    delete this.saved[seq];
  }
  if (mesg === undefined) {
    return;
  }
  if (msgID) {
    // this is critical with core-stream.ts, since otherwise there is a moment
    // when the same message is in both this.local *and* this.messages, and you'll
    // see it doubled in this.getAll().
    delete this.local[msgID];
  }
  this.emit("change", mesg, seq);
  if (!this.isStable()) {
    return;
  }
  this.emit("stable");
};
136
137
// True when there are no saved-but-unconfirmed messages and no unsaved local
// messages. for..in is used so we stop at the first key found.
isStable = () => {
  const pending = [this.saved, this.local];
  for (const obj of pending) {
    for (const _ in obj) {
      return false;
    }
  }
  return true;
};
146
147
// close() deletes this.stream, so a missing stream means we are closed.
isClosed = (): boolean => this.stream == null;
150
151
// Close the underlying core stream, emit "closed", remove all listeners and
// drop references to internal state. Calling it again is a no-op.
close = () => {
  if (this.isClosed()) {
    return;
  }
  const underlying = this.stream;
  logger.debug("close", this.name);
  underlying.removeListener("change", this.handleChange);
  // Delete the reference first so isClosed() is already true while the core
  // stream closes and while "closed" listeners run.
  // @ts-ignore
  delete this.stream;
  underlying.close();
  this.emit("closed");
  this.removeAllListeners();
  for (const field of ["local", "messages", "raw", "opts"]) {
    // @ts-ignore
    delete this[field];
  }
};
172
173
/**
 * With no argument, return all messages (same as getAll()). Otherwise return
 * the n-th message, counting in the same order as getAll(): first the
 * messages in the core stream, then saved-but-unconfirmed messages, then
 * unsaved local messages. Returns undefined if n is out of range.
 *
 * @throws Error("closed") if the stream is closed.
 */
get = (n?): T | T[] => {
  if (this.isClosed()) {
    throw Error("closed");
  }
  if (n == null) {
    return this.getAll();
  }
  if (n < this.messages.length) {
    return this.messages[n];
  }
  // this.saved is keyed by seq, NOT by position, so it has to be indexed via
  // its values (which come out in the same order getAll() uses).
  const savedValues = Object.values(this.saved);
  const offset = n - this.messages.length;
  if (offset < savedValues.length) {
    return savedValues[offset];
  }
  return Object.values(this.local)[offset - savedValues.length];
};
190
191
// Return a new array with every message: those in the core stream, then the
// saved-but-unconfirmed ones, then the unsaved local ones.
getAll = (): T[] => {
  if (this.isClosed()) {
    throw Error("closed");
  }
  const all: T[] = [];
  const sources = [
    this.messages,
    Object.values(this.saved),
    Object.values(this.local),
  ];
  for (const source of sources) {
    for (const mesg of source) {
      all.push(mesg);
    }
  }
  return all;
};
201
202
/**
 * Sequence number of the n-th message, or undefined if that message has no
 * sequence number yet (it is still unsaved) or n is out of range.
 *
 * @throws Error("closed") if the stream is closed.
 */
seq = (n: number): number | undefined => {
  if (this.isClosed()) {
    // this.raw is deleted by close(), so fail with a clear error instead of a TypeError
    throw Error("closed");
  }
  if (n < this.raw.length) {
    // optional chaining: a negative or fractional n has no entry in raw
    return this.raw[n]?.seq;
  }
  // keys of this.saved are the seq numbers of saved-but-unconfirmed messages
  const savedSeqs = Object.keys(this.saved);
  const offset = n - this.raw.length;
  if (offset < savedSeqs.length) {
    return parseInt(savedSeqs[offset]);
  }
  return undefined;
};
212
213
/**
 * Time of the n-th message, as reported by the core stream.
 *
 * @throws Error("closed") if the stream is closed.
 */
time = (n: number): Date | undefined => {
  if (this.isClosed()) {
    // The stream is created in the constructor, so a missing stream can only
    // mean close() was called -- report that, like the other accessors do.
    throw Error("closed");
  }
  return this.stream.time(n);
};
219
220
/**
 * All server assigned times of messages in the stream.
 *
 * @throws Error("closed") if the stream is closed.
 */
times = (): (Date | undefined)[] => {
  if (this.isClosed()) {
    // The stream is created in the constructor, so a missing stream can only
    // mean close() was called -- report that, like the other accessors do.
    throw Error("closed");
  }
  return this.stream.times();
};
227
228
// Total number of messages: in the core stream + saved-but-unconfirmed + unsaved.
get length(): number {
  const inStream = this.messages.length;
  const savedCount = Object.keys(this.saved).length;
  const localCount = Object.keys(this.local).length;
  return inStream + savedCount + localCount;
}
235
236
/**
 * Queue a message locally under a fresh random id, start saving it (unless
 * noAutosave was set), and schedule an inventory update.
 *
 * @param mesg the message to publish
 * @param options optional headers/ttl stored with the message until it is saved
 * @throws Error("closed") if the stream is closed.
 */
publish = (
  mesg: T,
  // NOTE: if you call this.headers(n) it is NOT visible until
  // the publish is confirmed. This could be changed with more work if it matters.
  options?: { headers?: Headers; ttl?: number },
): void => {
  if (this.isClosed()) {
    // this.local is deleted by close(); fail clearly (as push() does) instead
    // of with a TypeError on the assignment below.
    throw Error("closed");
  }
  const id = randomId();
  this.local[id] = mesg;
  if (options != null) {
    this.publishOptions[id] = options;
  }
  if (!this.noAutosave) {
    this.save();
  }
  this.updateInventory();
};
252
253
// Headers of the n-th message, as reported by the core stream.
// Throws Error("closed") if the stream is closed.
headers = (n) => {
  if (!this.isClosed()) {
    return this.stream.headers(n);
  }
  throw Error("closed");
};
259
260
// Publish each argument, in order, with no publish options.
// Throws Error("closed") if the stream is closed.
push = (...args: T[]) => {
  if (this.isClosed()) {
    throw Error("closed");
  }
  args.forEach((mesg) => {
    this.publish(mesg);
  });
};
268
269
// True if there is at least one local message that has not been saved yet.
// A closed stream never has unsaved changes.
hasUnsavedChanges = (): boolean => {
  return !this.isClosed() && Object.keys(this.local).length > 0;
};
275
276
/**
 * The local messages that have not been saved yet. Returns an empty array
 * when the stream is closed, consistent with hasUnsavedChanges() returning
 * false in that case (this.local is deleted by close()).
 */
unsavedChanges = (): T[] => {
  if (this.isClosed()) {
    return [];
  }
  return Object.values(this.local);
};
279
280
/**
 * Keep attempting to save until the stream is closed or there are no unsaved
 * local messages left. Failed attempts are not thrown to the caller; they are
 * logged at debug level and retried.
 * NOTE(review): the retry/backoff behaviour comes from `until` with the
 * start/decay/max options below -- confirm exact semantics in async-utils.
 */
save = reuseInFlight(async () => {
  await until(
    async () => {
      if (this.isClosed()) {
        return true;
      }
      try {
        await this.attemptToSave();
      } catch (err) {
        // Deliberately not rethrown: we retry. Log at debug level so that
        // repeated failures leave some trace instead of being swallowed.
        logger.debug(`dstream attemptToSave failed - ${err}`, this.name);
      }
      return !this.hasUnsavedChanges();
    },
    { start: 150, decay: 1.3, max: 10000 },
  );
});
302
303
// One save attempt. The batch strategy is hard-wired; the parallel strategy
// stays referenced here so it can be switched back on by flipping the flag.
private attemptToSave = async () => {
  const useBatch: boolean = true;
  await (useBatch
    ? this.attemptToSaveBatch()
    : this.attemptToSaveParallel());
};
310
311
/**
 * Publish every currently unsaved local message in a single publishMany call.
 *
 * For each result: on success the message moves from `local` to `saved`
 * (unless it is already in `raw`); a result with code "reject" drops the
 * message and emits "reject"; any other error leaves the message in `local`
 * so a later attempt retries it.
 *
 * @throws Error("closed") if closed on entry; Error if any message failed.
 */
private attemptToSaveBatch = reuseInFlight(async () => {
  if (this.isClosed()) {
    throw Error("closed");
  }
  // snapshot the ids now; results come back in the same order
  const ids = Object.keys(this.local);
  const v: { mesg: T; options: PublishOptions }[] = ids.map((id) => ({
    mesg: this.local[id],
    options: {
      ...this.publishOptions[id],
      msgID: id,
    },
  }));
  const w: (
    | { seq: number; time: number; error?: undefined }
    | { error: string; code?: any }
  )[] = await this.stream.publishMany(v);

  if (this.isClosed()) {
    return;
  }

  let errors = false;
  for (let i = 0; i < w.length; i++) {
    const id = ids[i];
    if (w[i].error) {
      const x = w[i] as { error: string; code?: any };
      if (x.code == "reject") {
        // A rejected message is dropped for good, so its publish options
        // must go too -- otherwise that entry is never cleaned up.
        delete this.local[id];
        delete this.publishOptions[id];
        const err = new ConatError(x.error, { code: x.code });
        // err has mesg and subject set.
        this.emit("reject", { err, mesg: v[i].mesg });
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING -- error saving dstream '${this.name}' -- ${w[i].error}`,
        );
      }
      errors = true;
      continue;
    }
    const { seq } = w[i] as { seq: number };
    if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
      // it still isn't in this.raw
      this.saved[seq] = v[i].mesg;
    }
    delete this.local[id];
    delete this.publishOptions[id];
  }
  if (errors) {
    throw Error(`there were errors saving dstream '${this.name}'`);
  }
});
365
366
/**
 * Non-batched version: publish each unsaved local message with its own
 * publish call (awaitMap is given MAX_PARALLEL -- presumably the concurrency
 * limit; confirm against the "awaiting" package).
 *
 * Per message: on success it moves from `local` to `saved` (unless already in
 * `raw`); an error with code "reject" drops it and emits "reject"; any other
 * error is only logged, leaving it in `local`. Emits "stable" whenever
 * nothing is pending after a message has been handled.
 */
private attemptToSaveParallel = reuseInFlight(async () => {
  const f = async (id) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const mesg = this.local[id];
    try {
      // @ts-ignore
      const { seq } = await this.stream.publish(mesg, {
        ...this.publishOptions[id],
        msgID: id,
      });
      if (this.isClosed()) {
        return;
      }
      if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
        // it still isn't in this.raw
        this.saved[seq] = mesg;
      }
      delete this.local[id];
      delete this.publishOptions[id];
    } catch (err) {
      if (err.code == "reject") {
        // A rejected message is dropped for good, so its publish options
        // must go too -- otherwise that entry is never cleaned up.
        delete this.local[id];
        delete this.publishOptions[id];
        // err has mesg and subject set.
        this.emit("reject", { err, mesg });
      } else {
        if (!process.env.COCALC_TEST_MODE) {
          console.warn(
            `WARNING: problem saving dstream ${this.name} -- ${err}`,
          );
        }
      }
    }
    if (this.isStable()) {
      this.emit("stable");
    }
  };
  // NOTE: ES6 spec guarantees "String keys are returned in the order
  // in which they were added to the object."
  const ids = Object.keys(this.local);
  const MAX_PARALLEL = 50;
  await awaitMap(ids, MAX_PARALLEL, f);
});
411
412
// Load older messages starting at start_seq (delegates to the core stream).
// Throws Error("closed") if the stream is closed.
load = async (opts: { start_seq: number }) => {
  if (!this.isClosed()) {
    await this.stream.load(opts);
    return;
  }
  throw Error("closed");
};
419
420
// This is not synchronous: it first makes sure everything is saved out, then
// asks the core stream to delete, returning whatever that call returns.
// NOTE: for ephemeral streams, other clients will NOT see the result of a delete (unless they reconnect).
// Throws Error("closed") if the stream is closed once saving has finished.
delete = async (opts?) => {
  await this.save();
  if (this.isClosed()) {
    throw Error("closed");
  }
  const result = await this.stream.delete(opts);
  return result;
};
430
431
// start_seq of the underlying core stream; undefined once closed.
get start_seq(): number | undefined {
  const underlying = this.stream;
  return underlying == null ? undefined : underlying.start_seq;
}
434
435
// Get or set config: forwards `config` (default {}) to the core stream and
// resolves to the configuration it returns.
// Throws Error("closed") if the stream is closed.
config = async (
  config: Partial<Configuration> = {},
): Promise<Configuration> => {
  if (this.isClosed()) {
    throw Error("closed");
  }
  const current = await this.stream.config(config);
  return current;
};
444
445
/**
 * Record this stream's current status in the inventory for its
 * account/project. Throttled via asyncThrottle with INVENTORY_UPDATE_INTERVAL.
 * Does nothing if the stream is closed or was created with noInventory.
 * Failures are only logged.
 */
private updateInventory = asyncThrottle(
  async () => {
    if (this.isClosed() || this.opts == null || this.opts.noInventory) {
      return;
    }
    await delay(500);
    if (this.isClosed()) {
      return;
    }
    let inv: Inventory | undefined = undefined;
    try {
      const { account_id, project_id, desc } = this.opts;
      // Assign to the OUTER variable (do not redeclare with const): the
      // finally block below must see it in order to close it.
      inv = await inventory({ account_id, project_id });
      if (this.isClosed()) {
        return;
      }
      const status = {
        type: "stream" as "stream",
        name: this.opts.name,
        desc,
        ...(await this.stream.inventory()),
      };
      inv.set(status);
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        // use this.name rather than this.opts.name: close() deletes this.opts,
        // and it may have run while we were awaiting above.
        console.log(
          `WARNING: unable to update inventory. name='${this.name}' -- ${err}`,
        );
      }
    } finally {
      inv?.close();
    }
  },
  INVENTORY_UPDATE_INTERVAL,
  { leading: true, trailing: true },
);
482
}
483
484
// Cache of DStream objects keyed by (name, account_id, project_id).
// A missing client is filled in with the default conat() client before the
// stream is constructed and initialized.
export const cache = refCache<DStreamOptions, DStream>({
  name: "dstream",
  createKey: ({ name, account_id, project_id }: DStreamOptions) => {
    if (!name) {
      throw Error("name must be specified");
    }
    return jsonStableStringify({ name, account_id, project_id })!;
  },
  createObject: async (options: DStreamOptions) => {
    // only copy the options when a client has to be added
    const withClient =
      options.client != null
        ? options
        : { ...options, client: await conat() };
    const stream = new DStream(withClient);
    await stream.init();
    return stream;
  },
});
502
503
/**
 * Get the initialized DStream for the given options from the cache.
 */
export async function dstream<T>(options: DStreamOptions): Promise<DStream<T>> {
  const stream = await cache(options);
  return stream;
}
506
507