Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dko.ts
1453 views
1
/*
2
Distributed eventually consistent key:object store, where changes propagate sparsely.
3
4
The "values" MUST be objects and no keys or fields of objects can contain the
5
sep character, which is '|' by default.
6
7
NOTE: Whenever you do a set, the lodash isEqual function is used to see which fields
8
you are setting are actually different, and only those get sync'd out.
9
This takes more resources on each client, but less on the network and servers.
10
It also means that if two clients write to an object at the same time but to
11
different field (a merge conflict), then the result gets merged together properly
12
with last write wins per field.
13
14
DEVELOPMENT:
15
16
~/cocalc/src/packages/backend n
17
> t = await require("@cocalc/backend/conat/sync").dko({name:'test'})
18
19
*/
20
21
import { EventEmitter } from "events";
22
import { dkv as createDKV, DKV, DKVOptions } from "./dkv";
23
import { is_object } from "@cocalc/util/misc";
24
import refCache from "@cocalc/util/refcache";
25
import jsonStableStringify from "json-stable-stringify";
26
import { isEqual } from "lodash";
27
28
/**
 * Build the cache key for a set of DKV options.
 *
 * The key is the stable JSON serialization of every option except `client`,
 * so two option objects that differ only in `client` (or in property order,
 * given a stable stringifier) yield the same key.
 *
 * @throws Error if `options.name` is missing or empty.
 */
export function userKvKey(options: DKVOptions) {
  if (!options.name) throw new Error("name must be specified");

  // `client` is deliberately left out of the key.
  const { client: _ignoredClient, ...keyed } = options;
  return jsonStableStringify(keyed)!;
}
35
36
/**
 * Key:object store layered on top of a DKV.
 *
 * Storage layout in the underlying dkv (see `set` and `toPath`):
 *   - the entry at `key` holds the array of field names of the object, and
 *   - the entry at `JSON.stringify([key, field])` holds the value of each field.
 *
 * The constructor returns a Proxy, so in addition to the explicit methods
 * one can use `dko[key] = obj`, `dko[key]` and `delete dko[key]`.  String
 * properties that resolve to a non-null member of the instance (methods,
 * EventEmitter internals) take precedence over store keys.
 *
 * Events emitted: "change" and "reject" (forwarded from the dkv, with the
 * path decoded into `{ key, field?, value? }`) and "closed".
 */
export class DKO<T = any> extends EventEmitter {
  dkv?: DKV; // can't type this

  constructor(private opts: DKVOptions) {
    super();
    return new Proxy(this, {
      // `delete dko[key]` removes the whole object stored at key.
      deleteProperty(target, prop) {
        if (typeof prop == "string") {
          target.delete(prop);
          return true;
        }
        return false;
      },
      set(target, prop, value) {
        if (typeof prop === "symbol") {
          // Symbols are never store keys; writing String(symbol) into the
          // dkv would create a bogus "Symbol(...)" entry (or throw).
          return Reflect.set(target, prop, value);
        }
        if (prop == "_eventsCount" || prop == "_events" || prop == "close") {
          // These must stay writable on the instance itself:
          // EventEmitter updates _events/_eventsCount through `this`.
          target[prop] = value;
          return true;
        }
        if (target[prop] != null) {
          throw Error(`method name '${prop}' is read only`);
        }
        target.set(prop, value);
        return true;
      },
      get(target, prop) {
        if (typeof prop === "symbol") {
          // e.g. Symbol.toStringTag / Symbol.toPrimitive lookups must not be
          // turned into store reads (which throw "closed" when dkv is unset).
          return Reflect.get(target, prop);
        }
        return target[prop] ?? target.get(prop);
      },
    });
  }

  /**
   * Translate a dkv "change" event into a DKO "change" event.
   * Emits `{ key }` when the path has no field part, and
   * `{ key, field, value }` otherwise.
   */
  private dkvOnChange = ({
    key: path,
    value,
  }: {
    key?: string;
    value?: unknown;
  }) => {
    if (path == null) {
      // TODO: could this happen?
      return;
    }
    const { key, field } = this.fromPath(path);
    if (field === undefined) {
      // there is no field part of the path, which happens
      // only for delete of entire object, after setting all
      // the fields to null.
      this.emit("change", { key });
    } else {
      if (value === undefined && this.dkv?.get(key) == null) {
        // don't emit change setting fields to undefined if the
        // object was already deleted.
        return;
      }
      this.emit("change", { key, field, value });
    }
  };

