/*
core-stream.ts = the Core Stream data structure for conat.

This is the core data structure that easy-to-use ephemeral and persistent
streams and kv stores are built on. It is NOT meant to be super easy and
simple to use, with saving in the background. Instead, operations
are async, and the API is complicated. We build dkv, dstream, akv, etc. on
top of this with a much friendlier API.

NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.
We use the subject to track communication involving this stream, but
otherwise it has no relevance to the keys. Conat's core pub/sub/request/
reply model is very similar to NATS, but the analogue of Jetstream is
different because I don't find Jetstream useful at all, and find this
much more useful.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})

*/
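
// Usage sketch (added documentation, not part of the original code): the
// DEVELOPMENT snippet above, written as TypeScript against the API exported
// from this module. The message values and key names are hypothetical; if no
// client is passed, one is created via conat() (see the refCache at the bottom).
//
//   import { cstream } from "@cocalc/conat/sync/core-stream";
//
//   const s = await cstream<string>({ name: "test" });
//   await s.publish("hello");          // append one message
//   s.getAll();                        // --> ["hello"]
//   await s.setKv("greeting", "hi");   // kv-mode write
//   s.getKv("greeting");               // --> "hi"
//   s.close();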

import { EventEmitter } from "events";
import {
  Message,
  type Headers,
  messageData,
  decode,
} from "@cocalc/conat/core/client";
import { isNumericString } from "@cocalc/util/misc";
import refCache from "@cocalc/util/refcache";
import { conat } from "@cocalc/conat/client";
import type { Client } from "@cocalc/conat/core/client";
import jsonStableStringify from "json-stable-stringify";
import type {
  SetOperation,
  DeleteOperation,
  StoredMessage,
  Configuration,
} from "@cocalc/conat/persist/storage";
export type { Configuration };
import { join } from "path";
import {
  type StorageOptions,
  type PersistStreamClient,
  stream as persist,
  type SetOptions,
} from "@cocalc/conat/persist/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { until } from "@cocalc/util/async-utils";
import { type PartialInventory } from "@cocalc/conat/persist/storage";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("sync:core-stream");

const PUBLISH_MANY_BATCH_SIZE = 500;

const log = (..._args) => {};
//const log = console.log;

// when this many bytes of key:value have been changed (so need to be freed),
// we do a garbage collection pass.
export const KEY_GC_THRESH = 10 * 1e6;

// NOTE: when you call this.deleteKv(key), we ensure the previous
// messages with the given key are completely deleted from sqlite, and
// also create a *new* lightweight tombstone. That tombstone has this
// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one week), so the tombstone
// itself will be removed after 1 week. The tombstone is only needed for
// clients that go offline during the delete, then come back, and replay the
// partial log of what was missed. Such clients should reset if the
// offline time is longer than DEFAULT_TOMBSTONE_TTL.
// This only happens if allow_msg_ttl is configured to true, which is
// done with dkv, but not on by default otherwise.
export const DEFAULT_TOMBSTONE_TTL = 7 * 24 * 60 * 60 * 1000; // 1 week

export interface RawMsg extends Message {
  timestamp: number;
  seq: number;
  key?: string;
}

export interface ChangeEvent<T> {
  mesg?: T;
  raw?: Partial<RawMsg>;
  key?: string;
  prev?: T;
  msgID?: string;
}

const HEADER_PREFIX = "CN-";

export const COCALC_TOMBSTONE_HEADER = `${HEADER_PREFIX}Tombstone`;
export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;
export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;

export interface CoreStreamOptions {
  // what it's called
  name: string;
  // where it is located -- this is who **owns the resource**, which
  // may or may not be who is accessing it.
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;

  ephemeral?: boolean;

  client?: Client;

  noCache?: boolean;
}

export interface User {
  account_id?: string;
  project_id?: string;
}

export function storagePath({
  account_id,
  project_id,
  name,
}: User & { name: string }) {
  let userPath;
  if (account_id) {
    userPath = `accounts/${account_id}`;
  } else if (project_id) {
    userPath = `projects/${project_id}`;
  } else {
    userPath = "hub";
  }
  return join(userPath, name);
}

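// For reference (added note): storagePath determines where the stream's data
// lives, keyed by the resource owner. The names below are hypothetical:
//
//   storagePath({ project_id: "<project_id>", name: "patches" })  -->  "projects/<project_id>/patches"
//   storagePath({ account_id: "<account_id>", name: "prefs" })    -->  "accounts/<account_id>/prefs"
//   storagePath({ name: "status" })                               -->  "hub/status"
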
export class CoreStream<T = any> extends EventEmitter {
  public readonly name: string;

  private configOptions?: Partial<Configuration>;
  private _start_seq?: number;

  // don't do "this.raw=" or "this.messages=" anywhere in this class
  // because dstream directly references the public raw/messages.
  public readonly raw: RawMsg[] = [];
  public readonly messages: T[] = [];
  public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};
  private kvChangeBytes = 0;

  // this msgIDs set is ONLY used in ephemeral mode by the leader.
  private readonly msgIDs = new Set<any>();
  // lastSeq is used by clients to keep track of what they have received; if a
  // message is skipped they reconnect starting just after the last one they received.
  private lastSeq: number = 0;
  // IMPORTANT: user here means the *owner* of the resource, **NOT** the
  // client who is accessing it! For example, a stream of edits of a file
  // in a project has user {project_id} even if it is being accessed by
  // an account.
  private user: User;
  private storage: StorageOptions;
  private client?: Client;
  private persistClient: PersistStreamClient;

  constructor({
    name,
    project_id,
    account_id,
    config,
    start_seq,
    ephemeral = false,
    client,
  }: CoreStreamOptions) {
    super();
    logger.debug("constructor", name);
    if (client == null) {
      throw Error("client must be specified");
    }
    this.client = client;
    this.user = { account_id, project_id };
    this.name = name;
    this.storage = {
      path: storagePath({ account_id, project_id, name }),
      ephemeral,
    };
    this._start_seq = start_seq;
    this.configOptions = config;
    return new Proxy(this, {
      get(target, prop) {
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : target[String(prop)];
      },
    });
  }

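  // Added note: because the constructor returns a Proxy, numeric indexing on a
  // CoreStream instance is forwarded to get(). Sketch (s is a CoreStream):
  //
  //   s[0];       // same as s.get(0) -- first currently loaded message
  //   s.length;   // number of currently loaded messages
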
  private initialized = false;
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.client == null) {
      this.client = await conat();
    }
    this.persistClient = persist({
      client: this.client,
      user: this.user,
      storage: this.storage,
    });
    this.persistClient.on("error", (err) => {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(`WARNING: persistent stream issue -- ${err}`);
      }
    });
    await this.getAllFromPersist({
      start_seq: this._start_seq,
      noEmit: true,
    });

    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          this.configOptions = await this.config(this.configOptions);
          return true;
        } catch (err) {
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
        }
        return false;
      },
      { start: 750 },
    );

    // NOTE: if we miss a message between getAllFromPersist and when we start listening,
    // it will get filled in, due to sequence number tracking.
    this.listen();
  };

  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.persistClient.config({ config });
  };

  close = () => {
    logger.debug("close", this.name);
    delete this.client;
    this.removeAllListeners();
    this.persistClient?.close();
    // @ts-ignore
    delete this.persistClient;
    // @ts-ignore
    delete this.kv;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.msgIDs;
    // @ts-ignore
    delete this.storage;
  };

  inventory = async (): Promise<PartialInventory> => {
    return await this.persistClient.inventory();
  };

  // NOTE: It's assumed elsewhere that getAllFromPersist will not throw,
  // and will keep retrying until (1) it works, or (2) self is closed,
  // or (3) there is a fatal failure, e.g., lack of permissions.
  private getAllFromPersist = async ({
    start_seq = 0,
    noEmit,
  }: { start_seq?: number; noEmit?: boolean } = {}) => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          // console.log("get persistent stream", { start_seq }, this.storage);
          const sub = await this.persistClient.getAll({
            start_seq,
          });
          // console.log("got sub", { noEmit });
          while (true) {
            const { value, done } = await sub.next();
            if (done) {
              return true;
            }
            const messages = value as StoredMessage[];
            const seq = this.processPersistentMessages(messages, {
              noEmit,
              noSeqCheck: true,
            });
            if (seq != null) {
              // we update start_seq in case we need to try again
              start_seq = seq! + 1;
            }
          }
        } catch (err) {
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
          if (err.code == 429) {
            // too many users
            throw err;
          }
          if (!process.env.COCALC_TEST_MODE) {
            console.log(
              `WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}`,
            );
          }
        }
        return false;
      },
      { start: 750 },
    );
  };

  private processPersistentMessages = (
    messages: (SetOperation | DeleteOperation | StoredMessage)[],
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    // console.log("processPersistentMessages", messages.length, " messages");
    if (this.raw === undefined) {
      // closed
      return;
    }
    let seq = undefined;
    for (const mesg of messages) {
      try {
        this.processPersistentMessage(mesg, opts);
        if (mesg["seq"] != null) {
          seq = mesg["seq"];
        }
      } catch (err) {
        console.log("WARNING: issue processing message", mesg, err);
      }
    }
    return seq;
  };

  private processPersistentMessage = (
    mesg: SetOperation | DeleteOperation | StoredMessage,
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    if ((mesg as DeleteOperation).op == "delete") {
      this.processPersistentDelete(mesg as DeleteOperation, opts);
    } else {
      // set is the default
      this.processPersistentSet(mesg as SetOperation, opts);
    }
  };

  private processPersistentDelete = (
    { seqs }: DeleteOperation,
    { noEmit }: { noEmit?: boolean },
  ) => {
    if (this.raw == null) return;
    //console.log("processPersistentDelete", seqs);
    const X = new Set<number>(seqs);
    // seqs is a list of integers. We remove
    // every entry from this.raw, this.messages, and this.kv
    // where this.raw.seq is in X by mutating raw/messages/kv,
    // not by making new objects (since external references).
    // This is a rare operation so we're not worried too much
    // about performance.
    const keys: { [seq: number]: string } = {};
    for (const key in this.kv) {
      const seq = this.kv[key]?.raw?.seq;
      if (X.has(seq)) {
        delete this.kv[key];
        // record the key by seq so the change event below can include it
        keys[seq] = key;
      }
    }
    const indexes: number[] = [];
    for (let i = 0; i < this.raw.length; i++) {
      const seq = this.raw[i].seq;
      if (X.has(seq)) {
        indexes.push(i);
        if (!noEmit) {
          this.emitChange({
            mesg: undefined,
            raw: { seq },
            key: keys[seq],
            prev: this.messages[i],
          });
        }
      }
    }

    //console.log({ indexes, seqs, noEmit });
    // remove this.raw[i] and this.messages[i] for all i in indexes,
    // with special case to be fast in the very common case of contiguous indexes.
    if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {
      // Contiguous: bulk remove for performance
      const start = indexes[0];
      const deleteCount = indexes.length;
      this.raw.splice(start, deleteCount);
      this.messages.splice(start, deleteCount);
    } else {
      // Non-contiguous: fallback to individual reverse splices
      for (let i = indexes.length - 1; i >= 0; i--) {
        const idx = indexes[i];
        this.raw.splice(idx, 1);
        this.messages.splice(idx, 1);
      }
    }
  };

  private processPersistentSetLargestSeq: number = 0;
  private missingMessages = new Set<number>();
  private processPersistentSet = (
    { seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,
    {
      noEmit,
      noSeqCheck,
    }: {
      noEmit?: boolean;
      noSeqCheck?: boolean;
    },
  ) => {
    if (this.raw == null) return;
    if (!noSeqCheck && this.processPersistentSetLargestSeq > 0) {
      const expected = this.processPersistentSetLargestSeq + 1;
      if (seq > expected) {
        log(
          "processPersistentSet -- detected missed seq number",
          { seq, expected: this.processPersistentSetLargestSeq + 1 },
          this.storage,
        );
        // We record that some are missing.
        for (let s = expected; s <= seq - 1; s++) {
          this.missingMessages.add(s);
          this.getAllMissingMessages();
        }
      }
    }

    if (seq > this.processPersistentSetLargestSeq) {
      this.processPersistentSetLargestSeq = seq;
    }

    const mesg = decode({ encoding, data });
    // console.log("processPersistentSet", seq, mesg)
    const raw = {
      timestamp: time,
      headers,
      seq,
      raw: data,
      key,
    } as RawMsg;
    if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {
      // easy fast initial load to the end of the list (common special case)
      this.messages.push(mesg);
      this.raw.push(raw);
    } else {
      // [ ] TODO: insert in the correct place. This should only
      // happen when calling load of old data. The algorithm below is
      // dumb and could be replaced by a binary search. However, we'll
      // change how we batch load so there's less point.
      let i = 0;
      while (i < this.raw.length && this.raw[i].seq < seq) {
        i += 1;
      }
      this.raw.splice(i, 0, raw);
      this.messages.splice(i, 0, mesg);
    }
    let prev: T | undefined = undefined;
    if (typeof key == "string") {
      prev = this.kv[key]?.mesg;
      if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {
        delete this.kv[key];
      } else {
        if (this.kv[key] !== undefined) {
          const { raw } = this.kv[key];
          this.kvChangeBytes += raw.raw.length;
        }

        this.kv[key] = { raw, mesg };

        if (this.kvChangeBytes >= KEY_GC_THRESH) {
          this.gcKv();
        }
      }
    }
    this.lastSeq = Math.max(this.lastSeq, seq);
    if (!noEmit) {
      this.emitChange({ mesg, raw, key, prev, msgID });
    }
  };

  private emitChange = (e: ChangeEvent<T>) => {
    if (this.raw == null) return;
    this.emit("change", e);
  };

  private listen = async () => {
    log("core-stream: listen", this.storage);
    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          log("core-stream: START listening on changefeed", this.storage);
          const changefeed = await this.persistClient.changefeed();
          // console.log("listening on the changefeed...", this.storage);
          for await (const updates of changefeed) {
            // console.log("changefeed", this.storage, updates);
            log("core-stream: process updates", updates, this.storage);
            if (this.client == null) return true;
            this.processPersistentMessages(updates, {
              noEmit: false,
              noSeqCheck: false,
            });
          }
          // console.log("DONE listening on the changefeed...", this.storage);
        } catch (err) {
          // console.log("error listening on the changefeed...");
          // This normally doesn't happen but could if a persist server is being restarted
          // frequently or things are seriously broken. We cause this in
          // backend/conat/test/core/core-stream-break.test.ts
          if (!process.env.COCALC_TEST_MODE) {
            log(
              `WARNING: core-stream changefeed error -- ${err}`,
              this.storage,
            );
          }
        }
        log("core-stream: STOP listening on changefeed", this.storage);
        // above loop exits when the persistent server
        // stops sending messages for some reason. In that
        // case we reconnect, picking up where we left off:
        if (this.client == null) return true;
        log(
          "core-stream: get missing from when changefeed ended",
          this.storage,
        );
        await this.getAllFromPersist({
          start_seq: this.lastSeq + 1,
          noEmit: false,
        });
        return false;
      },
      { start: 500, max: 7500, decay: 1.2 },
    );
  };

  publish = async (
    mesg: T,
    options?: PublishOptions,
  ): Promise<{ seq: number; time: number } | undefined> => {
    if (mesg === undefined) {
      if (options?.key !== undefined) {
        // undefined can't be JSON encoded, so we can't possibly represent it, and this
        // *must* be treated as a delete.
        this.deleteKv(options?.key, { previousSeq: options?.previousSeq });
        return;
      } else {
        throw Error("stream non-kv publish - mesg must not be 'undefined'");
      }
    }

    if (options?.msgID && this.msgIDs.has(options.msgID)) {
      // it's a dup
      return;
    }
    const md = messageData(mesg, { headers: options?.headers });
    const x = await this.persistClient.set({
      key: options?.key,
      messageData: md,
      previousSeq: options?.previousSeq,
      msgID: options?.msgID,
      ttl: options?.ttl,
      timeout: options?.timeout,
    });
    if (options?.msgID) {
      this.msgIDs?.add(options.msgID);
    }
    return x;
  };

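  // Usage sketch (added documentation): publish appends one message and resolves
  // with the server-assigned seq and time; a repeated msgID is ignored and
  // resolves to undefined. The payloads and msgID below are hypothetical.
  //
  //   const r = await s.publish({ op: "insert" }, { msgID: "abc" });  // r = { seq, time }
  //   await s.publish({ op: "insert" }, { msgID: "abc" });            // dup --> undefined
  //   // publishMany batches writes in groups of PUBLISH_MANY_BATCH_SIZE:
  //   await s.publishMany([{ mesg: "a" }, { mesg: "b", options: { ttl: 60000 } }]);
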
  publishMany = async (
    messages: { mesg: T; options?: PublishOptions }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    let result: (
      | { seq: number; time: number }
      | { error: string; code?: any }
    )[] = [];

    for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {
      const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);
      result = result.concat(await this.publishMany0(batch));
    }

    return result;
  };

  private publishMany0 = async (
    messages: { mesg: T; options?: PublishOptions }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    const v: SetOptions[] = [];
    let timeout: number | undefined = undefined;
    for (const { mesg, options } of messages) {
      if (options?.timeout) {
        if (timeout === undefined) {
          timeout = options.timeout;
        } else {
          timeout = Math.min(timeout, options.timeout);
        }
      }
      const md = messageData(mesg, { headers: options?.headers });
      v.push({
        key: options?.key,
        messageData: md,
        previousSeq: options?.previousSeq,
        msgID: options?.msgID,
        ttl: options?.ttl,
      });
    }
    return await this.persistClient.setMany(v, { timeout });
  };

  get = (n?: number): T | T[] => {
    if (n == null) {
      return this.getAll();
    } else {
      return this.messages[n];
    }
  };

  seq = (n: number): number | undefined => {
    return this.raw[n]?.seq;
  };

  getAll = (): T[] => {
    return [...this.messages];
  };

  get length(): number {
    return this.messages.length;
  }

  get start_seq(): number | undefined {
    return this._start_seq;
  }

  headers = (n: number): { [key: string]: any } | undefined => {
    return this.raw[n]?.headers;
  };

  // key:value interface for the subset of messages published with the key option set.
  // NOTE: This does NOT throw an error if our local seq is out of date (leave that
  // to dkv built on this).
  setKv = async (
    key: string,
    mesg: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
    },
  ): Promise<{ seq: number; time: number } | undefined> => {
    return await this.publish(mesg, { ...options, key });
  };

  setKvMany = async (
    x: {
      key: string;
      mesg: T;
      options?: {
        headers?: Headers;
        previousSeq?: number;
      };
    }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    const messages: { mesg: T; options?: PublishOptions }[] = [];
    for (const { key, mesg, options } of x) {
      messages.push({ mesg, options: { ...options, key } });
    }
    return await this.publishMany(messages);
  };

  deleteKv = async (
    key: string,
    options?: {
      msgID?: string;
      previousSeq?: number;
    },
  ) => {
    if (this.kv[key] === undefined) {
      // nothing to do
      return;
    }
    return await this.publish(null as any, {
      ...options,
      headers: { [COCALC_TOMBSTONE_HEADER]: true },
      key,
      ttl: DEFAULT_TOMBSTONE_TTL,
    });
  };

  getKv = (key: string): T | undefined => {
    return this.kv[key]?.mesg;
  };

  hasKv = (key: string): boolean => {
    return this.kv?.[key] !== undefined;
  };

  getAllKv = (): { [key: string]: T } => {
    const all: { [key: string]: T } = {};
    for (const key in this.kv) {
      all[key] = this.kv[key].mesg;
    }
    return all;
  };

  // efficient way to get just the keys -- use this instead of
  // getAllKv if you just need the keys.
  keysKv = (): string[] => {
    return Object.keys(this.kv);
  };

  seqKv = (key: string): number | undefined => {
    return this.kv[key]?.raw.seq;
  };

  timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {
    if (key === undefined) {
      const all: { [key: string]: Date } = {};
      for (const key in this.kv) {
        all[key] = new Date(this.kv[key].raw.timestamp);
      }
      return all;
    }
    const r = this.kv[key]?.raw;
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  headersKv = (key: string): { [key: string]: any } | undefined => {
    return this.kv[key]?.raw?.headers;
  };

  get lengthKv(): number {
    return Object.keys(this.kv).length;
  }

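  // Usage sketch (added documentation) for the key:value interface above; the
  // key names and values are hypothetical:
  //
  //   await s.setKv("color", "blue");                        // write (or overwrite) a key
  //   s.getKv("color");                                      // --> "blue" (local, synchronous)
  //   s.hasKv("color");                                      // --> true
  //   const seq = s.seqKv("color");                          // seq of the current value
  //   await s.setKv("color", "red", { previousSeq: seq });   // optimistic update
  //   await s.deleteKv("color");                             // writes a tombstone (see DEFAULT_TOMBSTONE_TTL)
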
  // load older messages starting at start_seq up to the oldest message
  // we currently have.
  load = async ({
    start_seq,
    noEmit,
  }: {
    start_seq: number;
    noEmit?: boolean;
  }) => {
    // This is used for loading more TimeTravel history
    if (this.storage == null) {
      throw Error("bug");
    }
    // this is one before the oldest we have
    const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;
    if (start_seq > end_seq) {
      // nothing to load
      return;
    }
    // we're moving start_seq back to this point
    this._start_seq = start_seq;
    const sub = await this.persistClient.getAll({
      start_seq,
      end_seq,
    });
    for await (const updates of sub) {
      this.processPersistentMessages(updates, { noEmit, noSeqCheck: true });
    }
  };

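  // Usage sketch (added documentation): when a stream is opened with start_seq,
  // older history can be paged in later. The name, type, and seq values here
  // are hypothetical:
  //
  //   const s = await cstream<Patch>({ name: "patches", project_id, start_seq: 1000 });
  //   // ...later, fetch everything back to the first message:
  //   await s.load({ start_seq: 1 });
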
  private getAllMissingMessages = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.client == null || this.missingMessages.size == 0) {
          return true;
        }
        try {
          const missing = Array.from(this.missingMessages);
          // sort numerically (the default sort is lexicographic)
          missing.sort((a, b) => a - b);
          log("core-stream: getMissingSeq", missing, this.storage);
          const sub = await this.persistClient.getAll({
            start_seq: missing[0],
            end_seq: missing[missing.length - 1],
          });
          for await (const updates of sub) {
            this.processPersistentMessages(updates, {
              noEmit: false,
              noSeqCheck: true,
            });
          }
          for (const seq of missing) {
            this.missingMessages.delete(seq);
          }
        } catch (err) {
          log(
            "core-stream: WARNING -- issue getting missing updates",
            err,
            this.storage,
          );
        }
        return false;
      },
      { start: 1000, max: 15000, decay: 1.3 },
    );
  });

  // get server assigned time of n-th message in stream
  time = (n: number): Date | undefined => {
    const r = this.raw[n];
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  times = () => {
    const v: (Date | undefined)[] = [];
    for (let i = 0; i < this.length; i++) {
      v.push(this.time(i));
    }
    return v;
  };

  stats = ({
    start_seq = 1,
  }: {
    start_seq?: number;
  } = {}): { count: number; bytes: number } | undefined => {
    if (this.raw == null) {
      return;
    }
    let count = 0;
    let bytes = 0;
    for (const { raw, seq } of this.raw) {
      if (seq == null) {
        continue;
      }
      if (seq < start_seq) {
        continue;
      }
      count += 1;
      bytes += raw.length;
    }
    return { count, bytes };
  };

  // delete messages, e.g., delete({last_index}) deletes all messages up to and
  // including the one at position last_index, i.e., this.messages[last_index]
  // is deleted.
  // NOTE: For ephemeral streams, clients will NOT see the result of a delete,
  // except when they load the stream later. For persistent streams all
  // **connected** clients will see the delete. THAT said, this is not a "proper"
  // distributed computing primitive with tombstones, etc. This is primarily
  // meant for reducing space usage, and shouldn't be relied on for
  // any other purpose.
  delete = async ({
    all,
    last_index,
    seq,
    last_seq,
    key,
  }: {
    // give exactly ONE parameter -- by default nothing happens with no params
    // all: delete everything
    all?: boolean;
    // last_index: everything up to and including the message at this index
    last_index?: number;
    // seq: delete message with this sequence number
    seq?: number;
    // last_seq: delete everything up to and including this sequence number
    last_seq?: number;
    // key: delete the message with this key
    key?: string;
  } = {}): Promise<{ seqs: number[] }> => {
    let opts;
    if (all) {
      opts = { all: true };
    } else if (last_index != null) {
      if (last_index >= this.raw.length) {
        opts = { all: true };
      } else if (last_index < 0) {
        return { seqs: [] };
      } else {
        const last_seq = this.raw[last_index].seq;
        if (last_seq === undefined) {
          throw Error(`BUG: invalid index ${last_index}`);
        }
        opts = { last_seq };
      }
    } else if (seq != null) {
      opts = { seq };
    } else if (last_seq != null) {
      opts = { last_seq };
    } else if (key != null) {
      const seq = this.kv[key]?.raw?.seq;
      if (seq === undefined) {
        return { seqs: [] };
      }
      opts = { seq };
    }
    return await this.persistClient.delete(opts);
  };

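  // Usage sketch (added documentation) -- pass exactly one option per call; the
  // numbers are hypothetical:
  //
  //   await s.delete({ seq: 17 });        // delete the message with seq 17
  //   await s.delete({ last_seq: 100 });  // everything up to and including seq 100
  //   await s.delete({ last_index: 9 });  // the first 10 currently loaded messages
  //   await s.delete({ all: true });      // everything
  //   // each call resolves to { seqs }, the sequence numbers that were deleted
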
  // delete messages that are no longer needed since newer values have been written
  gcKv = () => {
    this.kvChangeBytes = 0;
    for (let i = 0; i < this.raw.length; i++) {
      const key = this.raw[i].key;
      if (key !== undefined) {
        // optional chaining since the kv entry may have been removed (tombstone)
        if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key]?.raw) {
          this.raw[i] = {
            ...this.raw[i],
            headers: undefined,
            raw: Buffer.from(""),
          } as RawMsg;
          this.messages[i] = undefined as T;
        }
      }
    }
  };
}

export interface PublishOptions {
  // headers for this message
  headers?: Headers;
  // unique id for this message to dedup, so that if you send the same
  // message more than once with the same id it doesn't get published
  // multiple times.
  msgID?: string;
  // key -- if specified a key field is also stored on the server,
  // and any previous messages with the same key are deleted. Also,
  // an entry is set in this.kv[key] so that this.getKv(key), etc. work.
  key?: string;
  // if key is specified and previousSeq is set, the server throws
  // an error if the sequence number of the current key is
  // not previousSeq. We use this with this.seqKv(key) to
  // provide read/change/write semantics and to know when we
  // should resolve a merge conflict. This is ignored if
  // key is not specified.
  previousSeq?: number;
  // if set to a number of ms AND the config option allow_msg_ttl
  // is set on this persistent stream, then
  // this message will be deleted after the given amount of time (in ms).
  ttl?: number;
  timeout?: number;
}

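// Illustrative sketch (added documentation) of using previousSeq above for
// read/change/write on a key; the key name and update are hypothetical:
//
//   const current = s.getKv("settings");
//   const seq = s.seqKv("settings");
//   try {
//     await s.setKv("settings", { ...current, dark: true }, { previousSeq: seq });
//   } catch (err) {
//     // someone else wrote the key first -- reread and resolve the conflict
//   }
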
export const cache = refCache<CoreStreamOptions, CoreStream>({
  name: "core-stream",
  createObject: async (options: CoreStreamOptions) => {
    if (options.client == null) {
      options = { ...options, client: await conat() };
    }
    const cstream = new CoreStream(options);
    await cstream.init();
    return cstream;
  },
  createKey: ({ client, ...options }) => {
    return jsonStableStringify(options)!;
  },
});

export async function cstream<T>(
  options: CoreStreamOptions,
): Promise<CoreStream<T>> {
  return await cache(options);
}
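
// Usage sketch (added documentation): cstream() goes through the refCache above,
// so repeated calls with the same options (ignoring client) share one instance.
// The name, owner, and payload type are hypothetical:
//
//   const s = await cstream<{ text: string }>({
//     name: "chat",
//     project_id: "<project_id>",
//     config: { allow_msg_ttl: true },
//   });
//   s.on("change", ({ mesg, key }) => console.log("update", key, mesg));
//   s.close();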