Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dkv.ts
1453 views
1
/*
2
Eventually Consistent Distributed Key:Value Store
3
4
- You give one subject and general-dkv provides a synchronous eventually consistent
5
"multimaster" distributed way to work with the KV store of keys matching that subject,
6
inside of the named KV store.
7
8
- You may define a 3-way merge function, which is used to automatically resolve all
9
conflicting writes. The default is to use our local version, i.e., "last write
10
to remote wins". The function is run locally so can have access to any state.
11
12
- All set/get/delete operations are synchronous.
13
14
- The state gets sync'd in the backend to persistent storage on Conat as soon as possible,
15
and there is an async save function.
16
17
This class is based on top of the Consistent Centralized Key:Value Store defined in kv.ts.
18
You can use the same key:value store at the same time via both interfaces, and if the store
19
is a DKV, you can also access the underlying KV via "store.kv".
20
21
- You must explicitly call "await store.init()" to initialize this before using it.
22
23
- The store emits an event ('change', key) whenever anything changes.
24
25
- Calling "store.getAll()" provides ALL the data, and "store.get(key)" gets one value.
26
27
- Use "store.set(key,value)" or "store.set({key:value, key2:value2, ...})" to set data,
28
with the following semantics:
29
30
- in the background, changes propagate to Conat. You do not do anything explicitly and
31
this should never raise an exception.
32
33
- you can call "store.hasUnsavedChanges()" to see if there are any unsaved changes.
34
35
- call "store.unsavedChanges()" to see the unsaved keys.
36
37
- The 3-way merge function takes as input {local,remote,prev,key}, where
38
- key = the key where there's a conflict
39
- local = your version of the value
40
- remote = the remote value, which conflicts in that isEqual(local,remote) is false.
41
- prev = a known common prev of local and remote.
42
43
(any of local, remote or prev can be undefined, e.g., no previous value or a key was deleted)
44
45
You can do anything synchronously you want to resolve such conflicts, i.e., there are no
46
axioms that have to be satisfied. If the 3-way merge function throws an exception (or is
47
not specified) we silently fall back to "last write wins".
48
49
50
DEVELOPMENT:
51
52
~/cocalc/src/packages/backend$ node
53
54
s = await require("@cocalc/backend/conat/sync").dkv({name:'test', merge:({local,remote})=>{return {...remote,...local}}});
55
56
57
In the browser console:
58
59
> s = await cc.client.conat_client.dkv({filter:['foo.>'],merge:({local,remote})=>{return {...remote,...local}}})
60
61
# NOTE that the name is account-{account_id} or project-{project_id},
62
# and if not given defaults to the account-{user's account id}
63
> s.kv.name
64
'account-6aae57c6-08f1-4bb5-848b-3ceb53e61ede'
65
66
> s.on('change',(key)=>console.log(key));0;
67
68
*/
69
70
import { EventEmitter } from "events";
71
import {
72
CoreStream,
73
type Configuration,
74
type ChangeEvent,
75
} from "./core-stream";
76
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
77
import { isEqual } from "lodash";
78
import { delay, map as awaitMap } from "awaiting";
79
import {
80
type Client,
81
ConatError,
82
type Headers,
83
} from "@cocalc/conat/core/client";
84
import refCache from "@cocalc/util/refcache";
85
import { type JSONValue } from "@cocalc/util/types";
86
import { conat } from "@cocalc/conat/client";
87
import { asyncThrottle, until } from "@cocalc/util/async-utils";
88
import {
89
inventory,
90
type Inventory,
91
INVENTORY_UPDATE_INTERVAL,
92
} from "./inventory";
93
94
// Sentinel stored in a DKV's local state, in place of a value, to mark a key
// that has been deleted locally.
export const TOMBSTONE = Symbol("tombstone");
// Concurrency limit passed to awaitMap when keys are saved one at a time.
const MAX_PARALLEL = 250;

