Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/server.ts
1453 views
1
/*
2
3
Just try it out, start up node.js in this directory and:
4
5
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}})
6
c = s.client();
7
c.watch('foo')
8
c2 = s.client();
9
c2.pub('foo', 'bar')
10
11
12
cd packages/server
13
14
15
s = await require('@cocalc/server/conat/socketio').initConatServer()
16
17
s0 = await require('@cocalc/server/conat/socketio').initConatServer({port:3000}); 0
18
19
20
For valkey clustering -- run "valkey-server" in a terminal, then:
21
22
s0 = await require('@cocalc/server/conat/socketio').initConatServer({valkey:'valkey://localhost:6379', port:3000, getUser:()=>{return {hub_id:'hub'}}})
23
24
s1 = await require('@cocalc/server/conat/socketio').initConatServer({valkey:'valkey://localhost:6379', port:3001, getUser:()=>{return {hub_id:'hub'}}})
25
26
Corresponding clients:
27
28
c0 = require('@cocalc/conat/core/client').connect('http://localhost:3000')
29
30
c1 = require('@cocalc/conat/core/client').connect('http://localhost:3001')
31
32
33
---
34
35
Or from cocalc/src
36
37
pnpm conat-server
38
39
40
WARNING/TODO: I did not yet implement anything to expire interest
41
when a server terminates!! This basically isn't needed when using
42
the cluster adapter since there's no scaling up or down happening
43
(unless a worker keeps crashing), but with valkey it would be a good idea.
44
45
*/
46
47
import type { ConnectionStats, ServerInfo } from "./types";
48
import {
49
isValidSubject,
50
isValidSubjectWithoutWildcards,
51
} from "@cocalc/conat/util";
52
import { createAdapter as createValkeyStreamsAdapter } from "@cocalc/redis-streams-adapter";
53
import { createAdapter as createValkeyPubSubAdapter } from "@socket.io/redis-adapter";
54
import Valkey from "iovalkey";
55
import { Server } from "socket.io";
56
import { callback, delay } from "awaiting";
57
import {
58
ConatError,
59
connect,
60
type Client,
61
type ClientOptions,
62
MAX_INTEREST_TIMEOUT,
63
STICKY_QUEUE_GROUP,
64
} from "./client";
65
import {
66
RESOURCE,
67
MAX_CONNECTIONS_PER_USER,
68
MAX_CONNECTIONS,
69
MAX_PAYLOAD,
70
MAX_SUBSCRIPTIONS_PER_CLIENT,
71
MAX_SUBSCRIPTIONS_PER_HUB,
72
} from "./constants";
73
import { randomId } from "@cocalc/conat/names";
74
import { Patterns } from "./patterns";
75
import ConsistentHash from "consistent-hash";
76
import { is_array } from "@cocalc/util/misc";
77
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
78
import { once, until } from "@cocalc/util/async-utils";
79
import { getLogger } from "@cocalc/conat/client";
80
81
// Module logger; ConatServer.log prefixes every message with the server id.
const logger = getLogger("conat:core:server");

// Event names used with serverSideEmit on the "cluster" namespace to share
// subscription interest and sticky routing choices between servers.
const INTEREST_STREAM = "interest";
const STICKY_STREAM = "sticky";

// Options merged into every Valkey client created by valkeyClient().
const VALKEY_OPTIONS = { maxRetriesPerRequest: null };

// true  => socket.io adapter built on valkey pub/sub
// false => socket.io adapter built on valkey streams
const USE_VALKEY_PUBSUB = true;

// readCount passed to the streams adapter (only used when USE_VALKEY_PUBSUB is false).
const VALKEY_READ_COUNT = 100;
90
91
/**
 * Create a Valkey client from a flexible connection description.
 *
 * - a string that starts with "{" and ends with "}" is parsed as JSON and
 *   used as an options object;
 * - any other string is handed to Valkey as its first argument (e.g. a
 *   "valkey://host:port" address);
 * - anything else is treated as an options object.
 *
 * VALKEY_OPTIONS are always applied; for the object forms, the supplied
 * options override them.
 */
export function valkeyClient(valkey) {
  if (typeof valkey != "string") {
    return new Valkey({ ...VALKEY_OPTIONS, ...valkey });
  }
  const isJsonObject = valkey.startsWith("{") && valkey.endsWith("}");
  if (isJsonObject) {
    return new Valkey({ ...VALKEY_OPTIONS, ...JSON.parse(valkey) });
  }
  return new Valkey(valkey, VALKEY_OPTIONS);
}
102
103
// When true, subscribe/unsubscribe/publish log every call (very verbose).
const DEBUG = false;
104
105
/**
 * A change to the interest table, applied locally and (in cluster mode)
 * broadcast to the other servers.
 */
interface InterestUpdate {
  // whether the room is being added to or removed from the subject's interest
  op: "add" | "delete";
  // the subscription subject (may be a pattern) used as the key in the interest table
  subject: string;
  // queue group; must be a string when op is "add", ignored for "delete"
  queue?: string;
  // socket.io room that receives the messages
  room: string;
}
111
112
/**
 * Records which target was chosen for a (pattern, subject) pair in the
 * sticky queue group, so later messages keep going to the same target.
 */
