/*
Persistent storage of a specific stream or kv store.

You can set a message by optionally providing a key, buffer and/or json value.
A sequence number and time (in ms since epoch) is assigned and returned.
If a key is provided, it is an arbitrary string and all older messages
with that same key are deleted. You can efficiently retrieve a message
by its key. The message content itself is given by the buffer and/or json
value. The buffer is like the "payload" in NATS, and the json is like
the headers in NATS.

This module is:

- efficient -- the buffer is automatically compressed using zstandard
- synchronous -- fast enough to meet our requirements even with blocking
- memory efficient -- nothing is kept in memory beyond whatever key you request

We care about memory efficiency here since it's likely we'll want to have
possibly thousands of these in a single nodejs process at once, but with
less than 1 read/write per second for each. Thus memory is critical, and
supporting at least 1000 writes/second overall is what we need.
Fortunately, this implementation can do ~50,000+ writes per second and
over 500,000 reads per second. Yes, it blocks the main thread, but by using
better-sqlite3 and zstd-napi, we get 10x speed increases over async code,
so this is worth it.


COMPRESSION:

I implemented *sync* lz4-napi compression here and it's very fast,
but it has to be run with async waits in a loop or it doesn't give back
memory, and such throttling may significantly hurt performance and would
mean we don't get a 100% sync api (like we have now).
The async functions in lz4-napi seem fine. Upstream report (by me):
https://github.com/antoniomuso/lz4-napi/issues/678
I also tried the rust sync snappy and it had a similar memory leak. Finally,
I tried zstd-napi and it has a very fast sync implementation that does *not*
need async pauses to avoid leaking memory. So zstd-napi it is.
And I like zstandard anyways.

NOTE:

We use seconds instead of ms in sqlite since that is the standard
convention for times in sqlite.

DEVELOPMENT:


s = require('@cocalc/backend/conat/persist').pstream({path:'/tmp/a.db'})

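A few more illustrative calls (a sketch, not a spec -- `encoding` below is
whatever DataEncoding value the caller actually uses, and `raw` is any Buffer):

r = s.set({encoding, raw: Buffer.from('hello'), key: 'x'})   // r = {seq, time}
s.get({key: 'x'})            // the stored message with that key (or undefined)
[...s.getAll()]              // all stored messages, ordered by seq
s.config({max_msgs: 1000})   // update limits; omitted fields are unchanged
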
*/

import { refCacheSync } from "@cocalc/util/refcache";
import { createDatabase, type Database, compress, decompress } from "./context";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
  DataEncoding,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import TTL from "@isaacs/ttlcache";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("persist:storage");

export interface PartialInventory {
  // how much space is used by this stream
  bytes: number;
  limits: Partial<Configuration>;
  // number of messages
  count: number;
  // for streams, the seq number up to which this data is valid, i.e.,
  // this data is for all elements of the stream with sequence
  // number <= seq.
  seq: number;
}

export interface Configuration {
  // How many messages may be in a Stream; oldest messages are removed
  // if the Stream exceeds this size. -1 for unlimited.
  max_msgs: number;

  // Maximum age of any message in the stream,
  // expressed in milliseconds. 0 for unlimited.
  // **Note that max_age is in milliseconds.**
  max_age: number;

  // How big the Stream may be. When the stream size
  // exceeds this, old messages are removed. -1 for unlimited.
  // The size of a message is the sum of the raw uncompressed blob
  // size, the headers json and the key length.
  max_bytes: number;

  // The largest message that will be accepted. -1 for unlimited.
  max_msg_size: number;

  // Attempting to publish a message that causes either of the following
  // two rate limits to be exceeded throws an exception.
  // For dstream, the messages are explicitly rejected and the client
  // gets a "reject" event emitted. E.g., the terminal running in the project
  // writes [...] when it gets these rejects, indicating that data was dropped.
  // -1 for unlimited
  max_bytes_per_second: number;

  // -1 for unlimited
  max_msgs_per_second: number;

  // old = delete old messages to make room for new ones
  // new = refuse writes if they exceed the limits
  discard_policy: "old" | "new";

  // If true (default: false), messages will be automatically deleted after their ttl.
  // Use the option {ttl: number of MILLISECONDS} when publishing to set a ttl.
  allow_msg_ttl: boolean;

  // description of this table
  desc: JSONValue;
}

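// Example (illustrative only; the numbers are made up): the partial configuration
// a client might pass to PersistentStream.config() for a size- and age-bounded
// stream -- any field left out keeps its current (or default) value:
//
//   stream.config({
//     max_msgs: 1000,                      // keep at most 1000 messages
//     max_age: 1000 * 60 * 60 * 24 * 30,   // drop messages older than ~30 days (ms)
//     max_bytes: 10 * 1e6,                 // at most ~10 MB of raw data
//     discard_policy: "old",               // evict oldest messages when over a limit
//   });
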
const CONFIGURATION = {
  max_msgs: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_age: { def: 0, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_msg_size: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  max_msgs_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  discard_policy: {
    def: "old",
    fromDb: (x) => `${x}`,
    toDb: (x) => (x == "new" ? "new" : "old"),
  },
  allow_msg_ttl: {
    def: false,
    fromDb: (x) => x == "true",
    toDb: (x) => `${!!x}`,
  },
  desc: {
    def: null,
    fromDb: JSON.parse,
    toDb: JSON.stringify,
  },
};

export const EPHEMERAL_MAX_BYTES = 64 * 1e6;

enum CompressionAlgorithm {
  None = 0,
  Zstd = 1,
}

interface Compression {
  // compression algorithm to use
  algorithm: CompressionAlgorithm;
  // only compress data above this size
  threshold: number;
}

const DEFAULT_COMPRESSION = {
  algorithm: CompressionAlgorithm.Zstd,
  threshold: 1024,
};

export interface StoredMessage {
  // server assigned positive increasing integer number
  seq: number;
  // server assigned time in ms since epoch
  time: number;
  // user assigned key -- when set all previous messages with that key are deleted.
  key?: string;
  // the encoding used to encode the raw data
  encoding: DataEncoding;
  // arbitrary binary data
  raw: Buffer;
  // arbitrary JSON-able object -- analogue of NATS headers, but anything JSON-able
  headers?: Headers;
}

export interface SetOperation extends StoredMessage {
  op: undefined;
  msgID?: string;
}

export interface DeleteOperation {
  op: "delete";
  // sequence numbers of deleted messages
  seqs: number[];
}

export interface StorageOptions {
  // absolute path to sqlite database file. This needs to be a valid filename
  // path, and must also be kept under 1K so it can be stored in cloud storage.
  path: string;
  // if false (the default) do not require sync writes to disk on every set
  sync?: boolean;
  // if set, then data is never saved to disk at all. To avoid using a lot of server
  // RAM there is always a hard cap of at most EPHEMERAL_MAX_BYTES on any ephemeral
  // table, which is enforced on all writes. Clients should always set max_bytes,
  // possibly as low as they can, and check by reading back what is set.
  ephemeral?: boolean;
  // compression configuration
  compression?: Compression;
}

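// Example (illustrative only): an ephemeral, in-memory stream that still uses the
// default zstd compression, created via the pstream() factory at the bottom of
// this file; the path shown is made up:
//
//   const s = pstream({ path: "/tmp/ephemeral.db", ephemeral: true });
//   s.config({ max_bytes: 1e6 }); // ephemeral tables are also capped at EPHEMERAL_MAX_BYTES
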
// persistence for stream of messages with subject
export class PersistentStream extends EventEmitter {
  private readonly options: StorageOptions;
  private readonly db: Database;
  private readonly msgIDs = new TTL({ ttl: 2 * 60 * 1000 });
  private conf: Configuration;

  constructor(options: StorageOptions) {
    super();
    logger.debug("constructor ", options.path);

    this.setMaxListeners(1000);
    options = { compression: DEFAULT_COMPRESSION, ...options };
    this.options = options;
    const location = this.options.ephemeral
      ? ":memory:"
      : this.options.path + ".db";
    this.db = createDatabase(location);
    //console.log(location);
    this.init();
  }

  init = () => {
    if (!this.options.sync && !this.options.ephemeral) {
      // Unless sync is set, we do not require that the filesystem has committed
      // changes to disk after every insert. This can easily make things 10x faster.
      // Sets are typically going to come in one-by-one as users edit files, so this
      // works well for our application. Also, loss of a few seconds of persistence
      // is acceptable in a lot of applications, e.g., if it is just edit history
      // for a file.
      this.db.prepare("PRAGMA synchronous=OFF").run();
    }
    // time is in *seconds* since the epoch, since that is standard for sqlite.
    // ttl is in milliseconds.
    this.db
      .prepare(
        `CREATE TABLE IF NOT EXISTS messages (
          seq INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, time INTEGER NOT NULL, headers TEXT, compress NUMBER NOT NULL, encoding NUMBER NOT NULL, raw BLOB NOT NULL, size NUMBER NOT NULL, ttl NUMBER
        )
        `,
      )
      .run();
    this.db
      .prepare(
        `
        CREATE TABLE IF NOT EXISTS config (
          field TEXT PRIMARY KEY, value TEXT NOT NULL
        )`,
      )
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_key ON messages(key)")
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_time ON messages(time)")
      .run();

    this.conf = this.config();
  };

  close = () => {
    logger.debug("close ", this.options.path);
    if (this.db != null) {
      this.vacuum();
      this.db.prepare("PRAGMA wal_checkpoint(FULL)").run();
      this.db.close();
      // @ts-ignore
    }
    // @ts-ignore
    delete this.options;
    this.msgIDs?.clear();
    // @ts-ignore
    delete this.msgIDs;
  };

  private compress = (
    raw: Buffer,
  ): { raw: Buffer; compress: CompressionAlgorithm } => {
    if (
      this.options.compression!.algorithm == CompressionAlgorithm.None ||
      raw.length <= this.options.compression!.threshold
    ) {
      return { raw, compress: CompressionAlgorithm.None };
    }
    if (this.options.compression!.algorithm == CompressionAlgorithm.Zstd) {
      return { raw: compress(raw), compress: CompressionAlgorithm.Zstd };
    }
    throw Error(
      `unknown compression algorithm: ${this.options.compression!.algorithm}`,
    );
  };

  set = ({
    encoding,
    raw,
    headers,
    key,
    ttl,
    previousSeq,
    msgID,
  }: {
    encoding: DataEncoding;
    raw: Buffer;
    headers?: JSONValue;
    key?: string;
    ttl?: number;
    previousSeq?: number;
    // if given, any attempt to publish something again with the same msgID
    // is deduplicated. Use this to prevent accidentally writing twice, e.g.,
    // due to not getting a response back from the server.
    msgID?: string;
  }): { seq: number; time: number } => {
    if (previousSeq === null) {
      previousSeq = undefined;
    }
    if (key === null) {
      key = undefined;
    }
    if (msgID != null && this.msgIDs?.has(msgID)) {
      return this.msgIDs.get(msgID)!;
    }
    if (key !== undefined && previousSeq !== undefined) {
      // throw an error if the current seq number for the row
      // with this key is not previousSeq.
      const { seq } = this.db // there is an index on the key so this is fast
        .prepare("SELECT seq FROM messages WHERE key=?")
        .get(key) as any;
      if (seq != previousSeq) {
        throw new ConatError("wrong last sequence", {
          code: "wrong-last-sequence",
        });
      }
    }
    const time = Date.now();
    const compressedRaw = this.compress(raw);
    const serializedHeaders = JSON.stringify(headers);
    const size =
      (serializedHeaders?.length ?? 0) +
      (raw?.length ?? 0) +
      (key?.length ?? 0);

    this.enforceLimits(size);

    const tx = this.db.transaction(
      (time, compress, encoding, raw, headers, key, size, ttl) => {
        if (key !== undefined) {
          // insert with key -- delete all previous messages with this key, since
          // they will never be needed again and would just waste space.
          this.db.prepare("DELETE FROM messages WHERE key = ?").run(key);
        }
        return this.db
          .prepare(
            "INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq",
          )
          .get(time / 1000, compress, encoding, raw, headers, key, size, ttl);
      },
    );
    const row = tx(
      time,
      compressedRaw.compress,
      encoding,
      compressedRaw.raw,
      serializedHeaders,
      key,
      size,
      ttl,
    );
    const seq = Number((row as any).seq);
    // lastInsertRowid is a bigint from sqlite, but we won't hit that limit
    this.emit("change", {
      seq,
      time,
      key,
      encoding,
      raw,
      headers,
      msgID,
    });
    if (msgID !== undefined) {
      this.msgIDs.set(msgID, { time, seq });
    }
    return { time, seq };
  };

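  // Example (illustrative; `encoding` and `raw` are whatever the caller already
  // has): publishing twice with the same msgID within the dedup window returns
  // the original { seq, time } instead of inserting a second row:
  //
  //   const a = stream.set({ encoding, raw, msgID: "abc" });
  //   const b = stream.set({ encoding, raw, msgID: "abc" });
  //   // a.seq === b.seq
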
  get = ({
    seq,
    key,
  }: { seq: number; key: undefined } | { seq: undefined; key: string }):
    | StoredMessage
    | undefined => {
    let x;
    if (seq) {
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE seq=?",
        )
        .get(seq);
    } else if (key != null) {
      // NOTE: we guarantee when doing set above that there is at most one
      // row with a given key. Also there's a unique constraint.
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE key=?",
        )
        .get(key);
    } else {
      x = undefined;
    }
    return dbToMessage(x as any);
  };

  *getAll({
    start_seq,
    end_seq,
  }: {
    end_seq?: number;
    start_seq?: number;
  } = {}): IterableIterator<StoredMessage> {
    let query: string, stmt;

    const where: string[] = [];
    const v: number[] = [];
    if (start_seq != null) {
      where.push("seq>=?");
      v.push(start_seq);
    }
    if (end_seq != null) {
      where.push("seq<=?");
      v.push(end_seq);
    }
    query = `SELECT seq, key, time, compress, encoding, raw, headers FROM messages ${where.length == 0 ? "" : " where " + where.join(" AND ")} ORDER BY seq`;
    stmt = this.db.prepare(query);
    for (const row of stmt.iterate(...v)) {
      yield dbToMessage(row)!;
    }
  }

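  // Example (illustrative): iterate over everything at or after a given sequence
  // number without loading the whole stream into memory:
  //
  //   for (const msg of stream.getAll({ start_seq: 100 })) {
  //     // msg is a StoredMessage with seq >= 100
  //   }
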
  delete = ({
    seq,
    last_seq,
    all,
  }: {
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): { seqs: number[] } => {
    let seqs: number[] = [];
    if (all) {
      seqs = this.db
        .prepare("SELECT seq FROM messages")
        .all()
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages").run();
      this.vacuum();
    } else if (last_seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq<=?")
        .all(last_seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq<=?").run(last_seq);
      this.vacuum();
    } else if (seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq=?")
        .all(seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);
    }
    this.emit("change", { op: "delete", seqs });
    return { seqs };
  };

  vacuum = () => {
    try {
      this.db.prepare("VACUUM").run();
    } catch {}
  };

  get length(): number {
    const { length } = this.db
      .prepare("SELECT COUNT(*) AS length FROM messages")
      .get() as { length: number };
    return length;
  }

  totalSize = (): number => {
    return (
      (this.db.prepare(`SELECT SUM(size) AS sum FROM messages`).get() as any)
        .sum ?? 0
    );
  };

  seq = (): number => {
    return (
      (this.db.prepare(`SELECT MAX(seq) AS seq FROM messages`).get() as any)
        .seq ?? 0
    );
  };

  inventory = (): PartialInventory => {
    return {
      bytes: this.totalSize(),
      count: this.length,
      limits: this.getConfig(),
      seq: this.seq(),
    };
  };

  keys = (): string[] => {
    const v = this.db
      .prepare("SELECT key FROM messages WHERE key IS NOT NULL")
      .all() as {
      key: string;
    }[];
    return v.map(({ key }) => key);
  };

  sqlite = (statement: string, params: any[] = []): any[] => {
    // Block "ATTACH DATABASE" (case-insensitive, any whitespace between the words).
    if (/\battach\s+database\b/i.test(statement)) {
      throw Error("ATTACH DATABASE not allowed");
    }
    const stmt = this.db.prepare(statement);
    try {
      return stmt.all(...params);
    } catch (err) {
      if (err.message.includes("run() instead")) {
        stmt.run(...params);
        return [];
      } else {
        throw err;
      }
    }
  };

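  // Example (illustrative): run ad hoc SQL against this stream's database;
  // statements that don't return rows fall through to run() and yield []:
  //
  //   stream.sqlite("SELECT COUNT(*) AS n FROM messages WHERE key IS NOT NULL");
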
  // only returns fields that are not set to their default value,
  // and doesn't enforce any limits
  getConfig = (): Partial<Configuration> => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      const { def, fromDb } = CONFIGURATION[field];
      cur[field] = fromDb(value);
      if (cur[field] == def) {
        delete cur[field];
      }
    }
    return cur;
  };

  config = (config?: Partial<Configuration>): Configuration => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      cur[field] = value;
    }
    const full: Partial<Configuration> = {};
    for (const key in CONFIGURATION) {
      const { def, fromDb, toDb } = CONFIGURATION[key];
      full[key] =
        config?.[key] ?? (cur[key] !== undefined ? fromDb(cur[key]) : def);
      let x = toDb(full[key]);
      if (config?.[key] != null && full[key] != (cur[key] ?? def)) {
        // making a change
        this.db
          .prepare(
            `INSERT INTO config (field, value) VALUES(?, ?) ON CONFLICT(field) DO UPDATE SET value=excluded.value`,
          )
          .run(key, x);
      }
      full[key] = fromDb(x);
      if (
        this.options.ephemeral &&
        key == "max_bytes" &&
        (full[key] == null || full[key] <= 0 || full[key] > EPHEMERAL_MAX_BYTES)
      ) {
        // For ephemeral tables we always cap max_bytes.
        // (Note -- the cap isn't explicitly written to the sqlite database, so
        // we remain free to change it later.)
        full[key] = EPHEMERAL_MAX_BYTES;
      }
    }
    this.conf = full as Configuration;
    // ensure any new limits are enforced
    this.enforceLimits(0);
    return full as Configuration;
  };

  private emitDelete = (rows) => {
    if (rows.length > 0) {
      const seqs = rows.map((row: { seq: number }) => row.seq);
      this.emit("change", { op: "delete", seqs });
    }
  };

  // Do whatever limit enforcement and throttling is needed when inserting one new
  // message with the given size; if size=0, assume we are not actually inserting a
  // new message and just enforce the current limits.
  private enforceLimits = (size: number = 0) => {
    if (
      size > 0 &&
      (this.conf.max_msgs_per_second > 0 || this.conf.max_bytes_per_second > 0)
    ) {
      const { msgs, bytes } = this.db
        .prepare(
          "SELECT COUNT(*) AS msgs, SUM(size) AS bytes FROM messages WHERE time >= ?",
        )
        .get(Date.now() / 1000 - 1) as { msgs: number; bytes: number };
      if (
        this.conf.max_msgs_per_second > 0 &&
        msgs > this.conf.max_msgs_per_second
      ) {
        throw new ConatError("max_msgs_per_second exceeded", {
          code: "reject",
        });
      }
      if (
        this.conf.max_bytes_per_second > 0 &&
        bytes > this.conf.max_bytes_per_second
      ) {
        throw new ConatError("max_bytes_per_second exceeded", {
          code: "reject",
        });
      }
    }

    if (this.conf.max_msgs > -1) {
      const length = this.length + (size > 0 ? 1 : 0);
      if (length > this.conf.max_msgs) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_msgs limit reached", { code: "reject" });
          }
        } else {
          // delete earliest messages to make room
          const rows = this.db
            .prepare(
              `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages ORDER BY seq ASC LIMIT ?) RETURNING seq`,
            )
            .all(length - this.conf.max_msgs);
          this.emitDelete(rows);
        }
      }
    }

    if (this.conf.max_age > 0) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages WHERE time <= ?) RETURNING seq`,
        )
        .all((Date.now() - this.conf.max_age) / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_bytes > -1) {
      if (size > this.conf.max_bytes) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_bytes limit reached", { code: "reject" });
          }
        } else {
          // The new message alone exceeds the total allowed size, so this is the
          // same as adding the new message, then deleting everything.
          this.delete({ all: true });
        }
      } else {
        // Delete the earliest (in terms of seq number) messages so that the sum
        // of the remaining sizes plus the new size is <= max_bytes. This also
        // applies when not inserting (size=0) and the current sum is already
        // over the limit.
        const totalSize = this.totalSize();
        const newTotal = totalSize + size;
        if (newTotal > this.conf.max_bytes) {
          const bytesToFree = newTotal - this.conf.max_bytes;
          let freed = 0;
          let lastSeqToDelete: number | null = null;

          for (const { seq, size: msgSize } of this.db
            .prepare(`SELECT seq, size FROM messages ORDER BY seq ASC`)
            .iterate() as any) {
            if (freed >= bytesToFree) break;
            freed += msgSize;
            lastSeqToDelete = seq;
          }

          if (lastSeqToDelete !== null) {
            if (this.conf.discard_policy == "new") {
              if (size > 0) {
                throw new ConatError("max_bytes limit reached", {
                  code: "reject",
                });
              }
            } else {
              const rows = this.db
                .prepare(`DELETE FROM messages WHERE seq <= ? RETURNING seq`)
                .all(lastSeqToDelete);
              this.emitDelete(rows);
            }
          }
        }
      }
    }

    if (this.conf.allow_msg_ttl) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE ttl IS NOT null AND time + ttl/1000 < ? RETURNING seq`,
        )
        .all(Date.now() / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_msg_size > -1 && size > this.conf.max_msg_size) {
      throw new ConatError(
        `max_msg_size of ${this.conf.max_msg_size} bytes exceeded`,
        { code: "reject" },
      );
    }
  };
}

function dbToMessage(
  x:
    | {
        seq: number;
        key?: string;
        time: number;
        compress: CompressionAlgorithm;
        encoding: DataEncoding;
        raw: Buffer;
        headers?: string;
      }
    | undefined,
): StoredMessage | undefined {
  if (x === undefined) {
    return x;
  }
  return {
    seq: x.seq,
    time: x.time * 1000,
    key: x.key != null ? x.key : undefined,
    encoding: x.encoding,
    raw: handleDecompress(x),
    headers: x.headers ? JSON.parse(x.headers) : undefined,
  };
}

function handleDecompress({
  raw,
  compress,
}: {
  raw: Buffer;
  compress: CompressionAlgorithm;
}) {
  if (compress == CompressionAlgorithm.None) {
    return raw;
  } else if (compress == CompressionAlgorithm.Zstd) {
    return decompress(raw);
  } else {
    throw Error(`unknown compression ${compress}`);
  }
}

interface CreateOptions extends StorageOptions {
  noCache?: boolean;
}

export const cache = refCacheSync<CreateOptions, PersistentStream>({
  name: "persistent-storage-stream",
  createKey: ({ path }: CreateOptions) => path,
  createObject: (options: CreateOptions) => {
    const pstream = new PersistentStream(options);
    pstream.init();
    return pstream;
  },
});

export function pstream(
  options: StorageOptions & { noCache?: boolean },
): PersistentStream {
  return cache(options);
}
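
// Example (illustrative): streams obtained via pstream() are cached by path, so
// two calls with the same path share one PersistentStream instance; passing
// noCache is intended to bypass that cache (see refCacheSync for the details):
//
//   const a = pstream({ path: "/tmp/a.db" });
//   const b = pstream({ path: "/tmp/a.db" });
//   // a === b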