Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres/changefeed.ts
1503 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
The Changes class is a useful building block
8
for making changefeeds. It lets you watch when given
9
columns change in a given table, and be notified
10
when a where condition is satisfied.
11
12
IMPORTANT: If an "error" event is emitted, then the
Changes object closes and will not work any further!
You must recreate it.
15
*/
16
17
import { EventEmitter } from "events";
18
import * as misc from "@cocalc/util/misc";
19
import { opToFunction, OPERATORS, Operator } from "@cocalc/util/db-schema";
20
import { callback } from "awaiting";
21
import { PostgreSQL, QuerySelect } from "./types";
22
import { query } from "./changefeed-query";
23
24
/** A row predicate, a single where-object, or a list of where-objects/strings. */
type WhereCondition = object | object[] | Function;

/** The kinds of row-level change a changefeed can report. */
type ChangeAction = "delete" | "insert" | "update";

/**
 * Normalize an action name (e.g. "INSERT" from a trigger payload) to a
 * lowercase ChangeAction.
 *
 * @throws Error when the lowercased name is not delete, insert or update.
 */
function parse_action(obj: string): ChangeAction {
  const isAction = (name: string): name is ChangeAction =>
    name === "delete" || name === "insert" || name === "update";
  const lowered = obj.toLowerCase();
  if (isAction(lowered)) {
    return lowered;
  }
  throw Error(`invalid action "${lowered}"`);
}
34
35
/**
 * Payload of a "change" event emitted by a Changes changefeed.
 */
export interface ChangeEvent {
  /** The kind of change that happened to the row. */
  action: ChangeAction;
  /** Row data before the change; set on "delete" events. */
  old_val?: object;
  /** Row data after the change; set on "insert" and "update" events (an update may carry only changed fields). */
  new_val?: object;
}
40
41
/**
 * Watches a table (via the database's LISTEN machinery) and emits events
 * for rows that satisfy a where condition.
 *
 * Events:
 *  - "change": a ChangeEvent describing an insert, update or delete.
 *  - "error":  emitted when the changefeed breaks; the object then closes
 *              and must be recreated.
 *  - "close":  emitted once, with {action: "close"}, when closed.
 */
export class Changes extends EventEmitter {
  private db: PostgreSQL;
  private table: string;
  private select: QuerySelect;
  private watch: string[];
  private where: WhereCondition;

  private trigger_name: string;
  private closed: boolean;
  // field name -> predicate on that field's value; undefined means "match everything".
  private condition?: { [field: string]: Function };
  private match_condition: Function;

  // Last full value emitted per record (keyed by the JSON of the notification's
  // new-row part), used to turn subsequent updates into minimal diffs.
  private val_update_cache: { [key: string]: any } = {};

  /**
   * @param cb called as cb(err) if initialization fails, otherwise as
   *           cb(undefined, this) once the changefeed is listening.
   */
  constructor(
    db: PostgreSQL,
    table: string,
    select: QuerySelect,
    watch: string[],
    where: WhereCondition,
    cb: Function,
  ) {
    super();
    this.db = db;
    this.table = table;
    this.select = select;
    this.watch = watch;
    this.where = where;
    this.init(cb);
  }

  init = async (cb: Function): Promise<void> => {
    this.dbg("constructor")(
      `select=${misc.to_json(this.select)}, watch=${misc.to_json(
        this.watch,
      )}, @_where=${misc.to_json(this.where)}`,
    );

    try {
      this.init_where();
    } catch (e) {
      cb(`error initializing where conditions -- ${e}`);
      return;
    }

    try {
      this.trigger_name = await callback(
        this.db._listen,
        this.table,
        this.select,
        this.watch,
      );
    } catch (err) {
      cb(err);
      return;
    }
    this.db.on(this.trigger_name, this.handle_change);
    // NOTE: we close on *connect*, not on disconnect, since then clients
    // that try to reconnect will only try to do so when we have an actual
    // connection to the database. No point in worrying them while trying
    // to reconnect, which only makes matters worse (as they panic and
    // requests pile up!).

    // This setMaxListeners is here because I keep getting warning about
    // this despite setting it in the db constructor. Putting this here
    // definitely does work, whereas having it only in the constructor
    // definitely does NOT. Don't break this without thought, as it has very bad
    // consequences when the database connection drops.
    this.db.setMaxListeners(0);

    this.db.once("connect", this.close);
    cb(undefined, this);
  };

