Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/hub/changefeeds/server.ts
1453 views
1
import { type Client, type ConatSocketServer } from "@cocalc/conat/core/client";
2
import { uuid } from "@cocalc/util/misc";
3
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
4
import { getLogger } from "@cocalc/conat/client";
5
import { isValidUUID } from "@cocalc/util/misc";
6
import {
7
SUBJECT,
8
MAX_PER_ACCOUNT,
9
MAX_GLOBAL,
10
SERVER_KEEPALIVE,
11
KEEPALIVE_TIMEOUT,
12
RESOURCE,
13
} from "./util";
14
export { type ConatSocketServer };
15
16
const logger = getLogger("hub:changefeeds:server");
17
18
export function changefeedServer({
19
client,
20
userQuery,
21
cancelQuery,
22
}: {
23
client: Client;
24
25
userQuery: (opts: {
26
query: object;
27
options?: object[];
28
account_id: string;
29
changes: string;
30
cb: Function;
31
}) => void;
32
33
cancelQuery: (uuid: string) => void;
34
}): ConatSocketServer {
35
logger.debug("creating changefeed server");
36
37
const usage = new UsageMonitor({
38
maxPerUser: MAX_PER_ACCOUNT,
39
max: MAX_GLOBAL,
40
resource: RESOURCE,
41
log: (...args) => {
42
logger.debug(RESOURCE, ...args);
43
},
44
});
45
46
const server = client.socket.listen(SUBJECT, {
47
keepAlive: SERVER_KEEPALIVE,
48
keepAliveTimeout: KEEPALIVE_TIMEOUT,
49
});
50
51
server.on("connection", (socket) => {
52
const v = socket.subject.split(".")[1];
53
if (!v?.startsWith("account-")) {
54
socket.write({ error: "only account users can create changefeeds" });
55
logger.debug(
56
"socket.close: due to changefeed request from non-account subject",
57
socket.subject,
58
);
59
socket.close();
60
return;
61
}
62
const account_id = v.slice("account-".length);
63
if (!isValidUUID(account_id)) {
64
logger.debug(
65
"socket.close: due to invalid uuid",
66
socket.subject,
67
account_id,
68
);
69
socket.write({
70
error: `invalid account_id -- '${account_id}', subject=${socket.subject}`,
71
});
72
socket.close();
73
return;
74
}
75
let added = false;
76
try {
77
usage.add(account_id);
78
added = true;
79
} catch (err) {
80
socket.write({ error: `${err}`, code: err.code });
81
logger.debug(
82
"socket.close: due to usage error (limit exceeded?)",
83
socket.subject,
84
err,
85
);
86
socket.close();
87
return;
88
}
89
90
const changes = uuid();
91
92
socket.on("closed", () => {
93
logger.debug(
94
"socket.close: cleaning up since socket closed for some external reason (timeout?)",
95
socket.subject,
96
);
97
if (added) {
98
usage.delete(account_id);
99
}
100
cancelQuery(changes);
101
});
102
103
let running = false;
104
socket.on("data", (data) => {
105
if (running) {
106
socket.write({ error: "exactly one query per connection" });
107
logger.debug(
108
"socket.close: due to attempt to run more than one query",
109
socket.subject,
110
);
111
socket.close();
112
return;
113
}
114
running = true;
115
const { query, options } = data;
116
try {
117
userQuery({
118
query,
119
options,
120
changes,
121
account_id,
122
cb: (error, update) => {
123
// logger.debug("got: ", { error, update });
124
try {
125
socket.write({ error, update });
126
} catch (err) {
127
// happens if buffer is full or socket is closed. in both cases, might was well
128
// just close the socket.
129
error = `${err}`;
130
}
131
if (error) {
132
logger.debug(
133
"socket.close: due to error from postgres changefeed",
134
socket.subject,
135
error,
136
);
137
socket.close();
138
}
139
},
140
});
141
} catch (err) {
142
logger.debug(
143
"socket.close: due to error creating query",
144
socket.subject,
145
err,
146
);
147
try {
148
socket.write({ error: `${err}` });
149
} catch {}
150
socket.close();
151
}
152
});
153
});
154
server.on("closed", () => {
155
logger.debug("shutting down changefeed server");
156
usage.close();
157
});
158
159
return server;
160
}
161
162