interface StickyUpdate {
  // the subscription pattern that matched
  pattern: string;
  // the key the choice is stored under (loadBalance passes the published
  // subject with its last segment removed)
  subject: string;
  // the socket.io room chosen to receive the messages
  target: string;
}
117
118
/**
 * Convenience factory: construct a ConatServer from the given options.
 * The server starts listening as part of construction.
 */
export function init(opts: Options): ConatServer {
  const server = new ConatServer(opts);
  return server;
}
121
122
/**
 * Authenticates a socket and resolves to the user it belongs to.
 * It is expected to throw if authentication fails; the server then treats
 * the connection as an "error user".
 *
 * systemAccounts maps a cookie name to the password and user of a built-in
 * account; the server passes { sys: ... } when a system account password is
 * configured, and undefined otherwise.
 */
export type UserFunction = (
  socket,
  systemAccounts?: { [cookieName: string]: { password: string; user: any } },
) => Promise<any>;
126
127
/**
 * Authorization hook: resolves to true if `user` may publish to ("pub") or
 * subscribe to ("sub") the given subject.
 */
export type AllowFunction = (opts: {
  type: "pub" | "sub";
  user: any;
  subject: string;
}) => Promise<boolean>;
132
133
/** Configuration accepted by ConatServer / init(). */
export interface Options {
  // existing http server to attach socket.io to; if omitted, socket.io
  // listens on `port` itself
  httpServer?;
  // defaults to 3000
  port?: number;
  // identifier of this server; defaults to a random id
  id?: string;
  // socket.io path; defaults to "/conat"
  path?: string;
  // authentication; if omitted there is no auth and every user is null
  getUser?: UserFunction;
  // authorization; if omitted everything is allowed
  isAllowed?: AllowFunction;
  // valkey connection (address string, JSON string, or options object);
  // setting this also turns on cluster mode
  valkey?:
    | string
    | {
        port?: number;
        host?: string;
        username?: string;
        password?: string;
        db?: number;
      };
  // share interest and sticky state with other servers
  cluster?: boolean;
  // subscription limits; default to the values in ./constants
  maxSubscriptionsPerClient?: number;
  maxSubscriptionsPerHub?: number;
  // if set, a "sys" system account is offered to getUser and the
  // sys.conat.server service is started
  systemAccountPassword?: string;
  // if true, use https when creating an internal client.
  ssl?: boolean;
}
156
157
// Lifecycle of a ConatServer: "ready" from construction until close() is called.
type State = "closed" | "ready";
158
159
export class ConatServer {
160
// the socket.io Server, created in the constructor
public readonly io;
// identifier of this server (from options, or random)
public readonly id: string;

// authentication / authorization hooks, wrapped with defaults by the constructor
private getUser: UserFunction;
private isAllowed: AllowFunction;
// the resolved options (defaults filled in)
readonly options: Partial<Options>;
// true when interest/sticky state is shared with other servers
private cluster?: boolean;

// connected sockets by socket id
private sockets: { [id: string]: any } = {};
// pending timers by socket id; cleared when that id connects
private disconnectingTimeout: {
  [id: string]: ReturnType<typeof setTimeout>;
} = {};

// per-connection statistics by socket id
private stats: { [id: string]: ConnectionStats } = {};
// connection counting/limits; created by initUsage()
private usage: UsageMonitor;
private state: State = "ready";

// subjects each socket is subscribed to
private subscriptions: { [socketId: string]: Set<string> } = {};
// for each subscribed subject pattern: queue group -> set of socket.io rooms
private interest: Patterns<{ [queue: string]: Set<string> }> = new Patterns();
private sticky: {
  // Each target is a socket.io room name, i.e. the string
  // JSON.stringify({ id: string; subject: string }); messages for the
  // given pattern/subject are sent to that room.
  [pattern: string]: { [subject: string]: string };
} = {};
184
185
/**
 * Create the socket.io server and start serving.
 *
 * Fills in defaults (port 3000, ssl false, random id, path "/conat",
 * subscription limits from ./constants), wraps getUser/isAllowed so they
 * are always defined, optionally configures a valkey adapter, then kicks
 * off usage tracking, connection handling and (when a system account
 * password is set) the system service. The async initializers are started
 * but not awaited.
 */
constructor(options: Options) {
  const {
    httpServer,
    port = 3000,
    ssl = false,
    id = randomId(),
    path = "/conat",
    getUser,
    isAllowed,
    valkey,
    cluster,
    maxSubscriptionsPerClient = MAX_SUBSCRIPTIONS_PER_CLIENT,
    maxSubscriptionsPerHub = MAX_SUBSCRIPTIONS_PER_HUB,
    systemAccountPassword,
  } = options;
  this.options = {
    port,
    ssl,
    id,
    path,
    valkey,
    maxSubscriptionsPerClient,
    maxSubscriptionsPerHub,
    systemAccountPassword,
  };
  // configuring valkey implies cluster mode
  this.cluster = cluster || !!valkey;
  this.getUser = async (socket) => {
    if (getUser == null) {
      // no auth at all
      return null;
    }
    const password = this.options.systemAccountPassword;
    const systemAccounts = password
      ? { sys: { password, user: { hub_id: "system" } } }
      : undefined;
    return await getUser(socket, systemAccounts);
  };
  this.isAllowed = isAllowed ?? (async () => true);
  this.id = id;
  this.log("Starting Conat server...", {
    id,
    path,
    port: this.options.port,
    httpServer: httpServer ? "httpServer(...)" : undefined,
    valkey: !!valkey, // valkey has password in it so do not log
  });

  // NOTE: do NOT enable connectionStateRecovery; it seems to cause issues
  // when restarting the server.
  const createAdapter = (): any => {
    if (!valkey) {
      return undefined;
    }
    this.log("using valkey");
    const c = valkeyClient(valkey);
    if (USE_VALKEY_PUBSUB) {
      this.log("using the valkey pub/sub adapter");
      return createValkeyPubSubAdapter(c, c.duplicate());
    }
    this.log("using the valkey streams adapter with low-latency config");
    return createValkeyStreamsAdapter(c, {
      readCount: VALKEY_READ_COUNT,
      blockTime: 1,
    });
  };
  const adapter: any = createAdapter();

  const socketioOptions = {
    maxHttpBufferSize: MAX_PAYLOAD,
    path,
    adapter,
    // perMessageDeflate is disabled by default in socket.io, but it
    // seems unclear exactly *why*:
    // https://github.com/socketio/socket.io/issues/3477#issuecomment-930503313
    perMessageDeflate: { threshold: 1024 },
  };
  this.log(socketioOptions);
  if (httpServer) {
    this.io = new Server(httpServer, socketioOptions);
  } else {
    this.io = new Server(port, socketioOptions);
    this.log(`listening on port ${port}`);
  }
  this.initUsage();
  this.init();
  if (this.options.systemAccountPassword) {
    this.initSystemService();
  }
}
280
281
/**
 * Start handling connections and, in cluster mode, start syncing interest
 * and sticky state with the other servers.
 */
private init = async () => {
  this.io.on("connection", this.handleSocket);
  if (!this.cluster) {
    return;
  }
  if (this.options.valkey == null) {
    // the cluster adapter doesn't get configured until after the constructor,
    // so we wait a moment before configuring these.
    await delay(1);
  }
  this.initInterestSubscription();
  this.initStickySubscription();
};
293
294
/**
 * Create the usage monitor with the global and per-user connection limits
 * from ./constants; its log output goes through this.log tagged "usage".
 */
private initUsage = () => {
  const log = (...args) => this.log("usage", ...args);
  this.usage = new UsageMonitor({
    maxPerUser: MAX_CONNECTIONS_PER_USER,
    max: MAX_CONNECTIONS,
    resource: RESOURCE,
    log,
  });
};
302
303
/**
 * Shut the server down: mark it closed, close the socket.io server, release
 * the usage monitor and interest table, and drop all per-connection state.
 * Calling close() more than once is a no-op.
 */
close = async () => {
  if (this.state == "closed") {
    return;
  }
  this.state = "closed";
  await this.io.close();
  this.usage?.close();
  // Close the interest table BEFORE its reference is deleted below;
  // otherwise the optional call is silently skipped and it is never closed.
  this.interest?.close();
  for (const prop of ["interest", "subscriptions", "sockets", "services"]) {
    delete this[prop];
  }
  this.sticky = {};
  this.subscriptions = {};
  this.stats = {};
  this.sockets = {};
};
319
320
// Server description sent to each client in the "info" event on connect.
private info = (): ServerInfo => ({
  max_payload: MAX_PAYLOAD,
  id: this.id,
});
326
327
// Debug-level logging, prefixed with this server's id.
private log = (...args) => {
  const prefix = [this.id, ":"];
  logger.debug(...prefix, ...args);
};
330
331
/**
 * Stop delivering `subject` to `socket`: leave the corresponding socket.io
 * room and remove that room from the interest table.
 */
private unsubscribe = async ({ socket, subject }) => {
  if (DEBUG) {
    this.log("unsubscribe ", { id: socket.id, subject });
  }
  const room = socketSubjectRoom({ socket, subject });
  socket.leave(room);
  const update: InterestUpdate = { op: "delete", subject, room };
  await this.updateInterest(update);
};
339
340
// INTEREST
341
342
/**
 * Apply an interest change locally and, in cluster mode, broadcast it to
 * the other servers on the "cluster" namespace.
 */
private updateInterest = async (update: InterestUpdate) => {
  this._updateInterest(update);
  if (this.cluster) {
    // console.log(this.options.port, "cluster: publish interest change", update);
    this.io.of("cluster").serverSideEmit(INTEREST_STREAM, "update", update);
  }
};
348
349
/**
 * Cluster mode only: ask the other servers for their interest state and
 * adopt the first non-empty answer. Retries with backoff (100ms growing by
 * 1.5x up to 5s) until some server answers with non-empty state.
 */
private initInterest = async () => {
  if (!this.cluster) return;

  const requestState = (cb) => {
    this.io.of("cluster").serverSideEmit(INTEREST_STREAM, "init", cb);
  };

  const attempt = async () => {
    try {
      const states = await callback(requestState);
      const nonempty = states.filter((state) => isNonempty(state.patterns));
      // console.log("initInterest got", nonempty);
      if (nonempty.length == 0) {
        // console.log(`init interest state -- waiting for other nodes...`);
        return false;
      }
      this.deserializeInterest(nonempty[0]);
      return true;
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(`initInterest: WARNING -- ${err}`);
      }
      return false;
    }
  };

