Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/changefeed.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import { EventEmitter } from "events";
7
import { callback, delay } from "awaiting";
8
import { close } from "@cocalc/util/misc";
9
10
export type State = "closed" | "disconnected" | "connecting" | "connected";
11
12
export class Changefeed extends EventEmitter {
13
private query: any;
14
private do_query: Function;
15
private query_cancel: Function;
16
private state: State = "disconnected";
17
private table: string;
18
private id: string;
19
private options: any;
20
private handle_update_queue: { err?: any; resp?: any }[] = [];
21
22
constructor({
23
do_query,
24
query_cancel,
25
options,
26
query,
27
table,
28
}: {
29
do_query: Function;
30
query_cancel: Function;
31
options: any;
32
table: string;
33
query: any;
34
}) {
35
super();
36
this.do_query = do_query;
37
this.query_cancel = query_cancel;
38
this.query = query;
39
this.options = options;
40
this.table = table;
41
}
42
43
// Query for state of the table, connects to the
44
// changefeed, and return the initial state
45
// of the table. Throws an exception if anything
46
// goes wrong.
47
connect = async () => {
48
if (this.state != "disconnected") {
49
throw Error(
50
`can only connect if state is 'disconnected' but it is ${this.state}`,
51
);
52
}
53
this.state = "connecting";
54
const resp = await callback(this.run_the_query.bind(this));
55
if (this.state === ("closed" as State)) {
56
throw Error("after running query, changefeed state is 'closed'");
57
}
58
if (resp.event === "query_cancel") {
59
throw Error("query-cancel");
60
}
61
if (resp.query == null || resp.query[this.table] == null) {
62
throw Error(`${this.table} changefeed init -- no error and no data`);
63
}
64
// Successfully completed query
65
this.id = resp.id;
66
this.state = "connected";
67
this.process_queue_next_tick();
68
return resp.query[this.table];
69
};
70
71
close = (): void => {
72
this.state = "closed";
73
if (this.id != null) {
74
// stop listening for future updates
75
this.cancel_query(this.id);
76
}
77
this.emit("close");
78
this.removeAllListeners();
79
close(this);
80
this.state = "closed";
81
};
82
83
get_state = (): string => {
84
return this.state;
85
};
86
87
// Wait a tick, then process the queue of messages that
88
// arrived during initialization.
89
private process_queue_next_tick = async () => {
90
await delay(0);
91
while (this.state != "closed" && this.handle_update_queue.length > 0) {
92
const x = this.handle_update_queue.shift();
93
if (x != null) {
94
this.handle_update(x.err, x.resp);
95
}
96
}
97
};
98
99
private run_the_query = (cb: Function): void => {
100
// This query_function gets called first on the
101
// initial query, then repeatedly with each changefeed
102
// update. The input function "cb" will be called
103
// precisely once, and the method handle_changefeed_update
104
// may get called if there are additional
105
// changefeed updates.
106
let first_time: boolean = true;
107
this.do_query({
108
query: this.query,
109
changes: true,
110
options: this.options,
111
cb: (err, resp) => {
112
if (first_time) {
113
cb(err, resp);
114
first_time = false;
115
} else {
116
this.handle_update(err, resp);
117
}
118
},
119
});
120
};
121
122
private handle_update = (err, resp): void => {
123
if (this.state != "connected") {
124
if (this.state == "closed") {
125
// expected, since last updates after query cancel may get through...
126
return;
127
}
128
// This can and does happen when updates appear immediately
129
// after the first initial state is set (in run_the_query).
130
this.handle_update_queue.push({ err, resp });
131
return;
132
}
133
if (resp == null && err == null) {
134
err = "resp must not be null for non-error";
135
}
136
if (err || resp.event === "query_cancel") {
137
//if (err) console.warn("closing changefeed due to err", err);
138
this.close();
139
return;
140
}
141
if (resp.action == null) {
142
// Not a changefeed message. This happens, e.g., the first time
143
// when we use the standby server to get the changefeed.
144
return;
145
}
146
// Return just the new_val/old_val/action part of resp.
147
// console.log("resp=", resp);
148
const x: { new_val?: any; old_val?: any; action?: string } = {};
149
if (resp.new_val) {
150
x.new_val = resp.new_val;
151
}
152
if (resp.old_val) {
153
x.old_val = resp.old_val;
154
}
155
x.action = resp.action;
156
this.emit("update", x);
157
};
158
private cancel_query = async (id: string) => {
159
try {
160
await this.query_cancel(id);
161
} catch (err) {
162
// ignore error, which might be due to disconnecting and isn't a big deal.
163
// Basically anything that could cause an error would have also
164
// canceled the changefeed anyways.
165
}
166
};
167
}
168
169
//
170
171