// When true, the save routines log progress and timing to the console.
const DEBUG = false;
98
99
// Signature of the 3-way merge callback used to resolve a conflict between an
// unsaved local value and an incoming remote value for the same key.
//   - key: the key where there is a conflict
//   - prev: a known common previous value of local and remote
//   - local: our unsaved version of the value
//   - remote: the incoming remote value
// The return value is used as the resolved value for the key.
export type MergeFunction = (opts: {
  key: string;
  prev: any;
  local: any;
  remote: any;
}) => any;
105
106
// Per-key options that can be given to DKV.set; they are forwarded to the
// underlying store when the key is saved.
interface SetOptions {
  headers?: Headers;
}
109
110
// Options accepted by the DKV constructor and by the cached dkv() factory.
export interface DKVOptions {
  name: string;
  account_id?: string;
  project_id?: string;
  desc?: JSONValue;
  // Required by the DKV constructor; the cached factory fills it in via
  // conat() when it is not given.
  client?: Client;
  // 3-way merge conflict resolution
  merge?: (opts: { key: string; prev?: any; local?: any; remote?: any }) => any;
  config?: Partial<Configuration>;

  // if noAutosave is set, local changes are never saved until you explicitly
  // call "await this.save()", which will try once to save.  Changes made during
  // the save may not be saved though.
  // CAUTION: noAutosave is really only meant for unit testing!  The save is
  // reuseInFlighted so a save somewhere far away could be in progress starting
  // before your call to save, and when it finishes that's it, so what you just
  // did is not saved.  Take care.
  noAutosave?: boolean;

  // Passed through unchanged to the underlying CoreStream (defaults to false).
  ephemeral?: boolean;