  await until(attempt, { start: 100, decay: 1.5, max: 5000 });
};
379
380
/**
 * Cluster mode only: start fetching the initial interest state (not
 * awaited) and listen for interest events from other servers:
 *  - "update": apply their interest change locally
 *  - "init":   reply with our serialized interest state
 */
private initInterestSubscription = async () => {
  if (!this.cluster) return;

  this.initInterest();

  const onEvent = (action, args) => {
    // console.log("INTEREST_STREAM received", { action, args });
    switch (action) {
      case "update":
        // another server telling us about subscription interest
        this._updateInterest(args);
        break;
      case "init":
        // another server requesting state; args is its callback
        args(this.serializableInterest());
        break;
    }
  };
  this.io.of("cluster").on(INTEREST_STREAM, onEvent);
};
397
398
/**
 * Return the interest table in a form that can be sent to another server:
 * each Set of rooms is converted to an array.
 */
private serializableInterest = () => {
  const setsToArrays = (groups: { [queue: string]: Set<string> }) => {
    const out: { [queue: string]: string[] } = {};
    for (const queue in groups) {
      out[queue] = Array.from(groups[queue]);
    }
    return out;
  };
  return this.interest.serialize(setsToArrays);
};
408
409
/**
 * Replace the interest table with one built from `state` (as produced by
 * serializableInterest on another server), then merge the interest we
 * already had into it.
 */