  /**
   * Translate a dkv "reject" event into a DKO "reject" event, decoding the
   * path the same way as for "change".
   */
  private dkvOnReject = ({
    key: path,
    value,
  }: {
    key?: string;
    value?: unknown;
  }) => {
    if (path == null) {
      // TODO: would this happen?
      return;
    }
    const { key, field } = this.fromPath(path);
    if (field === undefined) {
      this.emit("reject", { key });
    } else {
      this.emit("reject", { key, field, value });
    }
  };

  private initialized = false;

  /**
   * Create the underlying dkv (named with the DKO prefix) and start
   * forwarding its "change" and "reject" events.
   *
   * @throws Error if called more than once.
   */
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    this.dkv = await createDKV<{ [key: string]: any }>({
      ...this.opts,
      name: dkoPrefix(this.opts.name),
    });
    this.dkv.on("change", this.dkvOnChange);
    this.dkv.on("reject", this.dkvOnReject);
  };

  /**
   * Detach from and close the underlying dkv, emit "closed", then drop all
   * listeners.  Does nothing if there is no dkv (not initialized yet or
   * already closed).
   */
  close = async () => {
    if (this.dkv == null) {
      return;
    }
    this.dkv.removeListener("change", this.dkvOnChange);
    this.dkv.removeListener("reject", this.dkvOnReject);
    await this.dkv.close();
    delete this.dkv;
    // @ts-ignore
    delete this.opts;
    this.emit("closed");
    this.removeAllListeners();
  };

  // WARNING: Do *NOT* change toPath and fromPath except in a backward
  // compatible way, since it would corrupt all user data involving this.
  /** Encode (key, field) as the dkv key under which that field is stored. */
  private toPath = (key: string, field: string): string => {
    return JSON.stringify([key, field]);
  };

  /**
   * Decode a dkv key.  Returns `{ key, field }` if the path is the JSON
   * encoding of exactly two strings (which is what toPath produces), and
   * `{ key: path }` with no field otherwise.
   */
  private fromPath = (path: string): { key: string; field?: string } => {
    if (path.startsWith("[") && path.endsWith("]")) {
      // *might* be json encoded as above
      try {
        const parsed: unknown = JSON.parse(path);
        if (
          Array.isArray(parsed) &&
          parsed.length === 2 &&
          typeof parsed[0] === "string" &&
          typeof parsed[1] === "string"
        ) {
          return { key: parsed[0], field: parsed[1] };
        }
        // Valid JSON but not a [string, string] pair (e.g. a user key that
        // is literally "[1,2]"): treat it as a plain key below.
      } catch {
        // it wasn't json encoded
        // see https://github.com/sagemathinc/cocalc/issues/8386
      }
    }
    // not encoded since no field -- the value of this one is the list of keys
    return { key: path };
  };

  /**
   * Delete the object at key: first the field-list entry, then each
   * per-field entry.  No-op if nothing is stored at key.
   *
   * @throws Error("closed") if there is no underlying dkv.
   */
  delete = (key: string) => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    const fields = this.dkv.get(key);
    if (fields == null) {
      return;
    }
    this.dkv.delete(key);
    // Only a proper field list can be iterated; anything else stored at
    // this key has no per-field entries to clean up.
    if (Array.isArray(fields)) {
      for (const field of fields) {
        this.dkv.delete(this.toPath(key, field));
      }
    }
  };

  /** Clear the underlying dkv, if there is one. */
  clear = () => {
    this.dkv?.clear();
  };

  /**
   * Reassemble the object at key from its field list and per-field entries.
   * Returns undefined if there is no field list, or if reading the fields
   * throws (e.g. the stored field list is not iterable).
   *
   * @throws Error("closed") if there is no underlying dkv.
   */
  get = (key: string): T | undefined => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    const fields = this.dkv.get(key);
    if (fields == null) {
      return undefined;
    }
    const x: any = {};
    try {
      for (const field of fields) {
        x[field] = this.dkv.get(this.toPath(key, field));
      }
      return x;
    } catch {
      return undefined;
    }
  };

  /**
   * Whether the underlying dkv has an entry at key.
   *
   * @throws Error("closed") if there is no underlying dkv.
   */
  has = (key: string): boolean => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    return this.dkv.has(key);
  };

  /**
   * Build `{ key: object }` for everything in the dkv by scanning all
   * entries and grouping the per-field ones by key.  Field-list entries
   * (paths without a field part) are skipped.
   *
   * @throws Error("closed") if there is no underlying dkv.
   */
  getAll = (): { [key: string]: T } => {
    // get everything
    if (this.dkv == null) {
      throw Error("closed");
    }
    const all = this.dkv.getAll();
    const result: any = {};
    for (const x in all) {
      const { key, field } = this.fromPath(x);
      if (field === undefined) {
        continue;
      }
      if (result[key] == null) {
        result[key] = { [field]: all[x] };
      } else {
        result[key][field] = all[x];
      }
    }
    return result;
  };

  /**
   * Store obj at key.  A null/undefined obj deletes the key.  Only the
   * field list and the fields whose value differs (by isEqual) from what
   * is already stored are written to the dkv.
   *
   * NOTE(review): fields that were in the previous value but are absent
   * from obj are dropped from the field list, but their per-field entries
   * are not deleted here, so getAll (which scans every entry) can still
   * report them -- confirm whether that is intended.
   *
   * @throws Error("closed") if there is no underlying dkv.
   * @throws Error("values must be objects") if obj is not an object.
   */
  set = (key: string, obj: T) => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    if (obj == null) {
      this.delete(key);
      return;
    }
    if (!is_object(obj)) {
      throw Error("values must be objects");
    }
    const fields = Object.keys(obj);
    const cur = this.dkv.get(key);
    if (!isEqual(cur, fields)) {
      this.dkv.set(key, fields);
    }
    for (const field of fields) {
      const path = this.toPath(key, field);
      const value = obj[field];
      const cur = this.dkv.get(path);
      if (!isEqual(cur, value)) {
        this.dkv.set(path, value);
      }
    }
  };

  /** True if the underlying dkv reports unsaved changes; false if no dkv. */
  hasUnsavedChanges = (): boolean => {
    return !!this.dkv?.hasUnsavedChanges();
  };

  /**
   * The dkv's unsaved paths decoded to `{ key, field }`.  Paths without a
   * field part (field-list entries) are omitted.  Empty if there is no dkv.
   */
  unsavedChanges = (): { key: string; field: string }[] => {
    const dkv = this.dkv;
    if (dkv == null) {
      return [];
    }
    const v = dkv.unsavedChanges();
    const w: { key: string; field: string }[] = [];
    for (const path of v) {
      const { key, field } = this.fromPath(path);
      if (field !== undefined) {
        w.push({ key, field });
      }
    }
    return w;
  };

  /** Save the underlying dkv, if there is one. */
  save = async () => {
    await this.dkv?.save();
  };
}
272
273
/**
 * Reference-counted cache of DKO instances, keyed by `userKvKey(options)`.
 * A new instance is constructed and initialized before it is handed out.
 */
export const cache = refCache<DKVOptions, DKO>({
  name: "dko",
  createKey: userKvKey,
  async createObject(opts) {
    const instance = new DKO(opts);
    await instance.init();
    return instance;
  },
});
282
283
// WARNING: do not change this value -- doing so would silently delete user data.
export const DKO_PREFIX = "__dko__";

/** Name of the underlying dkv for the DKO called `name`. */
function dkoPrefix(name: string): string {
  return DKO_PREFIX.concat(name);
}
289
290
/**
 * Get the shared DKO for the given options from the cache; the cache
 * creates one if needed.
 */
export async function dko<T>(options: DKVOptions): Promise<DKO<T>> {
  const shared = await cache(options);
  return shared;
}
293
294