Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/changefeed-conat.ts
1447 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */
5
6
import { EventEmitter } from "events";
7
import { changefeed, type Changefeed } from "@cocalc/conat/hub/changefeeds";
8
import { conat } from "@cocalc/conat/client";
9
10
// Low level debugging of changefeeds.
// Flip LOW_LEVEL_DEBUG to true to print every changefeed log message to the
// console; while it is false, `log` is a no-op.
const LOW_LEVEL_DEBUG = false;

/**
 * Module-level debug logger: forwards its arguments to console.log with a
 * "changefeed: " prefix when LOW_LEVEL_DEBUG is enabled, and otherwise does
 * nothing.
 */
const log = LOW_LEVEL_DEBUG
  ? (...args: unknown[]): void => {
      console.log("changefeed: ", ...args);
    }
  : (..._args: unknown[]): void => {};
17
18
/**
 * Exposes a conat hub changefeed as an EventEmitter.
 *
 * Events emitted:
 *   - "update": once for every value the changefeed yields after the first
 *     one (the first one is consumed by `connect`).
 *   - "close": exactly once, when the changefeed is closed -- either via
 *     `close()` or because the underlying iterator ended or failed.
 *
 * The state starts as "disconnected", becomes "connected" once `connect`
 * has received the first value, and ends as "closed". "closed" is terminal:
 * a closed instance never becomes connected again.
 */
export class ConatChangefeed extends EventEmitter {
  private account_id: string;
  private query;
  private options;
  private state: "disconnected" | "connected" | "closed" = "disconnected";
  private cf?: Changefeed;

  constructor({
    account_id,
    query,
    options,
  }: {
    account_id: string;
    query;
    options?;
  }) {
    super();
    this.account_id = account_id;
    this.query = query;
    this.options = options;
  }

  /**
   * Debug logging for this changefeed; the query is prepended to the
   * arguments. Does nothing unless LOW_LEVEL_DEBUG is enabled.
   */
  log = (...args: unknown[]): void => {
    if (!LOW_LEVEL_DEBUG) return;
    log(this.query, ...args);
  };

  // Reading the state through a helper (rather than comparing this.state
  // inline) keeps TypeScript from narrowing the field across an await, so no
  // @ts-ignore is needed where the state may have changed in the meantime.
  private isClosed = (): boolean => {
    return this.state === "closed";
  };

  /**
   * Creates the underlying changefeed and waits for its first value.
   *
   * @returns the first property (in key order) of the first value yielded
   *   by the changefeed -- presumably the initial query result; TODO confirm
   *   against the hub changefeed implementation. Returns undefined if the
   *   changefeed ended before yielding anything, or if this object was
   *   closed before the connection was established.
   * @throws whatever `conat()` or the changefeed's first `next()` rejects
   *   with; such errors are passed through to the caller unchanged.
   */
  connect = async () => {
    this.log("connecting...");
    const client = await conat();
    if (this.isClosed()) {
      // close() ran while we were waiting for the client. Do not create a
      // changefeed that nothing would ever close.
      this.log("closed while connecting");
      return;
    }
    const cf = changefeed({
      client,
      account_id: this.account_id,
      query: this.query,
      options: this.options,
    });
    this.cf = cf;
    const { value, done } = await cf.next();
    if (done) {
      this.log("closed before receiving any values");
      this.close();
      return;
    }
    if (this.isClosed()) {
      // close() ran while we were waiting for the first value and has
      // already closed cf; do not flip the state back to "connected".
      this.log("closed while connecting");
      return;
    }
    this.log("connected");
    this.state = "connected";
    // Runs in the background; watch handles its own errors.
    void this.watch();
    return value[Object.keys(value)[0]];
  };

  /**
   * Closes the underlying changefeed (if any), marks this object as closed
   * and emits "close". Calling it again after that is a no-op.
   */
  close = (): void => {
    this.log("close");
    if (this.isClosed()) {
      return;
    }
    this.cf?.close();
    delete this.cf;
    this.state = "closed";
    this.emit("close"); // yes "close" not "closed" ;-(
  };

  /** Returns the current state: "disconnected", "connected" or "closed". */
  get_state = (): string => {
    return this.state;
  };

  /**
   * Forwards every value yielded by the changefeed as an "update" event
   * until the changefeed ends, iteration throws, or this object is closed;
   * then closes this object.
   */
  private watch = async () => {
    const cf = this.cf;
    if (cf == null || this.isClosed()) {
      return;
    }
    try {
      for await (const x of cf) {
        // this.log("got message ", x);
        if (this.isClosed()) {
          // closed while waiting for this value -- drop it
          return;
        }
        this.emit("update", x);
      }
    } catch (err) {
      // Errors (from the iterator or from an "update" listener) end the
      // watch; they are only logged, and the changefeed is closed below.
      this.log("got error", err);
    }
    this.log("watch ended");
    this.close();
  };
}
100
101