  noCache?: boolean;
  // If set, the DKV does not record itself in the inventory.
  noInventory?: boolean;
}
134
135
export class DKV<T = any> extends EventEmitter {
136
// Underlying stream-backed store; deleted by close(), so "undefined" means closed.
private kv?: CoreStream<T>;
// Optional user supplied 3-way merge function for conflict resolution.
private merge?: MergeFunction;
// Local values that have not yet been confirmed by the server, keyed by key.
// TOMBSTONE marks a key that was deleted locally.
private local: { [key: string]: T | typeof TOMBSTONE } = {};
// Per-key options given to set(); used when the key is saved.
private options: { [key: string]: SetOptions } = {};
// Compared against local in unsavedChanges().
// NOTE(review): nothing in this class appears to assign entries here; they are
// only removed in discardLocalState -- confirm whether this is still needed.
private saved: { [key: string]: T | typeof TOMBSTONE } = {};
// Keys modified since the current save attempt started (cleared at the start
// of each attempt).
private changed: Set<string> = new Set();
private noAutosave: boolean;
public readonly name: string;
public readonly desc?: JSONValue;
// True after a save attempt reported errors; reset once a later attempt
// finishes cleanly and the success message has been logged.
private saveErrors: boolean = false;
// Sequence numbers for which the server reported "wrong-last-sequence";
// keys whose current sequence number is in this set are not written again.
private invalidSeq = new Set<number>();
private opts: DKVOptions;
148
149
// Creates the store on top of a new CoreStream and returns a Proxy around the
// instance, so that "store.key", "store.key = value" and "delete store.key"
// behave like get/set/delete on the key:value data.
// Throws if opts.client is not given.
constructor(opts: DKVOptions) {
  super();
  if (opts.client == null) {
    throw Error("client must be specified");
  }
  this.opts = opts;
  const {
    name,
    project_id,
    account_id,
    client,
    config,
    ephemeral = false,
  } = opts;
  this.name = name;
  this.desc = opts.desc;
  this.merge = opts.merge;
  this.noAutosave = !!opts.noAutosave;
  this.kv = new CoreStream({
    name,
    project_id,
    account_id,
    client,
    config,
    ephemeral,
  });

  return new Proxy(this, {
    // Real properties/methods of the instance win; anything else is looked
    // up as a key in the store.
    get(target, prop) {
      const field = String(prop);
      return target[field] ?? target.get(field);
    },
    set(target, prop, value) {
      const field = String(prop);
      // These must remain assignable on the instance itself (EventEmitter
      // internals and the close method).
      const isInstanceField =
        field == "_eventsCount" || field == "_events" || field == "close";
      if (isInstanceField) {
        target[field] = value;
        return true;
      }
      // Do not allow data to clobber an existing member of the instance.
      if (target[field] != null) {
        throw Error(`method name '${field}' is read only`);
      }
      target.set(field, value);
      return true;
    },
    deleteProperty(target, prop) {
      if (typeof prop == "string") {
        target.delete(prop);
      }
      return true;
    },
  });
}
203
204
// Guards against init being run more than once.
private initialized = false;

// Initializes the underlying store and starts listening for remote changes.
// Must be awaited before the store is used.  Throws if called a second time
// or if the store is already closed.  Emits "connected" when done.
init = async () => {
  if (this.initialized) {
    throw Error("init can only be called once");
  }
  this.initialized = true;
  if (this.kv == null) {
    throw Error("closed");
  }
  // subscribe before initializing the underlying store
  this.kv.on("change", this.handleRemoteChange);
  await this.kv.init();
  // allow_msg_ttl is used for deleting tombstones.
  await this.kv.config({ allow_msg_ttl: true });
  this.emit("connected");
};
219
220
// True once close() has run (the underlying store has been dropped).
isClosed = () => this.kv == null;
223
224
// Closes the store: detaches from and closes the underlying store, emits
// "closed", removes all listeners and drops local state.  Calling it again
// is a no-op.
close = () => {
  const kv = this.kv;
  if (kv == null) {
    // already closed
    return;
  }
  // Drop our reference first, so that isClosed() is already true while the
  // underlying store is being shut down.
  delete this.kv;
  kv.removeListener("change", this.handleRemoteChange);
  kv.close();
  // Listeners must hear "closed" before they are removed.
  this.emit("closed");
  this.removeAllListeners();
  // @ts-ignore
  delete this.local;
  // @ts-ignore
  delete this.options;
  // @ts-ignore
  delete this.changed;
  delete this.merge;
  // @ts-ignore
  delete this.opts;
};
246
247
// Forgets everything tracked locally for the given key, then emits "stable"
// if no local state remains at all.
private discardLocalState = (key: string) => {
  for (const table of [this.local, this.options, this.saved]) {
    delete table[key];
  }
  if (this.isStable()) {
    this.emit("stable");
  }
};
255
256
// stable = everything is saved *and* also echoed back from the server as confirmation.
// Concretely: there are no entries left in the local state.
isStable = () => {
  let hasPending = false;
  for (const _ in this.local) {
    hasPending = true;
    break;
  }
  return !hasPending;
};
263
264
// Handler for "change" events from the underlying store.  If we hold an
// unsaved local value for the key, either drop it (when it agrees with the
// remote value) or resolve the conflict with the merge function, falling back
// to the local value.  Always emits ("change", {key, value, prev}) for keyed
// changes.
private handleRemoteChange = ({
  mesg: remote,
  key,
  prev,
}: ChangeEvent<T>) => {
  if (key === undefined) {
    // not part of kv store data
    return;
  }
  // A pending local delete is treated like "no local value" here.
  const local = this.local[key] === TOMBSTONE ? undefined : this.local[key];
  const hasLocal = local !== undefined;
  let value: any = remote;

  if (hasLocal && !isEqual(local, remote)) {
    // There is a conflict.  Let's resolve the conflict:
    try {
      value = this.merge?.({ key, local, remote, prev }) ?? local;
    } catch (err) {
      console.warn("exception in merge conflict resolution", err);
      // user provided a merge function that throws an exception. We select local, since
      // it is the newest, i.e., "last write wins"
      value = local;
    }
  }

  if (hasLocal) {
    if (isEqual(value, remote)) {
      // Either local already equals remote, or the resolution is exactly the
      // remote value: nothing to save, so forget our local value.
      this.discardLocalState(key);
    } else if (value === TOMBSTONE) {
      // resolution is to delete the key
      this.delete(key);
    } else {
      // resolve with the new value
      this.set(key, value);
    }
  }

  this.emit("change", { key, value, prev });
};
319
320
// get(key) returns the current value of one key (undefined if it does not
// exist or was deleted locally); get() with no argument returns everything.
// Unsaved local values take precedence over the underlying store.
// Throws if the store is closed.
get(key: string): T | undefined;
get(): { [key: string]: T };
get(key?: string): T | { [key: string]: T } | undefined {
  if (this.kv == null) {
    throw Error("closed");
  }
  if (key === undefined) {
    return this.getAll();
  }
  const pending = this.local[key];
  if (pending === undefined) {
    // nothing unsaved for this key, so use what the underlying store has
    return this.kv.getKv(key);
  }
  if (pending === TOMBSTONE) {
    // deleted locally
    return undefined;
  }
  return pending;
}
338
339
// Number of keys currently in the store.
get length(): number {
  // not efficient, since it builds the full object of all values
  const everything = this.getAll();
  return Object.keys(everything).length;
}
343
344
// Returns a new object with all key:value pairs: the underlying store's data
// overlaid with unsaved local values, minus keys deleted locally.
// Throws if the store is closed.
getAll = (): { [key: string]: T } => {
  if (this.kv == null) {
    throw Error("closed");
  }
  const merged = { ...this.kv.getAllKv(), ...this.local };
  const deleted = Object.keys(this.local).filter(
    (key) => this.local[key] === TOMBSTONE,
  );
  for (const key of deleted) {
    delete merged[key];
  }
  return merged as { [key: string]: T };
};
356
357
// gets all the keys; fast because doesn't decode messages
// Returns the keys of the underlying store plus any unsaved local keys,
// excluding keys that have been deleted locally (so the result agrees with
// has(), get() and getAll()).  Returns [] if the store is closed.
keys = (): string[] => {
  if (this.kv == null) {
    return [];
  }
  // this is fast
  const remoteKeys = this.kv.keysKv();

  // Only pay for building a Set when there actually is local state.
  let merged: Set<string> | null = null;
  for (const key in this.local) {
    if (merged === null) {
      merged = new Set(remoteKeys);
    }
    if (this.local[key] === TOMBSTONE) {
      // pending local delete: the key must not be reported
      merged.delete(key);
    } else {
      // unsaved key (no-op if the underlying store already has it)
      merged.add(key);
    }
  }
  // Build a fresh array rather than mutating the one the store gave us.
  return merged === null ? remoteKeys : Array.from(merged);
};
377
378
// True if the key currently exists, taking unsaved local sets and deletes
// into account.  Throws if the store is closed.
has = (key: string): boolean => {
  if (this.kv == null) {
    throw Error("closed");
  }
  const pending = this.local[key];
  if (pending === undefined) {
    // no local state for this key, so defer to the underlying store
    return this.kv.hasKv(key);
  }
  // a local value means it exists; a tombstone means it was deleted
  return pending !== TOMBSTONE;
};
391
392
// Time information from the underlying store, for one key or (with no
// argument) for all keys.  Throws if the store is closed.
time = (key?: string): { [key: string]: Date } | Date | undefined => {
  const { kv } = this;
  if (kv == null) {
    throw Error("closed");
  }
  return kv.timeKv(key);
};
398
399
// Sequence number the underlying store has for the key, if any.
// Throws if the store is closed.
seq = (key: string): number | undefined => {
  const { kv } = this;
  if (kv == null) {
    throw Error("closed");
  }
  return kv.seqKv(key);
};
405
406
// Marks the key as deleted locally and records that it changed; does not
// trigger a save.
private _delete = (key) => {
  this.changed.add(key);
  this.local[key] = TOMBSTONE;
};
410
411
// Deletes the key locally and, unless noAutosave is set, starts saving in
// the background.
delete = (key) => {
  this._delete(key);
  if (this.noAutosave) {
    return;
  }
  this.save();
};
417
418
// Deletes every key: those in the underlying store and those that only exist
// as unsaved local values.  Starts a background save unless noAutosave is
// set.  Throws if the store is closed.
clear = () => {
  if (this.kv == null) {
    throw Error("closed");
  }
  const remote = this.kv.getAllKv();
  for (const key in remote) {
    this._delete(key);
  }
  // Also covers keys that were only ever set locally.
  Object.keys(this.local).forEach((key) => this._delete(key));
  if (this.noAutosave) {
    return;
  }
  this.save();
};
432
433
// Converts a user supplied value into what is stored in the local state:
// undefined means "delete", which is represented by TOMBSTONE.
private toValue = (obj) => (obj === undefined ? TOMBSTONE : obj);
439
440
// Headers for the key: those given in a not yet discarded local set() if
// options were supplied for the key, otherwise whatever the underlying store
// has.
headers = (key: string): Headers | undefined => {
  const localOptions = this.options[key];
  if (localOptions == null) {
    return this.kv?.headersKv(key);
  }
  return localOptions.headers;
};
447
448
// Sets one key locally (a value of undefined deletes it), remembers any
// options for the save, and -- unless noAutosave is set -- starts saving in
// the background.  Also schedules an inventory update.
set = (key: string, value: T, options?: SetOptions) => {
  this.local[key] = this.toValue(value);
  if (options != null) {
    this.options[key] = options;
  }
  this.changed.add(key);
  if (!this.noAutosave) {
    this.save();
  }
  this.updateInventory();
};
460
461
// Sets every key:value pair of obj locally (undefined values delete), then
// -- unless noAutosave is set -- starts one background save for all of them.
// Also schedules an inventory update.
setMany = (obj) => {
  for (const key in obj) {
    const value = this.toValue(obj[key]);
    this.local[key] = value;
    this.changed.add(key);
  }
  if (!this.noAutosave) {
    this.save();
  }
  this.updateInventory();
};
471
472
// True if the store is open and at least one key has unsaved changes.
hasUnsavedChanges = () => {
  return this.kv != null && this.unsavedChanges().length > 0;
};
478
479
// The keys whose local value differs from what is recorded as saved.
unsavedChanges = (): string[] => {
  const unsaved: string[] = [];
  for (const key of Object.keys(this.local)) {
    if (this.local[key] !== this.saved[key]) {
      unsaved.push(key);
    }
  }
  return unsaved;
};
484
485
// Saves local changes to the underlying store.  With noAutosave it makes
// exactly one attempt (and lets any error propagate).  Otherwise it keeps
// retrying with backoff until there are no unsaved changes or the store is
// closed, swallowing errors from individual attempts.  Concurrent calls share
// one in-flight save.
save = reuseInFlight(async () => {
  if (this.noAutosave) {
    return await this.attemptToSave();
  }
  let status;

  // One round of the retry loop; resolves to true when we can stop.
  const attempt = async () => {
    if (this.kv == null) {
      // closed, so nothing more can be done
      return true;
    }
    try {
      status = await this.attemptToSave();
    } catch (err) {
      // logging of failed attempts is currently disabled
      if (false && !process.env.COCALC_TEST_MODE) {
        console.log(
          "WARNING: dkv attemptToSave failed -- ",
          this.name,
          this.kv?.name,
          err,
        );
      }
    }
    return !this.hasUnsavedChanges();
  };

  await until(attempt, { start: 150, decay: 1.3, max: 10000 });
  return status;
});
515
516
// Makes a single attempt to save all local changes, and returns the status
// object ({unsaved, set, delete}) produced by the strategy that was used (it
// is undefined if the store got closed while saving).  Previously the status
// was dropped here, so save() always resolved to undefined.
// Throws if the store is closed or if there were errors saving.
private attemptToSave = async () => {
  // Hard-wired switch between the two save strategies; the batched one is
  // the one in use.
  if (true) {
    return await this.attemptToSaveMany();
  } else {
    return await this.attemptToSaveParallel();
  }
};
523
524
// Makes a single attempt to save all local changes using one batched write:
// pending deletes are sent first, one at a time, then every remaining local
// value is written with a single setKvMany call.
//
// Returns {unsaved, set, delete} counts, or undefined if the store was closed
// while saving or the attempt was cut short by a "wrong-last-sequence" or
// timeout (408) response.  Throws if the store is closed at the start, or if
// any write reported an error (so that the caller retries).
private attemptToSaveMany = reuseInFlight(async () => {
  const start = Date.now();
  if (DEBUG) {
    console.log("attemptToSaveMany: start");
  }
  if (this.kv == null) {
    throw Error("closed");
  }
  // From here on, this.changed tracks keys modified *during* this attempt.
  this.changed.clear();
  const status = { unsaved: 0, set: 0, delete: 0 };
  // Work on a snapshot, since this.local can change while we await.
  const obj = { ...this.local };
  for (const key in obj) {
    if (obj[key] === TOMBSTONE) {
      status.unsaved += 1;
      await this.kv.deleteKv(key);
      if (this.kv == null) return;
      status.delete += 1;
      status.unsaved -= 1;
      delete obj[key];
      if (!this.changed.has(key)) {
        // successfully saved this and user didn't make a change *during* the set
        this.discardLocalState(key);
      }
    }
  }
  let errors = false;
  const x: {
    key: string;
    mesg: T;
    options?: {
      headers?: Headers;
      previousSeq?: number;
    };
  }[] = [];
  for (const key in obj) {
    // Only when a merge function is in use do we ask the server to verify
    // the sequence number our value is based on.
    const previousSeq = this.merge != null ? this.seq(key) : undefined;
    if (previousSeq && this.invalidSeq.has(previousSeq)) {
      // Already known to be stale; wait for the newer value to arrive via
      // the changefeed instead of retrying.
      continue;
    }
    status.unsaved += 1;
    x.push({
      key,
      mesg: obj[key] as T,
      options: {
        ...this.options[key],
        previousSeq,
      },
    });
  }
  const results = await this.kv.setKvMany(x);

  let i = 0;
  for (const resp of results) {
    // results[i] is the response for x[i].  Grab everything we need from
    // the entry *before* advancing the index, so that the error handling
    // below refers to this entry and not to the next one.
    const { key, options } = x[i];
    i++;
    if (this.kv == null) return;
    if (!(resp as any).error) {
      status.unsaved -= 1;
      status.set += 1;
    } else {
      const { code, error } = resp as any;
      if (DEBUG) {
        console.log("kv store -- attemptToSave failed", this.desc, error, {
          key,
          value: obj[key],
          code: code,
        });
      }
      errors = true;
      if (code == "reject") {
        const value = this.local[key];
        // can never save this.
        this.discardLocalState(key);
        status.unsaved -= 1;
        this.emit("reject", { key, value });
      }
      if (code == "wrong-last-sequence") {
        // This happens when another client has published a NEWER version of this key,
        // so the right thing is to just ignore this.  In a moment there will be no
        // need to save anything, since we'll receive a message that overwrites this key.
        // It's very important that the changefeed actually be working, of course, which
        // is why the this.invalidSeq, so we never retry in this case, since it can't work.
        const previousSeq = options?.previousSeq;
        if (previousSeq) {
          this.invalidSeq.add(previousSeq);
        }
        return;
      }
      if (code == 408) {
        // timeout -- expected to happen periodically, of course
        if (!process.env.COCALC_TEST_MODE) {
          console.log("WARNING: timeout saving (will try again soon)");
        }
        return;
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING: unexpected error saving dkv '${this.name}' -- ${error}`,
        );
      }
    }
  }
  if (errors) {
    this.saveErrors = true;
    throw Error(`there were errors saving dkv '${this.name}'`);
    // so it retries
  } else {
    if (
      !process.env.COCALC_TEST_MODE &&
      this.saveErrors &&
      status.unsaved == 0
    ) {
      this.saveErrors = false;
      console.log(`SUCCESS: dkv ${this.name} fully saved`);
    }
  }
  if (DEBUG) {
    console.log("attemptToSaveMany: done", Date.now() - start);
  }

  return status;
});
645
646
// Makes a single attempt to save all local changes, writing each key with
// its own request: pending deletes are sent first, one at a time, and then
// the remaining values are written with up to MAX_PARALLEL requests at once.
//
// Returns {unsaved, set, delete} counts, or undefined if the store was closed
// while deleting.  Throws if the store is closed at the start, or if any
// write failed (so that the caller retries).
attemptToSaveParallel = reuseInFlight(async () => {
  const startedAt = Date.now();
  if (DEBUG) {
    console.log("attemptToSaveParallel: start");
  }
  if (this.kv == null) {
    throw Error("closed");
  }
  // From here on, this.changed tracks keys modified *during* this attempt.
  this.changed.clear();
  const status = { unsaved: 0, set: 0, delete: 0 };
  // Work on a snapshot, since this.local can change while we await.
  const snapshot = { ...this.local };

  // Phase 1: deletes.
  for (const key in snapshot) {
    if (snapshot[key] !== TOMBSTONE) {
      continue;
    }
    status.unsaved += 1;
    await this.kv.deleteKv(key);
    if (this.kv == null) return;
    status.delete += 1;
    status.unsaved -= 1;
    delete snapshot[key];
    if (!this.changed.has(key)) {
      // successfully saved this and user didn't make a change *during* the set
      this.discardLocalState(key);
    }
  }

  // Phase 2: sets.
  let errors = false;
  const saveKey = async (key: string) => {
    if (this.kv == null) {
      // closed
      return;
    }
    // Only when a merge function is in use do we ask the server to verify
    // the sequence number our value is based on.
    const previousSeq = this.merge != null ? this.seq(key) : undefined;
    try {
      if (previousSeq && this.invalidSeq.has(previousSeq)) {
        // Already known to be stale: handle it exactly like the server's
        // own wrong-last-sequence response.
        throw new ConatError("waiting on new sequence via changefeed", {
          code: "wrong-last-sequence",
        });
      }
      status.unsaved += 1;
      await this.kv.setKv(key, snapshot[key] as T, {
        ...this.options[key],
        previousSeq,
      });
      if (this.kv == null) return;
      if (DEBUG) {
        console.log("kv store -- attemptToSave succeed", this.desc, {
          key,
          value: snapshot[key],
        });
      }
      status.unsaved -= 1;
      status.set += 1;
      // note that we CANNOT call this.discardLocalState(key) here, because
      // this.get(key) needs to work immediately after save, but if this.local[key]
      // is deleted, then this.get(key) would be undefined, because
      // this.kv.getKv(key) only has value in it once the value is
      // echoed back from the server.
    } catch (err) {
      if (DEBUG) {
        console.log("kv store -- attemptToSave failed", this.desc, err, {
          key,
          value: snapshot[key],
          code: err.code,
        });
      }
      errors = true;
      const code = err.code;
      if (code == "reject") {
        const value = this.local[key];
        // can never save this.
        this.discardLocalState(key);
        status.unsaved -= 1;
        this.emit("reject", { key, value });
      }
      if (code == "wrong-last-sequence") {
        // This happens when another client has published a NEWER version of this key,
        // so the right thing is to just ignore this.  In a moment there will be no
        // need to save anything, since we'll receive a message that overwrites this key.
        // It's very important that the changefeed actually be working, of course, which
        // is why the this.invalidSeq, so we never retry in this case, since it can't work.
        if (previousSeq) {
          this.invalidSeq.add(previousSeq);
        }
        return;
      }
      if (code == 408) {
        // timeout -- expected to happen periodically, of course
        if (!process.env.COCALC_TEST_MODE) {
          console.log("WARNING: timeout saving (will try again soon)");
        }
        return;
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING: unexpected error saving dkv '${this.name}' -- ${err}`,
        );
      }
    }
  };
  await awaitMap(Object.keys(snapshot), MAX_PARALLEL, saveKey);

  if (errors) {
    this.saveErrors = true;
    throw Error(`there were errors saving dkv '${this.name}'`);
    // so it retries
  }
  const recovered =
    !process.env.COCALC_TEST_MODE && this.saveErrors && status.unsaved == 0;
  if (recovered) {
    this.saveErrors = false;
    console.log(`SUCCESS: dkv ${this.name} fully saved`);
  }
  if (DEBUG) {
    console.log("attemptToSaveParallel: done", Date.now() - startedAt);
  }

  return status;
});
765
766
// Statistics reported by the underlying store; undefined if closed.
stats = () => {
  return this.kv?.stats();
};
767
768
// get or set config
// Passes the given (possibly empty) partial configuration to the underlying
// store and resolves to the resulting configuration.  Throws if there is no
// underlying store.
config = async (
  config: Partial<Configuration> = {},
): Promise<Configuration> => {
  const { kv } = this;
  if (kv == null) {
    throw Error("not initialized");
  }
  return await kv.config(config);
};
777
778
// Records this store (type, name, description and whatever the underlying
// store reports via inventory()) in the inventory for its account/project.
// Throttled to run at most once per INVENTORY_UPDATE_INTERVAL; does nothing
// if the store is closed or was created with noInventory.  Failures are only
// logged, never thrown.
private updateInventory = asyncThrottle(
  async () => {
    if (this.isClosed() || this.opts == null || this.opts.noInventory) {
      return;
    }
    await delay(500);
    if (this.isClosed() || this.kv == null) {
      return;
    }
    // Read everything we need from this.opts now: close() deletes this.opts,
    // and it may run while we are awaiting below.
    const { account_id, project_id, desc, name } = this.opts;
    // Must be assigned (not redeclared) inside the try block, so that the
    // finally clause sees the handle and actually closes it.
    let inv: Inventory | undefined = undefined;
    try {
      inv = await inventory({ account_id, project_id });
      if (this.isClosed()) {
        return;
      }
      const status = {
        type: "kv" as "kv",
        name,
        desc,
        ...(await this.kv.inventory()),
      };
      inv.set(status);
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(
          `WARNING: unable to update inventory. name='${name} -- ${err}'`,
        );
      }
    } finally {
      // Release our handle on the inventory whether or not the update worked.
      // @ts-ignore
      inv?.close();
    }
  },
  INVENTORY_UPDATE_INTERVAL,
  { leading: true, trailing: true },
);
815
}
816
817
// Reference counted cache of initialized DKV instances.  Instances are keyed
// by (name, account_id, project_id) only.
export const cache = refCache<DKVOptions, DKV>({
  name: "dkv",
  createKey: (opts) => {
    const { name, account_id, project_id } = opts;
    return JSON.stringify({ name, account_id, project_id });
  },
  createObject: async (opts) => {
    // Fall back to the default conat client when none was given.
    const withClient =
      opts.client == null ? { ...opts, client: await conat() } : opts;
    const store = new DKV(withClient);
    await store.init();
    return store;
  },
});
830
831
// Returns an initialized DKV for the given options, obtained through the
// reference counted cache.
export async function dkv<T>(options: DKVOptions): Promise<DKV<T>> {
  const store = await cache(options);
  return store;
}
834
835