Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/hub/changefeeds/client.ts
1453 views
1
import { type Client } from "@cocalc/conat/core/client";
2
import { EventIterator } from "@cocalc/util/event-iterator";
3
import { getLogger } from "@cocalc/conat/client";
4
import { SERVICE, CLIENT_KEEPALIVE, KEEPALIVE_TIMEOUT } from "./util";
5
import { ConatError } from "@cocalc/conat/core/client";
6
7
const logger = getLogger("hub:changefeeds:client");
8
9
type Update = any;
10
11
function changefeedSubject({ account_id }: { account_id: string }) {
12
return `${SERVICE}.account-${account_id}`;
13
}
14
15
export type Changefeed = EventIterator<{ error?: string; update: Update }>;
16
17
export function changefeed({
18
query,
19
options,
20
client,
21
account_id,
22
}: {
23
query: object;
24
options?: object[];
25
client: Client;
26
account_id: string;
27
}) {
28
const table = Object.keys(query)[0];
29
const socket = client.socket.connect(changefeedSubject({ account_id }), {
30
reconnection: false,
31
keepAlive: CLIENT_KEEPALIVE,
32
keepAliveTimeout: KEEPALIVE_TIMEOUT,
33
desc: `postgresql-changefeed-${table}`,
34
});
35
logger.debug("creating changefeed", { table, options });
36
socket.write({ query, options });
37
const cf = new EventIterator<{ error?: string; update: Update }>(
38
socket,
39
"data",
40
{
41
map: (args) => {
42
const { error, code, update } = args[0] ?? {};
43
if (error) {
44
// console.log("changefeed: error returned from server, query");
45
throw new ConatError(error, { code });
46
} else {
47
return update;
48
}
49
},
50
onEnd: () => {
51
// console.log("changefeed: onEnd", query);
52
socket.close();
53
},
54
},
55
);
56
socket.on("closed", () => {
57
cf.throw(Error("closed"));
58
});
59
socket.on("disconnected", () => {
60
// console.log("changefeed: disconnected", query, socket.subject, socket.id);
61
cf.throw(Error("disconnected"));
62
});
63
return cf;
64
}
65
66