Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/server.ts
1452 views
1
/*
CONAT_SERVER=http://localhost:3000 node

// making a server from scratch

// initialize persist context

require('@cocalc/backend/conat/persist');

// a conat server and client

s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();

// persist server

p = require('@cocalc/conat/persist/server').server({client}); 0;



// a client for persist server

c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});

for await (x of await c.getAll()) { console.log(x) }


await c.set({messageData:client.message(123)})

for await (x of await c.getAll()) { console.log(x) }

[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]

(await c.get({seq:5})).data

await c.set({key:'foo', messageData:client.message('bar')})
(await c.get({key:'foo'})).data

await c.delete({seq:6})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})


client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})


*/
56
57
import { type Client, ConatError } from "@cocalc/conat/core/client";
58
import {
59
type ConatSocketServer,
60
type ServerSocket,
61
} from "@cocalc/conat/socket";
62
import { getLogger } from "@cocalc/conat/client";
63
import type {
64
StoredMessage,
65
PersistentStream,
66
StorageOptions,
67
} from "./storage";
68
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
69
import { throttle } from "lodash";
70
import { type SetOptions } from "./client";
71
import { once } from "@cocalc/util/async-utils";
72
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
73
74
const logger = getLogger("persist:server");

/**
 * Default batching threshold used by getAll and by changefeed updates.
 *
 * Outgoing messages are accumulated until the sum of their `raw.length`
 * reaches this value, and are then sent together as one response.  The
 * bound exists so that streaming a large saved database to a client does
 * not require holding a huge amount of data in RAM at once.
 *
 * A single message that is larger than the threshold is still sent; it
 * simply goes out in a batch of its own.
 */
const DEFAULT_MESSAGES_THRESH = 20e6;
//const DEFAULT_MESSAGES_THRESH = 1e5;

/**
 * Gate for the experimental "sqlite" command, which runs an arbitrary
 * sqlite query against the stream.  It is kept off because of the obvious
 * denial-of-service and security concerns.
 */
const ENABLE_SQLITE_GENERAL_QUERIES = false;

/**
 * Wait value handed to lodash `throttle` for the function that flushes
 * queued changefeed messages to the socket.
 */