private deserializeInterest = (state) => {
  // arrays of rooms -> Sets of rooms (converted in place)
  const arraysToSets = (groups: any) => {
    for (const queue in groups) {
      groups[queue] = new Set<string>(groups[queue]);
    }
    return groups;
  };
  const incoming = new Patterns<{ [queue: string]: Set<string> }>();
  incoming.deserialize(state, arraysToSets);
  const previous = this.interest;
  this.interest = incoming;
  this.interest.merge(previous);
};
421
422
/**
 * Apply one interest change to the local table (no broadcasting).
 * Does nothing once the server is no longer "ready".
 *
 * - "add": register `room` under `queue` for `subject`; throws if queue is
 *   not a string.
 * - "delete": remove `room` from every queue group of `subject`; when no
 *   rooms remain, drop the subject and its sticky choices.
 * Any other op throws.
 */
private _updateInterest = (update: InterestUpdate) => {
  if (this.state != "ready") return;
  const { op, subject, queue, room } = update;
  const groups = this.interest.get(subject);
  switch (op) {
    case "add": {
      if (typeof queue != "string") {
        throw Error("queue must not be null for add");
      }
      if (groups === undefined) {
        this.interest.set(subject, { [queue]: new Set([room]) });
        break;
      }
      const rooms = groups[queue];
      if (rooms == null) {
        groups[queue] = new Set([room]);
      } else {
        rooms.add(room);
      }
      break;
    }
    case "delete": {
      if (groups == null) {
        break;
      }
      let remaining = 0;
      for (const name in groups) {
        groups[name].delete(room);
        if (groups[name].size == 0) {
          delete groups[name];
        } else {
          remaining += 1;
        }
      }
      if (remaining == 0) {
        // no interest anymore
        this.interest.delete(subject);
        delete this.sticky[subject];
      }
      break;
    }
    default:
      throw Error(`invalid op ${op}`);
  }
};
458
459
// STICKY
460
461
/**
 * Cluster mode only: ask the other servers for their sticky routing state
 * and merge every non-empty answer into ours. Retries with backoff (100ms
 * growing by 1.5x up to 10s) until at least one non-empty answer arrives.
 */
private initSticky = async () => {
  if (!this.cluster) return;
  const getStateFromCluster = (cb) => {
    this.io.of("cluster").serverSideEmit(STICKY_STREAM, "init", cb);
  };

  await until(
    async () => {
      try {
        const responses = (await callback(getStateFromCluster)).filter((x) =>
          isNonempty(x),
        );
        // console.log("initSticky got", responses);
        if (responses.length > 0) {
          for (const response of responses) {
            this.mergeSticky(response);
          }
          return true;
        } else {
          // console.log(`init sticky state -- waiting for other nodes...`);
          return false;
        }
      } catch (err) {
        if (!process.env.COCALC_TEST_MODE) {
          // label the warning with this function's name so it is not
          // mistaken for an interest-initialization failure
          console.log(`initSticky: WARNING -- ${err}`);
        }
        return false;
      }
    },
    { start: 100, decay: 1.5, max: 10000 },
  );
};
493
494
/**
 * Merge sticky choices received from another server into ours. For a
 * (pattern, subject) present on both sides, our existing choice wins.
 */