  private dbg = (f: string): Function => {
    return this.db._dbg(`Changes(table='${this.table}').${f}`);
  };

  // this breaks the changefeed -- client must recreate it; nothing further will work at all.
  private fail = (err): void => {
    if (this.closed) {
      return;
    }
    this.dbg("_fail")(`err='${err}'`);
    // Pass Error objects through unchanged: wrapping them again would give
    // messages like "Error: Error: ..." and lose the original stack.
    this.emit("error", err instanceof Error ? err : new Error(err));
    this.close();
  };

  close = (): void => {
    if (this.closed) {
      return;
    }
    this.emit("close", { action: "close" });
    this.removeAllListeners();
    if (this.db != null) {
      this.db.removeListener(this.trigger_name, this.handle_change);
      this.db.removeListener("connect", this.close);
      this.db._stop_listening(this.table, this.select, this.watch);
    }
    misc.close(this);
    this.closed = true;
  };

  /**
   * Query the rows matching `where` (field -> value equality) and emit an
   * "insert" change for each one that satisfies the where condition.
   * A query failure breaks the changefeed.
   */
  insert = async (where): Promise<void> => {
    if (this.closed) {
      return;
    }
    const where0: { [field: string]: any } = {};
    for (const k in where) {
      where0[`${k} = $`] = where[k];
    }
    let results: { [field: string]: any }[];
    try {
      results = await query({
        db: this.db,
        select: this.watch.concat(misc.keys(this.select)),
        table: this.table,
        where: where0,
        one: false,
      });
    } catch (err) {
      this.fail(err); // this is game over
      return;
    }
    // The changefeed may have been closed while we waited on the query.
    if (this.closed) {
      return;
    }
    for (const x of results) {
      if (this.match_condition(x)) {
        misc.map_mutate_out_undefined_and_null(x);
        const change: ChangeEvent = { action: "insert", new_val: x };
        this.emit("change", change);
      }
    }
  };

  delete = (where): void => {
    // listener is meant to delete everything that *matches* the where, so
    // there is no need to actually do a query.
    const change: ChangeEvent = { action: "delete", old_val: where };
    this.emit("change", change);
  };

  /**
   * Listener for the database trigger notifications. It is async and attached
   * to an EventEmitter, so nothing awaits it: any exception must be handled
   * here, otherwise it becomes an unhandled promise rejection. Unexpected
   * errors break the changefeed via fail(), like query errors do.
   */
  private handle_change = async (mesg): Promise<void> => {
    if (this.closed) {
      return;
    }
    try {
      await this.process_change(mesg);
    } catch (err) {
      this.fail(err);
    }
  };