const SEND_THROTTLE = 30;
91
92
/**
 * Create the persist server on top of a conat client.
 *
 * The server listens for socket connections on `${SERVICE}.*`.  For each
 * connection:
 *
 *  - The first "data" message carries `{ storage, changefeed }`.  It is
 *    used to open the persistent stream via `getStream` and to register the
 *    user (second dot-separated segment of the socket subject) with the
 *    usage monitor.  If this fails, the error (and its code) is written to
 *    the socket and remembered, so later requests fail with it.
 *  - Each "request" message is a command named by `headers.cmd`
 *    (set, setMany, delete, config, inventory, get, keys, sqlite, serverId,
 *    getAll, changefeed).  Requests that arrive before the stream is ready
 *    wait for the "stream-initialized" event, for at most
 *    `headers.timeout` (default 30000).
 *  - On "closed" the stream is closed and the usage entry removed.
 *
 * @param client - conat client whose `socket.listen` is used; required.
 * @param messagesThresh - batching threshold forwarded to getAll and to the
 *   changefeed; defaults to DEFAULT_MESSAGES_THRESH.
 * @returns the listening ConatSocketServer.
 * @throws Error if `client` is null or undefined.
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
}: {
  client: Client;
  messagesThresh?: number;
}) {
  logger.debug("server: creating...");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${SERVICE}.*`;
  const server: ConatSocketServer = client.socket.listen(subject);
  logger.debug("server: listening on ", { subject });
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });
  server.on("close", () => {
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    logger.debug("server: got new connection", {
      id: socket.id,
      subject: socket.subject,
    });
    // Per-connection state, shared by the handlers below.
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // true while this connection holds an entry in the usage monitor
    let added = false;
    // true while getStream is pending, so overlapping "data" messages
    // cannot start a second initialization
    let initializing = false;
    // true once the socket has emitted "closed"
    let closed = false;

    socket.on("data", async (data) => {
      // logger.debug("server: got data ", data);
      if (stream != null || initializing || closed) {
        return;
      }
      initializing = true;
      storage = data.storage;
      changefeed = data.changefeed;
      try {
        user = socket.subject.split(".")[1];
        if (!added) {
          // register at most once, since "closed" only deletes once
          usage.add(user);
          added = true;
        }
        const opened = await getStream({
          subject: socket.subject,
          storage,
        });
        if (closed) {
          // The socket went away while the stream was being opened; the
          // "closed" handler already ran, so release the stream here
          // instead of leaking it.
          opened.close();
          return;
        }
        stream = opened;
        if (changefeed) {
          startChangefeed({ socket, stream, messagesThresh });
        }
        socket.emit("stream-initialized");
      } catch (err) {
        error = `${err}`;
        errorCode = err.code;
        if (!closed) {
          socket.write(null, { headers: { error, code: errorCode } });
        }
      } finally {
        initializing = false;
      }
    });

    socket.on("closed", () => {
      logger.debug("socket closed", socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
        added = false;
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;
      // logger.debug("got request", request);

      try {
        if (error) {
          // initialization failed earlier; report that same failure
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        if (request.cmd == "set") {
          mesg.respondSync(
            stream.set({
              key: request.key,
              previousSeq: request.previousSeq,
              raw: mesg.raw,
              ttl: request.ttl,
              encoding: mesg.encoding,
              headers: request.headers,
              msgID: request.msgID,
            }),
          );
        } else if (request.cmd == "setMany") {
          // just like set except the main data of the mesg
          // has an array of set operations; a failure of one operation is
          // reported in its slot of the response and does not abort the rest
          const resp: (
            | { seq: number; time: number }
            | { error: string; code?: any }
          )[] = [];
          for (const {
            key,
            previousSeq,
            ttl,
            msgID,
            messageData,
          } of mesg.data as SetOptions[]) {
            try {
              resp.push(
                stream.set({
                  key,
                  previousSeq,
                  ttl,
                  headers: messageData.headers,
                  msgID,
                  raw: messageData.raw,
                  encoding: messageData.encoding,
                }),
              );
            } catch (err) {
              resp.push({ error: `${err}`, code: err.code });
            }
          }
          mesg.respondSync(resp);
        } else if (request.cmd == "delete") {
          mesg.respondSync(stream.delete(request));
        } else if (request.cmd == "config") {
          mesg.respondSync(stream.config(request.config));
        } else if (request.cmd == "inventory") {
          mesg.respondSync(stream.inventory());
        } else if (request.cmd == "get") {
          const resp = stream.get({ key: request.key, seq: request.seq });
          //console.log("got resp = ", resp);
          if (resp == null) {
            mesg.respondSync(null);
          } else {
            // the stored payload goes out raw; its metadata rides in headers
            const { raw, encoding, headers, seq, time, key } = resp;
            mesg.respondSync(null, {
              raw,
              encoding,
              headers: { ...headers, seq, time, key },
            });
          }
        } else if (request.cmd == "keys") {
          const resp = stream.keys();
          mesg.respondSync(resp);
        } else if (request.cmd == "sqlite") {
          if (!ENABLE_SQLITE_GENERAL_QUERIES) {
            throw Error("sqlite command not currently supported");
          }
          const resp = stream.sqlite(request.statement, request.params);
          mesg.respondSync(resp);
        } else if (request.cmd == "serverId") {
          mesg.respondSync(server.id);
        } else if (request.cmd == "getAll") {
          logger.debug("getAll", { subject: socket.subject, request });
          // getAll uses requestMany which responds with all matching messages,
          // so no call to mesg.respond here.  It reports its own errors, so
          // the returned promise is intentionally not awaited.
          void getAll({ stream, mesg, request, messagesThresh });
        } else if (request.cmd == "changefeed") {
          logger.debug("changefeed", changefeed);
          if (!changefeed) {
            changefeed = true;
            startChangefeed({ socket, stream, messagesThresh });
          }
          mesg.respondSync("created");
        } else {
          mesg.respondSync(null, {
            headers: { error: `unknown command ${request.cmd}`, code: 404 },
          });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: err.code },
        });
      }
    });
  });

  return server;
}
278
279
/**
 * Answer a "getAll" request by sending every stored message in the
 * requested seq range back through `mesg.respondSync`, in batches.
 *
 * Messages from `stream.getAll({ start_seq, end_seq })` are accumulated
 * until the sum of their `raw.length` reaches `messagesThresh`; each batch
 * is then sent as one response.  After the last batch an empty response
 * (no messages, no error) marks successful completion.  If anything throws,
 * a single response carrying `error` (the stringified exception) and `code`
 * (the exception's `code` property, if any) is sent instead.
 *
 * Every response has a `seq` header counting responses from 0; it is
 * unrelated to the `seq` of the stored messages.
 *
 * @param stream - object whose `getAll` yields the stored messages.
 * @param mesg - the request message; `respondSync` is called on it.
 * @param request - request headers; `start_seq` and `end_seq` are read.
 * @param messagesThresh - batch size threshold described above.
 */
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (
    error?: string,
    messages?: StoredMessage[],
    code?: unknown,
  ) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    // A fresh array is used for every batch so that a batch already handed
    // to respondSync is never modified afterwards.
    let batch: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      batch.push(message);
      // tolerate messages that have no raw payload, as startChangefeed does
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(undefined, batch);
        batch = [];
        size = 0;
      }
    }

    if (batch.length > 0) {
      respond(undefined, batch);
    }
    // successful finish
    respond();
  } catch (err) {
    // Forward the error code too, like the other error responses in this
    // file; it has to be read from the exception, not from its string form.
    const code = (err as { code?: unknown } | null | undefined)?.code;
    respond(`${err}`, undefined, code);
  }
}
311
312
/**
 * Forward "change" events of a stream to a socket, in batches.
 *
 * Each change is queued, and a throttled sender (SEND_THROTTLE, leading and
 * trailing) drains the queue.  Queued messages are grouped until the sum of
 * their `raw.length` reaches `messagesThresh`, and each group is written to
 * the socket with headers `{ error, seq }`.  Nothing is written once
 * `socket.state` is "closed".
 *
 * When the socket emits "closed", the "change" listener is removed from the
 * stream and the pending throttled call is cancelled, so nothing keeps
 * referring to the dead socket if the stream object outlives it.
 *
 * @param socket - destination; `state`, `subject`, `write` and the "closed"
 *   event are used.
 * @param stream - source of "change" events.
 * @param messagesThresh - batch size threshold described above.
 */
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });
  if (socket.state == "closed") {
    // nothing could ever be delivered, so do not subscribe at all
    return;
  }
  // this seq here has nothing to do with the seq of the StoredMessage!
  let seq = 0;
  const respond = (error?: string, messages?: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    //logger.debug("changefeed: writing messages to socket", { seq, messages });
    socket.write(messages, { headers: { error, seq } });
    seq += 1;
  };

  const unsentMessages: StoredMessage[] = [];
  const sendAllUnsentMessages = throttle(
    () => {
      // A fresh array is used for every batch so that a batch already
      // handed to socket.write is never modified afterwards.
      let batch: StoredMessage[] = [];
      let size = 0;
      while (unsentMessages.length > 0 && socket.state != "closed") {
        const [message] = unsentMessages.splice(0, 1);
        // e.g. op:'delete' messages have length 0 and no raw field
        size += message?.raw?.length ?? 0;
        batch.push(message);
        if (size >= messagesThresh) {
          respond(undefined, batch);
          batch = [];
          size = 0;
        }
      }
      if (batch.length > 0) {
        respond(undefined, batch);
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  const onChange = (message: StoredMessage) => {
    if (socket.state == "closed") {
      return;
    }
    //console.log("stream change event", message);
    // logger.debug("changefeed got message", message, socket.state);
    unsentMessages.push(message);
    sendAllUnsentMessages();
  };
  stream.on("change", onChange);

  socket.once("closed", () => {
    stream.removeListener("change", onChange);
    sendAllUnsentMessages.cancel();
    unsentMessages.length = 0;
  });
}
361
362