private mergeSticky = (sticky: {
  [pattern: string]: { [subject: string]: string };
}) => {
  for (const pattern in sticky) {
    const theirs = sticky[pattern];
    const ours = this.sticky[pattern];
    this.sticky[pattern] = { ...theirs, ...ours };
  }
};
501
502
/**
 * Cluster mode only: start fetching the initial sticky state (not awaited)
 * and listen for sticky events from other servers:
 *  - "update": record their routing choice locally
 *  - "init":   reply with our sticky table
 */
private initStickySubscription = async () => {
  if (!this.cluster) return;

  this.initSticky();

  const onEvent = (action, args) => {
    // console.log("STICKY_STREAM received", { action, args });
    switch (action) {
      case "update":
        this._updateSticky(args);
        break;
      case "init":
        // args is the requesting server's callback
        args(this.sticky);
        break;
    }
  };
  this.io.of("cluster").on(STICKY_STREAM, onEvent);
};
517
518
/**
 * Record a sticky routing choice locally and, in cluster mode, broadcast it
 * to the other servers.
 */
private updateSticky = async (update: StickyUpdate) => {
  this._updateSticky(update);
  if (this.cluster) {
    // console.log(this.options.port, "cluster: publish sticky update", update);
    this.io.of("cluster").serverSideEmit(STICKY_STREAM, "update", update);
  }
};
525
526
// Store target as the sticky choice for (pattern, subject), overwriting any
// previous choice. Local only -- nothing is broadcast.
private _updateSticky = (update: StickyUpdate) => {
  const { pattern, subject, target } = update;
  let bySubject = this.sticky[pattern];
  if (bySubject === undefined) {
    bySubject = this.sticky[pattern] = {};
  }
  bySubject[subject] = target;
};
533
534
// Look up the previously chosen target for (pattern, subject);
// undefined if no choice has been recorded.
private getStickyTarget = ({ pattern, subject }) => {
  const bySubject = this.sticky[pattern];
  return bySubject?.[subject];
};
537
538
//
539
540
/**
 * Subscribe `socket` to `subject` in queue group `queue` on behalf of `user`.
 *
 * Throws:
 *  - Error if queue is not a string or the subject is invalid
 *  - ConatError code 403 if isAllowed denies the subscription
 *  - ConatError code 429 if the socket already has the maximum number of
 *    subscriptions (hub users, i.e. those with user.hub_id, get the
 *    per-hub limit; everyone else the per-client limit)
 */
private subscribe = async ({ socket, subject, queue, user }) => {
  if (DEBUG) {
    this.log("subscribe ", { id: socket.id, subject, queue });
  }
  if (typeof queue != "string") {
    throw Error("queue must be defined");
  }
  if (!isValidSubject(subject)) {
    throw Error("invalid subject");
  }
  if (!(await this.isAllowed({ user, subject, type: "sub" }))) {
    const message = `permission denied subscribing to '${subject}' from ${JSON.stringify(user)}`;
    this.log(message);
    throw new ConatError(message, {
      code: 403,
    });
  }
  let maxSubs;
  if (user?.hub_id) {
    maxSubs =
      this.options.maxSubscriptionsPerHub ?? MAX_SUBSCRIPTIONS_PER_HUB;
  } else {
    maxSubs =
      this.options.maxSubscriptionsPerClient ?? MAX_SUBSCRIPTIONS_PER_CLIENT;
  }
  // a falsy limit (e.g. 0) disables the check
  if (maxSubs) {
    const numSubs = this.subscriptions?.[socket.id]?.size ?? 0;
    if (numSubs >= maxSubs) {
      // error 429 == "too many requests"
      throw new ConatError(
        `there is a limit of at most ${maxSubs} subscriptions and you currently have ${numSubs} subscriptions -- subscription to '${subject}' denied`,
        { code: 429 },
      );
    }
  }
  const room = socketSubjectRoom({ socket, subject });
  // critical to await socket.join so we don't advertise that there is
  // a subscriber before the socket is actually getting messages.
  await socket.join(room);
  await this.updateInterest({ op: "add", subject, room, queue });
};
582
583
/**
 * Publish `data` on `subject` on behalf of user `from`.
 *
 * For every subscription pattern matching the subject, the message is
 * emitted to exactly one room per queue group. Resolves to the number of
 * rooms the message was emitted to.
 *
 * Throws Error for an invalid subject (wildcards are not allowed) and
 * ConatError with code 403 when isAllowed denies publishing.
 */