  /**
   * Core of handle_change. mesg is [ACTION, new-row part, old-row part] as
   * used below; may throw on malformed messages (caller handles that).
   */
  private process_change = async (mesg): Promise<void> => {
    // this.dbg("handle_change")(JSON.stringify(mesg));
    if (mesg[0] === "DELETE") {
      if (!this.match_condition(mesg[2])) {
        return;
      }
      this.emit("change", { action: "delete", old_val: mesg[2] });
      return;
    }
    if (typeof mesg[0] !== "string") {
      throw Error(`invalid mesg -- mesg[0] must be a string`);
    }
    let action: ChangeAction = parse_action(mesg[0]);
    if (!this.match_condition(mesg[1])) {
      // object does not match condition
      if (action !== "update") {
        // new object that doesn't match condition -- nothing to do.
        return;
      }
      const old_val = mesg[2];
      if (old_val == null) {
        // No old row to compare against, so there is nothing to retract.
        return;
      }
      // fill in for each part that we watch in new object the same
      // data in the old object, in case it is missing.
      // TODO: when is this actually needed?
      for (const k in mesg[1]) {
        if (old_val[k] == null) {
          old_val[k] = mesg[1][k];
        }
      }
      if (this.match_condition(old_val)) {
        // the old object was in our changefeed, but the UPDATE made it not
        // anymore, so we emit delete action.
        this.emit("change", { action: "delete", old_val });
      }
      // Nothing more to do.
      return;
    }
    if (this.watch.length === 0) {
      // No additional columns are being watched at all -- we only
      // care about what's in the mesg.
      const change: ChangeEvent = { action, new_val: mesg[1] };
      this.emit("change", change);
      return;
    }
    // Additional columns are watched so we must do a query to get them.
    // There's no way around this due to the size limits on postgres LISTEN/NOTIFY.
    const where = {};
    for (const k in mesg[1]) {
      where[`${k} = $`] = mesg[1][k];
    }
    let result: undefined | { [field: string]: any };
    try {
      result = await query({
        db: this.db,
        select: this.watch,
        table: this.table,
        where,
        one: true,
      });
    } catch (err) {
      this.fail(err);
      return;
    }

    // we do know from stacktraces that new_val_update is called after closed
    // this must have happened during waiting on the query. aborting early.
    if (this.closed) {
      return;
    }

    const key = JSON.stringify(mesg[1]);
    if (result == null) {
      // This happens when record isn't deleted, but some
      // update results in the object being removed from our
      // selection criterion... which we view as "delete".
      // Forget its cached value: if it re-enters the selection later, the
      // client must get a full "insert", not a diff against stale data.
      delete this.val_update_cache[key];
      this.emit("change", { action: "delete", old_val: mesg[1] });
      return;
    }

    const this_val = misc.merge(result, mesg[1]);
    let new_val;
    if (action == "update") {
      const x = this.new_val_update(mesg[1], this_val, key);
      if (x == null) {
        // happens if this.closed is true -- double check for safety (and typescript).
        return;
      }
      action = x.action; // may be insert in case no previous cached info.
      new_val = x.new_val;
    } else {
      // not update and not delete (could have been a delete and write
      // before we did above query, so treat as insert).
      action = "insert";
      new_val = this_val;
    }
    this.val_update_cache[key] = this_val;

    const change: ChangeEvent = { action, new_val };
    this.emit("change", change);
  };

  /**
   * Compute what to send for an update: the primary part plus every field
   * that differs from the cached previous value (fields that became null are
   * sent as null). Falls back to a full "insert" when nothing is cached.
   * Returns undefined if the changefeed is closed.
   */
  private new_val_update = (
    primary_part: { [key: string]: any },
    this_val: { [key: string]: any },
    key: string,
  ):
    | { new_val: { [key: string]: any }; action: "insert" | "update" }
    | undefined => {
    if (this.closed) {
      return;
    }
    const prev_val = this.val_update_cache[key];
    if (prev_val == null) {
      return { new_val: this_val, action: "insert" }; // not enough info to make a diff
    }
    this.dbg("new_val_update")(`${JSON.stringify({ this_val, prev_val })}`);

    // Send only the fields that changed between
    // prev_val and this_val, along with the primary part.
    const new_val = misc.copy(primary_part);
    // Not using lodash isEqual below, since we want equal Date objects
    // to compare as equal. If JSON is randomly re-ordered, that's fine since
    // it is just slightly less efficient.
    for (const field in this_val) {
      if (
        new_val[field] === undefined &&
        JSON.stringify(this_val[field]) != JSON.stringify(prev_val[field])
      ) {
        new_val[field] = this_val[field];
      }
    }
    for (const field in prev_val) {
      if (prev_val[field] != null && this_val[field] == null) {
        // field was deleted / set to null -- we must inform in the update
        new_val[field] = null;
      }
    }
    return { new_val, action: "update" };
  };

