Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/synctable-kv.ts
1452 views
1
/*
2
3
4
5
6
*/
7
8
import { keys } from "lodash";
9
import { client_db } from "@cocalc/util/db-schema/client-db";
10
import type { State } from "@cocalc/conat/types";
11
import type { Client } from "@cocalc/conat/core/client";
12
import { EventEmitter } from "events";
13
import { dkv as createDkv, type DKV } from "./dkv";
14
import { dko as createDko, type DKO } from "./dko";
15
import jsonStableStringify from "json-stable-stringify";
16
import { toKey } from "@cocalc/conat/util";
17
import { wait } from "@cocalc/util/async-wait";
18
import { fromJS, Map } from "immutable";
19
import type { JSONValue } from "@cocalc/util/types";
20
import type { Configuration } from "@cocalc/conat/sync/core-stream";
21
22
/**
 * A synctable-style view of a single changefeed query, backed by a conat
 * key:value store.
 *
 * The store is created in `init()`: `createDkv` when `atomic` is true and
 * `createDko` otherwise.  Records are keyed by the table's primary key(s)
 * as reported by `client_db.primary_keys`.
 *
 * Events emitted:
 *   - the current state name ("connected", "closed", ...) whenever the state
 *     changes;
 *   - "change" with `([key], changeObject)` whenever the underlying store
 *     reports a change.
 */
export class SyncTableKV extends EventEmitter {
  /** Name of the table, i.e. the first top-level key of the query. */
  public readonly table;
  private query;
  /** If true use a dkv store; otherwise a dko store. */
  private atomic: boolean;
  private primaryKeys: string[];
  private project_id?: string;
  private account_id?: string;
  private state: State = "disconnected";
  /** The backing store; undefined before `init()` and after `close()`. */
  private dkv?: DKV | DKO;
  private client: Client;
  /** Applied to everything returned by `get`: `fromJS` or the identity. */
  private getHook: (x: any) => any;
  private config?: Partial<Configuration>;
  private desc?: JSONValue;
  private ephemeral?: boolean;

  /**
   * @param query - changefeed query of the form `{ [table]: [spec] }`; only
   *   the first key and the first array entry are used.
   * @param client - conat client handed to the store factory.
   * @param account_id - account scope; falls back to `spec.account_id`.
   * @param project_id - project scope.
   * @param atomic - selects dkv (true) versus dko (false/omitted).
   * @param immutable - if true, `get` returns values converted with `fromJS`.
   * @param config - forwarded unchanged to the store factory.
   * @param desc - forwarded unchanged to the store factory.
   * @param ephemeral - forwarded unchanged to the store factory.
   */
  constructor({
    query,
    client,
    account_id,
    project_id,
    atomic,
    immutable,
    config,
    desc,
    ephemeral,
  }: {
    query;
    client: Client;
    account_id?: string;
    project_id?: string;
    atomic?: boolean;
    immutable?: boolean;
    config?: Partial<Configuration>;
    desc?: JSONValue;
    ephemeral?: boolean;
  }) {
    super();
    this.setMaxListeners(1000);
    this.atomic = !!atomic;
    this.getHook = immutable ? fromJS : (x) => x;
    this.query = query;
    this.config = config;
    this.client = client;
    this.desc = desc;
    this.ephemeral = ephemeral;
    this.table = keys(query)[0];
    const spec = query[this.table][0];
    if (spec.string_id && spec.project_id) {
      // A string_id table that names its project in the spec is scoped to
      // that project only; the account_id/project_id arguments are ignored.
      this.project_id = spec.project_id;
    } else {
      this.account_id = account_id ?? spec.account_id;
      this.project_id = project_id;
    }
    this.primaryKeys = client_db.primary_keys(this.table);
  }

  /** Record the new state and emit an event named after it. */
  private set_state = (state: State): void => {
    this.state = state;
    this.emit(state);
  };

  /** @returns the current connection state. */
  get_state = () => {
    return this.state;
  };

  // WARNING: be *VERY* careful before changing how the name is
  // derived from the query, since if you change this all the current
  // data in conat that caches the changefeeds is basically lost
  // and users MUST refresh their browsers (and maybe projects restart?)
  // to get new changefeeds, since they are watching something given
  // by this name.  I.e., this name shouldn't ever be changed.
  // The point of the name is that it uniquely identifies the
  // changefeed query, so just using the query itself should be fine.
  // A big choice here is the full name or just something short like the
  // sha1 hash, but I've chosen the full name, since then it is always easy
  // to know what the query was, i.e., use base64 decoding then you
  // have the query.  It's less efficient though since the conat subjects
  // can be long, depending on the query.
  // This way if we are just watching general conat traffic and see something
  // suspicious, even if we have no idea initially where it came from,
  // we can easily see by decoding it.
  // Including even the fields with no values distinguishes different
  // changefeeds that pick off different columns from the database.
  // PLAN: Longterm there's no doubt that changefeeds in postgresql will
  // be eliminated from cocalc completely, and at that point this situation
  // will melt away.
  private getName = () => {
    const spec = this.query[this.table][0];
    if (spec.string_id) {
      // special case -- the tables with a string_id never touch the database
      // and are used with *different* spec at the same time to coordinate
      // between browser and project, so we can't use the spec.
      return `${this.table}:${spec.string_id}`;
    }
    return `${this.table}:${jsonStableStringify(spec)}`;
  };