private publish = async ({ subject, data, from }): Promise<number> => {
  if (!isValidSubjectWithoutWildcards(subject)) {
    throw Error("invalid subject");
  }
  const allowed = await this.isAllowed({ user: from, subject, type: "pub" });
  if (!allowed) {
    const message = `permission denied publishing to '${subject}' from ${JSON.stringify(from)}`;
    this.log(message);
    // 403 is the http code for permission denied, and having this
    // set is assumed elsewhere in our code, so don't mess with it!
    throw new ConatError(message, { code: 403 });
  }
  let delivered = 0;
  for (const pattern of this.interest.matches(subject)) {
    const groups = this.interest.get(pattern)!;
    if (DEBUG) {
      this.log("publishing", { subject, data, g: groups });
    }
    // send to exactly one in each queue group
    for (const queue in groups) {
      const target = this.loadBalance({
        pattern,
        subject,
        queue,
        targets: groups[queue],
      });
      if (target === undefined) {
        continue;
      }
      this.io.to(target).emit(pattern, { subject, data });
      delivered += 1;
    }
  }
  return delivered;
};
618
619
/**
 * Pick the one room in `targets` that should receive a message.
 *
 * Returns undefined when there are no targets. For ordinary queue groups
 * the choice is random. For the sticky queue group the choice is keyed by
 * the subject with its last segment removed: a previously recorded choice
 * is reused while it is still among the targets, otherwise a new one is
 * made by consistent hashing and recorded (and broadcast in cluster mode).
 */
private loadBalance = ({
  pattern,
  subject,
  queue,
  targets,
}: {
  pattern: string;
  subject: string;
  queue: string;
  targets: Set<string>;
}): string | undefined => {
  if (targets.size == 0) {
    return undefined;
  }
  if (queue != STICKY_QUEUE_GROUP) {
    return randomChoice(targets);
  }
  const segments = subject.split(".");
  const stickyKey = segments.slice(0, segments.length - 1).join(".");
  const currentTarget = this.getStickyTarget({ pattern, subject: stickyKey });
  if (currentTarget !== undefined && targets.has(currentTarget)) {
    return currentTarget;
  }
  // Why consistent hashing rather than a random pick: two socketio servers
  // may have to make this choice at the same moment (e.g., when a file is
  // opened, the browser and the project each open a persistent stream at
  // the same time, likely via different servers). If they chose
  // differently, a few messages could be routed inconsistently until the
  // sticky state syncs. With consistent hashing both servers arrive at the
  // same answer, so conflicts can only occur in the brief moments when the
  // set of targets (e.g., the persist servers) is itself changing, which
  // should be rare.
  const target = consistentChoice(targets, stickyKey);
  this.updateSticky({ pattern, subject: stickyKey, target });
  return target;
};
658
659
/**
 * Handle a newly connected socket: authenticate it, record stats, send the
 * "info" event, and install handlers for the client events
 * "stats", "wait-for-interest", "publish", "subscribe", "subscriptions",
 * "unsubscribe", plus cleanup on "disconnecting"/"disconnect".
 *
 * If authentication (or the connection limit check) throws, the client
 * stays connected as an "error user" { error, code }.
 */