  /**
   * Build this.match_condition from this.where: either the user-supplied
   * function, or per-field predicates parsed from where-objects/strings.
   * Throws on conditions it cannot parse.
   */
  private init_where = (): void => {
    if (typeof this.where === "function") {
      // user provided function
      this.match_condition = this.where;
      return;
    }

    let w: any[];
    if (misc.is_object(this.where)) {
      w = [this.where];
    } else {
      // TODO: misc.is_object needs to be a typescript checker instead, so
      // this as isn't needed.
      w = this.where as object[];
    }

    this.condition = {};
    const add_condition = (field: string, op: Operator, val: any): void => {
      if (this.condition == null) {
        return; // won't happen
      }
      let f: Function, g: Function;
      field = field.trim();
      if (field[0] === '"') {
        // de-quote
        field = field.slice(1, field.length - 1);
      }
      if (this.select[field] == null) {
        throw Error(
          `'${field}' must be in select="${JSON.stringify(this.select)}"`,
        );
      }
      if (misc.is_object(val)) {
        throw Error(`val (=${misc.to_json(val)}) must not be an object`);
      }
      if (misc.is_array(val)) {
        if (op === "=" || op === "==") {
          // containment
          f = function (x) {
            for (const v of val) {
              if (x === v) {
                return true;
              }
            }
            return false;
          };
        } else if (op === "!=" || op === "<>") {
          // not contained in
          f = function (x) {
            for (const v of val) {
              if (x === v) {
                return false;
              }
            }
            return true;
          };
        } else {
          throw Error("if val is an array, then op must be = or !=");
        }
      } else if (misc.is_date(val)) {
        // Inputs to condition come back as JSON, which doesn't know
        // about timestamps, so we convert them to date objects.
        if (op == "=" || op == "==") {
          f = (x) => new Date(x).valueOf() - val.valueOf() === 0;
        } else if (op == "!=" || op == "<>") {
          f = (x) => new Date(x).valueOf() - val.valueOf() !== 0;
        } else {
          g = opToFunction(op);
          f = (x) => g(new Date(x), val);
        }
      } else {
        g = opToFunction(op);
        f = (x) => g(x, val);
      }
      this.condition[field] = f;
    };

    for (const obj of w) {
      if (misc.is_object(obj)) {
        for (const k in obj) {
          const val = obj[k];
          /*
          k should be of one of the following forms
             - "field op $::TYPE"
             - "field op $" or
             - "field op any($)"
             - "$ op any(field)"
             - 'field' (defaults to =)
          where op is one of =, <, >, <=, >=, !=

          val must be:
             - something where javascript === and comparisons works as you expect!
             - or an array, in which case op must be = or !=, and we ALWAYS do inclusion (analogue of any).
          */
          if (k.startsWith("$")) {
            /*
            The "$ op any(field)" is used, e.g., for having multiple owners
            of a single thing, e.g.,:

               pg_where: [{ "$::UUID = ANY(owner_account_ids)": "account_id" }]

            where we need to get the field(=owner_account_ids) and check that
            val(=account_id) is in it, at the javascript level.
            */
            if (k.includes("<") || k.includes(">")) {
              throw Error("only = and != are supported");
            }
            const isEquals = !k.includes("!=");
            const i = k.toLowerCase().indexOf("any(");
            if (i == -1) {
              throw Error(
                "condition must be $=ANY(...) or $!=ANY(...) -- missing ANY(",
              );
            }
            const j = k.lastIndexOf(")");
            if (j == -1) {
              throw Error(
                "condition must be $=ANY(...) or $!=ANY(...) -- missing close paren",
              );
            }
            // Normalize the field name the same way add_condition does, so
            // "ANY( owner_account_ids )" or ANY("x") still match row keys.
            let field = k.slice(i + 4, j).trim();
            if (field[0] === '"') {
              field = field.slice(1, field.length - 1);
            }
            if (isEquals) {
              this.condition[field] = (x) => !!x?.includes(val);
            } else {
              this.condition[field] = (x) => !x?.includes(val);
            }
          } else {
            let found = false;
            for (const op of OPERATORS) {
              const i = k.indexOf(op);
              if (i !== -1) {
                const field = k.slice(0, i).trim();
                add_condition(field, op, val);
                found = true;
                break;
              }
            }
            if (!found) {
              throw Error(`unable to parse '${k}'`);
            }
          }
        }
      } else if (typeof obj === "string") {
        let found = false;
        for (const op of OPERATORS) {
          const i = obj.indexOf(op);
          if (i !== -1) {
            add_condition(
              obj.slice(0, i),
              op,
              // SECURITY NOTE(review): eval executes arbitrary code. This is
              // only acceptable if string where-conditions always come from
              // trusted, server-defined schema -- never from client input.
              // Confirm with callers.
              eval(obj.slice(i + op.length).trim()),
            );
            found = true;
            break;
          }
        }
        if (!found) {
          throw Error(`unable to parse '${obj}'`);
        }
      } else {
        throw Error("NotImplementedError");
      }
    }
    if (misc.len(this.condition) === 0) {
      delete this.condition;
    }

    this.match_condition = (obj: object): boolean => {
      //console.log '_match_condition', obj
      if (this.condition == null) {
        return true;
      }
      for (const field in this.condition) {
        const f = this.condition[field];
        if (!f(obj[field])) {
          //console.log 'failed due to field ', field
          return false;
        }
      }
      return true;
    };
  };
}
506
507