  /**
   * Create the backing store, forward its "change" events, and switch to the
   * "connected" state.
   *
   * If `close()` is called while the store is still being created, the new
   * store is closed immediately and the table stays "closed" instead of
   * being resurrected with a store that nobody would ever close.
   */
  init = async () => {
    const options = {
      client: this.client,
      name: this.getName(),
      account_id: this.account_id,
      project_id: this.project_id,
      config: this.config,
      desc: this.desc,
      ephemeral: this.ephemeral,
    };
    const store: DKV | DKO = this.atomic
      ? await createDkv(options)
      : await createDko(options);
    if (this.get_state() == "closed") {
      // close() ran during the await above and had no store to close then.
      await store.close();
      return;
    }
    this.dkv = store;
    // For some reason this one line confuses typescript and break building the compute server package (nothing else similar happens).
    // Do not remove.  The error is that "this.dkv.on" is not callable.
    // @ts-ignore
    store.on("change", (x) => {
      if (!this.atomic) {
        // For dko, replace the value carried by the event with whatever the
        // store currently returns for the key (presumably because dko events
        // describe only part of the object -- confirm against ./dko).
        if (x.value === undefined) {
          // delete
          x = { ...x, prev: this.dkv?.get(x.key) };
        } else {
          // change
          x = { ...x, value: this.dkv?.get(x.key) };
        }
      }
      // change api was to emit array of keys.
      // We also use this packages/sync/table/changefeed-conat.ts which needs the value,
      // so we emit that object second.
      this.emit("change", [x.key], x);
    });
    this.set_state("connected");
  };

  /**
   * Compute the store key for a record.
   *
   * @param obj_or_key - a string (returned unchanged), a plain object, or an
   *   immutable Map (converted with `toJS()` first).
   * @returns `toKey` of the single primary key value (or of "" if it is
   *   null/undefined), or `toKey` of the array of values for a compound key.
   */
  getKey = (obj_or_key): string => {
    if (typeof obj_or_key == "string") {
      return obj_or_key;
    }
    let obj = obj_or_key;
    if (Map.isMap(obj)) {
      obj = obj.toJS();
    }
    if (this.primaryKeys.length === 1) {
      return toKey(obj[this.primaryKeys[0]] ?? "")!;
    } else {
      // compound primary key
      return toKey(this.primaryKeys.map((pk) => obj[pk]))!;
    }
  };

  /**
   * Store a record under its primary key.
   *
   * @param obj - plain object or immutable Map (converted with `toJS()`).
   * @throws Error("closed") if there is no backing store.
   */
  set = (obj) => {
    if (this.dkv == null) throw Error("closed");
    if (Map.isMap(obj)) {
      obj = obj.toJS();
    }
    this.dkv.set(this.getKey(obj), obj);
  };

  /**
   * Delete the record identified by a key or by an object holding the
   * primary key(s).
   *
   * @throws Error("closed") if there is no backing store.
   */
  delete = (obj_or_key) => {
    if (this.dkv == null) throw Error("closed");
    this.dkv.delete(this.getKey(obj_or_key));
  };

  /**
   * Read from the table.
   *
   * @param obj_or_key - omit (or pass null) to get everything from
   *   `getAll()`; otherwise a key or an object holding the primary key(s).
   * @returns the stored data passed through the get hook (an immutable
   *   structure if the table was constructed with `immutable`).
   * @throws Error("closed") if there is no backing store.
   */
  get = (obj_or_key?) => {
    if (this.dkv == null) throw Error("closed");
    if (obj_or_key == null) {
      return this.getHook(this.dkv.getAll());
    }
    return this.getHook(this.dkv.get(this.getKey(obj_or_key)));
  };

  /**
   * @returns the record for the first key enumerated from `getAll()`, or
   *   undefined if there are no keys.
   * @throws Error("closed") if there is no backing store.
   */
  get_one = () => {
    if (this.dkv == null) throw Error("closed");
    // TODO: insanely inefficient, especially if !atomic!
    for (const key in this.dkv.getAll()) {
      return this.get(key);
    }
    return undefined;
  };

  /** Call `save()` on the backing store; does nothing if there is none. */
  save = async () => {
    await this.dkv?.save();
  };

  /**
   * Switch to the "closed" state (emitting "closed"), remove all listeners
   * from this emitter, and close the backing store.  Calling it again once
   * closed does nothing.
   */
  close = async () => {
    if (this.state == "closed") return;
    this.set_state("closed");
    this.removeAllListeners();
    try {
      await this.dkv?.close();
    } finally {
      // Drop references even if closing the store failed.
      delete this.dkv;
      // @ts-ignore
      delete this.client;
    }
  };

  /**
   * Delegate to the `wait` helper, which watches this table's "change"
   * events.
   *
   * @param until - predicate forwarded to the helper.
   * @param timeout - forwarded to the helper, default 30 (units are defined
   *   by that helper; presumably seconds -- confirm).
   * @returns whatever the helper resolves to.
   * @throws Error if the table is already closed.
   */
  public async wait(until: Function, timeout: number = 30): Promise<any> {
    if (this.state == "closed") {
      throw Error("wait: must not be closed");
    }
    return await wait({
      obj: this,
      until,
      timeout,
      change_event: "change",
    });
  }
}
233
234