private handleSocket = async (socket) => {
  const id = socket.id;
  this.sockets[id] = socket;
  // socket.io emits "disconnect" once a server-side socket has gone away
  // (it has no "closed" event), so this is where the socket is forgotten.
  // The "wait-for-interest" loop below relies on this.sockets[id] being
  // removed to stop waiting.
  socket.once("disconnect", () => {
    this.log("connection closed", id);
    delete this.sockets[id];
    delete this.stats[id];
  });

  this.stats[id] = {
    send: { messages: 0, bytes: 0 },
    recv: { messages: 0, bytes: 0 },
    subs: 0,
    connected: Date.now(),
    address: getAddress(socket),
  };
  let user: any = null;
  let added = false;
  try {
    user = await this.getUser(socket);
    this.usage.add(user);
    added = true;
  } catch (err) {
    // getUser is supposed to throw an error if authentication fails
    // for any reason
    // Also, if the connection limit is hit they still connect, but as
    // the error user who can't do anything (hence not waste resources).
    user = { error: `${err}`, code: err.code };
  }
  if (!socket.connected) {
    // The client went away while we were authenticating. The
    // "disconnecting" handler below was not installed yet, so undo the
    // usage accounting here and install nothing for a dead socket.
    if (added) {
      this.usage.delete(user);
    }
    return;
  }
  this.stats[id].user = user;
  this.log("new connection", { id, user });
  if (this.disconnectingTimeout[id]) {
    this.log("clearing disconnectingTimeout - ", { id, user });
    clearTimeout(this.disconnectingTimeout[id]);
    delete this.disconnectingTimeout[id];
  }
  if (this.subscriptions[id] == null) {
    this.subscriptions[id] = new Set<string>();
  }

  socket.emit("info", { ...this.info(), user });

  // client reporting how much it has received
  socket.on("stats", ({ recv0 }) => {
    const s = this.stats[socket.id];
    if (s == null) return;
    s.recv = recv0;
  });

  // Responds true/false (or { error }) depending on whether anybody is
  // subscribed to the subject, optionally waiting up to `timeout` ms for
  // interest to appear. respond is called at most once.
  socket.on(
    "wait-for-interest",
    async ({ subject, timeout = MAX_INTEREST_TIMEOUT }, respond) => {
      if (respond == null) {
        return;
      }
      if (!isValidSubjectWithoutWildcards(subject)) {
        respond({ error: "invalid subject" });
        return;
      }
      if (!(await this.isAllowed({ user, subject, type: "pub" }))) {
        const message = `permission denied waiting for interest in '${subject}' from ${JSON.stringify(user)}`;
        this.log(message);
        respond({ error: message, code: 403 });
        // must stop here: otherwise we would go on to reveal whether there
        // is interest, and call respond a second time.
        return;
      }
      const matches = this.interest.matches(subject);
      if (matches.length > 0 || !timeout) {
        // NOTE: we never return the actual matches, since this is a potential security vulnerability.
        // it could make it very easy to figure out private inboxes, etc.
        respond(matches.length > 0);
        // already answered -- do not fall into the wait loop
        return;
      }
      if (timeout > MAX_INTEREST_TIMEOUT) {
        timeout = MAX_INTEREST_TIMEOUT;
      }
      const start = Date.now();
      while (this.state != "closed" && this.sockets[socket.id]) {
        if (Date.now() - start >= timeout) {
          respond({ error: "timeout" });
          return;
        }
        await once(this.interest, "change");
        if ((this.state as any) == "closed" || !this.sockets[socket.id]) {
          return;
        }
        const matches = this.interest.matches(subject);
        if (matches.length > 0) {
          respond(true);
          return;
        }
      }
    },
  );

  socket.on("publish", async ([subject, ...data], respond) => {
    if (data?.[2]) {
      // done
      this.stats[socket.id].send.messages += 1;
    }
    this.stats[socket.id].send.bytes += data[4]?.length ?? 0;
    this.stats[socket.id].active = Date.now();
    // this.log(JSON.stringify(this.stats));

    try {
      const count = await this.publish({ subject, data, from: user });
      respond?.({ count });
    } catch (err) {
      if (err.code == 403) {
        socket.emit("permission", {
          message: err.message,
          subject,
          type: "pub",
        });
      }
      respond?.({ error: `${err}`, code: err.code });
    }
  });

  // subscribe this socket to one subject; never throws -- failures are
  // reported as { error, code }
  const subscribe = async ({ subject, queue }) => {
    try {
      if (this.subscriptions[id].has(subject)) {
        return { status: "already-added" };
      }
      await this.subscribe({ socket, subject, queue, user });
      this.subscriptions[id].add(subject);
      this.stats[socket.id].subs += 1;
      this.stats[socket.id].active = Date.now();
      return { status: "added" };
    } catch (err) {
      if (err.code == 403) {
        socket.emit("permission", {
          message: err.message,
          subject,
          type: "sub",
        });
      }
      return { error: `${err}`, code: err.code };
    }
  };

  // accepts a single request or an array of them (processed in order)
  socket.on(
    "subscribe",
    async (x: { subject; queue } | { subject; queue }[], respond) => {
      let r;
      if (is_array(x)) {
        const v: any[] = [];
        for (const y of x) {
          v.push(await subscribe(y));
        }
        r = v;
      } else {
        r = await subscribe(x);
      }
      respond?.(r);
    },
  );

  socket.on("subscriptions", (_, respond) => {
    if (respond == null) {
      return;
    }
    respond(Array.from(this.subscriptions[id]));
  });

  const unsubscribe = ({ subject }: { subject: string }) => {
    if (!this.subscriptions[id].has(subject)) {
      return;
    }
    this.unsubscribe({ socket, subject });
    this.subscriptions[id].delete(subject);
    this.stats[socket.id].subs -= 1;
    this.stats[socket.id].active = Date.now();
  };

  // accepts a single request or an array of them
  socket.on(
    "unsubscribe",
    (x: { subject: string } | { subject: string }[], respond) => {
      let r;
      if (is_array(x)) {
        r = x.map(unsubscribe);
      } else {
        r = unsubscribe(x);
      }
      respond?.(r);
    },
  );

  // fired while the socket is still in its rooms, so we can remove the
  // interest for each of them
  socket.on("disconnecting", async () => {
    this.log("disconnecting", { id, user });
    delete this.stats[socket.id];
    if (added) {
      this.usage.delete(user);
    }
    const rooms = Array.from(socket.rooms) as string[];
    for (const room of rooms) {
      const subject = getSubjectFromRoom(room);
      this.unsubscribe({ socket, subject });
    }
    delete this.subscriptions[id];
  });
};
857
858
// create new client in the same process connected to this server.
859
// This is useful for unit testing and is not cached by default (i.e., multiple
860
// calls return distinct clients).
861
// URL at which an in-process client can reach this server on localhost.
// Uses https when options.ssl is set or the port is 443. The trailing
// "/conat".length characters are removed from options.path to get the
// URL prefix.
private address = () => {
  const { port, ssl, path } = this.options;
  const scheme = ssl || port == 443 ? "https" : "http";
  const prefix = path?.slice(0, -"/conat".length) ?? "";
  return `${scheme}://localhost:${port}${prefix}`;
};
866
867
/**
 * Create a client, in this process, connected to this server.
 * noCache defaults to true, so each call returns a distinct client;
 * anything in `options` overrides the defaults (including address).
 */
client = (options?: ClientOptions): Client => {
  const address = this.address();
  this.log("client: connecting to - ", { address });
  const defaults = { address, noCache: true };
  return connect({ ...defaults, ...options });
};
876
877
/**
 * Start the "sys.conat.server" service, using an internal client that
 * authenticates with the system account cookie. The service exposes:
 *  - stats():         { [serverId]: per-connection stats }
 *  - usage():         { [serverId]: usage monitor stats }
 *  - disconnect(ids): disconnect the sockets in the given room id(s)
 *
 * Throws if no system account password is configured. A failure to start
 * the service is logged as a warning rather than thrown.
 */
initSystemService = async () => {
  const password = this.options.systemAccountPassword;
  if (!password) {
    throw Error("system service requires system account");
  }
  this.log("starting service listening on sys...");
  const client = this.client({
    extraHeaders: { Cookie: `sys=${password}` },
  });
  try {
    await client.service(
      "sys.conat.server",
      {
        stats: () => ({ [this.id]: this.stats }),
        usage: () => ({ [this.id]: this.usage.stats() }),
        // user has to explicitly refresh their browser after
        // being disconnected this way
        disconnect: (ids: string | string[]) => {
          const list = typeof ids == "string" ? [ids] : ids;
          for (const id of list) {
            this.io.in(id).disconnectSockets();
          }
        },
      },
      { queue: this.id },
    );
    this.log(`successfully started sys.conat.server service`);
  } catch (err) {
    this.log(`WARNING: unable to start sys.conat.server service -- ${err}`);
  }
};
913
}
914
915
// Recover the subject from a socket.io room name. Names starting with "{"
// are treated as the JSON produced by socketSubjectRoom; any other name is
// returned unchanged.
function getSubjectFromRoom(room: string) {
  const isJsonRoom = room.startsWith("{");
  return isJsonRoom ? JSON.parse(room).subject : room;
}
922
923
// Name of the socket.io room used to deliver `subject` to `socket`:
// the JSON encoding of { id: <socket id>, subject }.
function socketSubjectRoom({ socket, subject }) {
  const { id } = socket;
  return JSON.stringify({ id, subject });
}
926
927
/**
 * Return a uniformly random element of the set.
 * @throws Error if the set is empty.
 */
export function randomChoice(v: Set<string>): string {
  if (v.size == 0) {
    throw Error("v must have size at least 1");
  }
  if (v.size == 1) {
    // nothing to choose between -- skip the random number entirely
    const [only] = v;
    return only;
  }
  const items = [...v];
  const index = Math.floor(Math.random() * items.length);
  return items[index];
}
940
941
/**
 * Choose an element of the set for `resource` using consistent hashing.
 * The elements are added to the hash ring in sorted order, so the insertion
 * order of the set does not influence the ring.
 * @throws Error if the set is empty.
 */
export function consistentChoice(v: Set<string>, resource: string): string {
  if (v.size == 0) {
    throw Error("v must have size at least 1");
  }
  if (v.size == 1) {
    const [only] = v;
    return only;
  }
  const ring = new ConsistentHash();
  const nodes = Array.from(v);
  nodes.sort();
  nodes.forEach((node) => {
    ring.add(node);
  });
  return ring.get(resource);
}
958
959
// See https://socket.io/how-to/get-the-ip-address-of-the-client
960
/**
 * Best-effort client address for a socket, taken from (in order):
 *  1. the `for=` parameter of the first element of the Forwarded header,
 *  2. the first entry of X-Forwarded-For,
 *  3. the cf-connecting-ip or fastly-client-ip header,
 *  4. the address of the socket.io handshake.
 *
 * These headers are supplied by whoever connects (or by proxies in front
 * of us), so the result is informational only.
 */
function getAddress(socket) {
  const headers = socket.handshake.headers;

  const forwarded = headers["forwarded"];
  if (forwarded) {
    // only the first element (the original client) is considered
    for (const directive of forwarded.split(",")[0].split(";")) {
      // parameter names are case-insensitive; also tolerate padding
      const pair = directive.trim();
      if (pair.toLowerCase().startsWith("for=")) {
        return pair.substring(4);
      }
    }
  }

  let addr = headers["x-forwarded-for"]?.split(",")?.[0];
  if (addr) {
    return addr;
  }
  for (const other of ["cf-connecting-ip", "fastly-client-ip"]) {
    addr = headers[other];
    if (addr) {
      return addr;
    }
  }

  return socket.handshake.address;
}
983
984
// True if obj has at least one enumerable property (as seen by for...in);
// false for {}, [], null and undefined.
function isNonempty(obj) {
  let found = false;
  for (const _key in obj) {
    found = true;
    break;
  }
  return found;
}
990
991