Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/socket/basic.test.ts
1451 views
1
/*
2
3
pnpm test `pwd`/basic.test.ts
4
5
*/
6
7
import {
8
before,
9
after,
10
connect,
11
wait,
12
setDefaultTimeouts,
13
} from "@cocalc/backend/conat/test/setup";
14
import { once } from "@cocalc/util/async-utils";
15
import { delay } from "awaiting";
16
17
beforeAll(async () => {
18
await before();
19
setDefaultTimeouts({ request: 750, publish: 750 });
20
});
21
22
describe("create a server and client, then send a message and get a response", () => {
23
let client,
24
server,
25
cn1,
26
cn2,
27
subject = "response.double";
28
29
it("creates the client and server", () => {
30
cn1 = connect();
31
server = cn1.socket.listen(subject);
32
server.on("connection", (socket) => {
33
socket.on("data", (data) => {
34
socket.write(`${data}`.repeat(2));
35
});
36
});
37
});
38
39
it("connects as client and tests out the server", async () => {
40
cn2 = connect();
41
client = cn2.socket.connect(subject);
42
client.write("cocalc");
43
const [data] = await once(client, "data");
44
expect(data).toBe("cocalccocalc");
45
});
46
47
it("send 3 messages and get 3 responses, in order", async () => {
48
client.write("a");
49
client.write("b");
50
client.write("c");
51
expect((await once(client, "data"))[0]).toBe("aa");
52
expect((await once(client, "data"))[0]).toBe("bb");
53
expect((await once(client, "data"))[0]).toBe("cc");
54
});
55
56
const count = 250;
57
it(`sends ${count} messages and gets responses, so its obviously not super slow`, async () => {
58
const t = Date.now();
59
for (let i = 0; i < count; i++) {
60
client.write(`${i}`);
61
}
62
for (let i = 0; i < count; i++) {
63
expect((await once(client, "data"))[0]).toBe(`${i}`.repeat(2));
64
}
65
expect(Date.now() - t).toBeLessThan(5000);
66
});
67
68
it("cleans up", () => {
69
client.close();
70
server.close();
71
cn1.close();
72
cn2.close();
73
});
74
});
75
76
describe("create a client first, then the server, and see that write still works (testing the order); also include headers in both directions.", () => {
77
let client, server, cn1, cn2, requestPromise;
78
const subject = "cocalc-order";
79
80
it("connects as client and writes to the server that doesn't exist yet", async () => {
81
cn2 = connect();
82
client = cn2.socket.connect(subject);
83
client.write("cocalc", { headers: { my: "header" } });
84
});
85
86
it("we fire off a request as well, but don't wait for it", () => {
87
requestPromise = client.request("foo");
88
});
89
90
it("creates the server", () => {
91
cn1 = connect();
92
server = cn1.socket.listen(subject);
93
server.on("connection", (socket) => {
94
socket.on("data", (data, headers) => {
95
socket.write(`${data}`.repeat(2), { headers });
96
});
97
socket.on("request", (mesg) => {
98
mesg.respondSync("bar", { headers: "x" });
99
});
100
});
101
});
102
103
it("it still works out", async () => {
104
const [data, headers] = await once(client, "data");
105
expect(data).toBe("cocalccocalc");
106
expect(headers).toEqual({ my: "header" });
107
});
108
109
it("get back the response from the request we created above", async () => {
110
const response = await requestPromise;
111
expect(response.data).toBe("bar");
112
expect(response.headers).toBe("x");
113
});
114
115
it("cleans up", () => {
116
client.close();
117
server.close();
118
cn1.close();
119
cn2.close();
120
});
121
});
122
123
describe("create a client first and write more messages than the queue size results in an error", () => {
124
let client, server, cn1, cn2;
125
const subject = "conat.too.many.messages";
126
127
let count = 5,
128
maxQueueSize = 3,
129
iter;
130
it("connects as client with a small queue and fill it", async () => {
131
cn2 = connect();
132
let fails = 0;
133
client = cn2.socket.connect(subject, { maxQueueSize });
134
iter = client.iter();
135
for (let i = 0; i < count; i++) {
136
try {
137
client.write(`${i}`);
138
} catch (err) {
139
// should fail for i=4,5
140
expect(i).toBeGreaterThan(count - maxQueueSize);
141
fails += 1;
142
}
143
}
144
expect(fails).toBe(2);
145
expect(client.queuedWrites.length).toBe(3);
146
});
147
148
const serverRecv: any[] = [];
149
let serverSocket;
150
it("creates the server", () => {
151
cn1 = connect();
152
server = cn1.socket.listen(subject, { maxQueueSize });
153
server.on("connection", (socket) => {
154
serverSocket = socket;
155
socket.on("data", (data) => {
156
serverRecv.push(data);
157
socket.write(`${data}`.repeat(2));
158
});
159
});
160
});
161
162
it(`first ${maxQueueSize} messages do get sent`, async () => {
163
for (let i = 0; i < maxQueueSize; i++) {
164
const { value } = await iter.next();
165
expect(value[0]).toBe(`${i}`.repeat(2));
166
}
167
expect(serverRecv).toEqual(["0", "1", "2"]);
168
});
169
170
it("wait for client to drain; then we can now send another message without an error", async () => {
171
await client.waitUntilDrain();
172
client.write("foo");
173
});
174
175
it("writing too many messages to the server socket also fails", async () => {
176
if (serverSocket.tcp.send.unsent > 0) {
177
await once(serverSocket, "drain");
178
}
179
expect(serverSocket.tcp.send.unsent).toBe(0);
180
serverSocket.write(0);
181
serverSocket.write(1);
182
serverSocket.write(2);
183
expect(() => serverSocket.write(3)).toThrow("WRITE FAILED");
184
try {
185
serverSocket.write(4);
186
} catch (err) {
187
expect(err.code).toBe("ENOBUFS");
188
}
189
});
190
191
it("cleans up", () => {
192
client.close();
193
server.close();
194
cn1.close();
195
cn2.close();
196
});
197
});
198
199
describe("test having two clients and see that communication is independent and also broadcast to both", () => {
200
let client1, client2, server, cn1, cn2, cn3;
201
202
it("creates a server and two clients", async () => {
203
cn3 = connect();
204
server = cn3.socket.listen("cocalc2");
205
server.on("connection", (socket) => {
206
socket.on("data", (data) => {
207
socket.write(`${data}`.repeat(2));
208
});
209
});
210
211
cn1 = connect();
212
client1 = cn1.socket.connect("cocalc2");
213
cn2 = connect();
214
client2 = cn2.socket.connect("cocalc2");
215
});
216
217
it("each client uses the server separately", async () => {
218
const x1 = once(client1, "data");
219
const x2 = once(client2, "data");
220
client1.write("one");
221
client2.write("two");
222
expect((await x1)[0]).toBe("oneone");
223
expect((await x2)[0]).toBe("twotwo");
224
});
225
226
it("server broadcast to all clients", async () => {
227
const x1 = once(client1, "data");
228
const x2 = once(client2, "data");
229
server.write("broadcast");
230
expect((await x1)[0]).toBe("broadcast");
231
expect((await x2)[0]).toBe("broadcast");
232
});
233
234
it("test with a channel", async () => {
235
const s1 = server.channel("one");
236
const c1 = client1.channel("one");
237
const c2 = client2.channel("one");
238
s1.on("connection", (socket) => {
239
socket.on("data", (data) => {
240
socket.write(`1${data}`);
241
});
242
});
243
const x1 = once(c1, "data");
244
const x2 = once(c2, "data");
245
c1.write("c1");
246
expect((await x1)[0]).toBe("1c1");
247
c2.write("c2");
248
expect((await x2)[0]).toBe("1c2");
249
250
s1.close();
251
c1.close();
252
c2.close();
253
});
254
255
it("cleans up", () => {
256
client1.close();
257
client2.close();
258
server.close();
259
cn1.close();
260
cn2.close();
261
cn3.close();
262
});
263
});
264
265
describe("create a server and client. Disconnect the client and see from the server point of view that it disconnected.", () => {
266
let server, cn1;
267
268
it("creates the server", () => {
269
cn1 = connect();
270
server = cn1.socket.listen("disconnect.io");
271
server.on("connection", (socket) => {
272
socket.on("data", () => {
273
socket.write(`clients=${Object.keys(server.sockets).length}`);
274
});
275
});
276
expect(Object.keys(server.sockets).length).toBe(0);
277
});
278
279
let client;
280
it("connects with a client", async () => {
281
cn1 = connect();
282
client = cn1.socket.connect("disconnect.io");
283
const r = once(client, "data");
284
client.write("hello");
285
expect((await r)[0]).toBe("clients=1");
286
});
287
288
it("disconnects and sees the count of clients goes back to 0", async () => {
289
client.close();
290
await wait({
291
until: () => {
292
return Object.keys(server.sockets).length == 0;
293
},
294
});
295
});
296
297
it("creates a new client, connects to server, then closes the server and the client sees that and closes.", async () => {
298
client = cn1.socket.connect("disconnect.io");
299
const iter = client.iter();
300
// confirm working:
301
client.write("hello");
302
const { value } = await iter.next();
303
expect(value[0]).toBe("clients=1");
304
305
expect(client.state).toBe("ready");
306
const closed = once(client, "closed");
307
// now close server and wait for state to quickly automatically
308
// switch to not ready anymore
309
const t0 = Date.now();
310
server.close();
311
await closed;
312
expect(Date.now() - t0).toBeLessThan(250);
313
});
314
});
315
316
describe("create two socket servers with the same subject to test that sockets are sticky", () => {
317
const subject = "a.sticks.place";
318
let c1, c2, s1, s2;
319
it("creates two distinct socket servers with the same subject", () => {
320
c1 = connect();
321
c2 = connect();
322
s1 = c1.socket.listen(subject);
323
s1.on("connection", (socket) => {
324
// console.log("s1 got connection");
325
socket.on("data", () => {
326
// console.log("s1 got data");
327
socket.write("s1");
328
});
329
socket.on("request", (mesg) => mesg.respond("s1"));
330
});
331
s2 = c2.socket.listen(subject);
332
s2.on("connection", (socket) => {
333
// console.log("s2 got connection");
334
socket.on("data", () => {
335
// console.log("s2 got data");
336
socket.write("s2");
337
});
338
socket.on("request", (mesg) => mesg.respond("s2"));
339
});
340
});
341
342
let c3, client, resp;
343
it("creates a client and verifies writes all go to the same server", async () => {
344
c3 = connect();
345
client = c3.socket.connect(subject);
346
const iter = client.iter();
347
client.write(null);
348
const { value } = await iter.next();
349
resp = value[0];
350
// all additional messages end up going to the same server, because
351
// of "sticky" subscriptions :-)
352
for (let i = 0; i < 25; i++) {
353
client.write(null);
354
const { value: value1 } = await iter.next();
355
expect(resp).toBe(value1[0]);
356
}
357
});
358
359
let c3b, s3;
360
it("add one more server and verify that messages still all go to the right place", async () => {
361
c3b = connect();
362
s3 = c3b.socket.listen(subject);
363
let newServerGotConnection = false;
364
s3.on("connection", (socket) => {
365
//console.log("s3 got a connection");
366
newServerGotConnection = true;
367
socket.on("data", () => {
368
//console.log("s3 got data", { data });
369
socket.write("s3");
370
});
371
});
372
const iter = client.iter();
373
for (let i = 0; i < 25; i++) {
374
client.write(null);
375
const { value: value1 } = await iter.next();
376
if (resp != value1[0]) {
377
throw Error("sticky load balancing failed!?");
378
}
379
}
380
expect(newServerGotConnection).toBe(false); // redundant...
381
});
382
383
it("also verify that request/reply messaging go to the right place (stickiness works the same way)", async () => {
384
for (let i = 0; i < 25; i++) {
385
const x = await client.request(null);
386
expect(x.data).toBe(resp);
387
}
388
});
389
390
it("remove the server we're connected to and see that the client socket closes, since all state on the other end is gone (this is the only possible thing that should happen!)", async () => {
391
if (resp == "s1") {
392
s1.close();
393
} else if (resp == "s2") {
394
s2.close();
395
}
396
await once(client, "closed");
397
});
398
399
it("cleans up", () => {
400
s1.close();
401
s2.close();
402
s3.close();
403
c1.close();
404
c2.close();
405
c3.close();
406
c3b.close();
407
client.close();
408
});
409
});
410
411
describe("create a server where the subject has a wildcard, so clients can e.g., authentication themselves by having permission to write to the subject", () => {
412
let client, server, cn1, cn2;
413
it("creates the client and server", () => {
414
cn1 = connect();
415
server = cn1.socket.listen("changefeeds.*");
416
server.on("connection", (socket) => {
417
socket.on("data", () => {
418
socket.write(socket.subject.split(".")[1]);
419
});
420
});
421
});
422
423
it("connects as client on different matching subjects", async () => {
424
cn2 = connect();
425
client = cn2.socket.connect("changefeeds.account-5077");
426
const x = once(client, "data");
427
client.write(null);
428
const [data] = await x;
429
expect(data).toBe("account-5077");
430
client.close();
431
432
client = cn2.socket.connect("changefeeds.account-389");
433
const x2 = once(client, "data");
434
client.write(null);
435
const [data2] = await x2;
436
expect(data2).toBe("account-389");
437
});
438
439
it("cleans up", () => {
440
client.close();
441
server.close();
442
cn1.close();
443
cn2.close();
444
});
445
});
446
447
describe("Check that the automatic reconnection parameter works", () => {
448
let server, cn1;
449
it("creates the server", () => {
450
cn1 = connect();
451
server = cn1.socket.listen("recon");
452
server.on("connection", (socket) => {
453
socket.on("data", (data) => {
454
socket.write(data);
455
});
456
});
457
});
458
459
it("create a client with reconnection (the default) and confirm it works (all states hit)", async () => {
460
const socket = cn1.socket.connect("recon");
461
expect(socket.reconnection).toBe(true); // the default
462
await once(socket, "ready");
463
// have to listen before we trigger it:
464
const y = once(socket, "disconnected");
465
const x = once(socket, "connecting");
466
socket.disconnect();
467
const z = once(socket, "data");
468
469
// write when not connected -- this should get sent
470
// when we connect:
471
socket.write("hi");
472
473
await once(socket, "ready");
474
await y;
475
await x;
476
expect((await z)[0]).toBe("hi");
477
socket.close();
478
});
479
480
it("creates a client without reconnection", async () => {
481
const socket = cn1.socket.connect("recon", { reconnection: false });
482
expect(socket.reconnection).toBe(false);
483
await once(socket, "ready");
484
socket.disconnect();
485
await delay(50);
486
// still disconnected
487
expect(socket.state).toBe("disconnected");
488
// but we can manually connect
489
socket.connect();
490
await once(socket, "ready");
491
socket.close();
492
});
493
});
494
495
describe("creating multiple sockets from the one client to one server works (they should be distinct)", () => {
496
let server, cn1, cn2;
497
const subject = "multiple.sockets.edu";
498
it("creates the client and server", () => {
499
cn1 = connect();
500
server = cn1.socket.listen(subject);
501
server.on("connection", (socket) => {
502
socket.on("data", (data) => {
503
socket.write(`${data}-${socket.id}`);
504
});
505
});
506
});
507
508
it("creates two client sockets", async () => {
509
cn2 = connect();
510
const socket1 = cn2.socket.connect(subject);
511
const socket2 = cn2.socket.connect(subject);
512
expect(socket1.id).not.toEqual(socket2.id);
513
const x = once(socket1, "data");
514
const y = once(socket2, "data");
515
socket1.write("cocalc");
516
socket2.write("conat");
517
const [data] = await x;
518
expect(data).toBe(`cocalc-${socket1.id}`);
519
const [data2] = await y;
520
expect(data2).toBe(`conat-${socket2.id}`);
521
const x1 = once(socket1, "data");
522
const y1 = once(socket2, "data");
523
524
// also test broadcast
525
server.write("hello");
526
expect((await x1)[0]).toBe("hello");
527
expect((await y1)[0]).toBe("hello");
528
529
socket1.close();
530
socket2.close();
531
});
532
533
it("cleans up", () => {
534
server.close();
535
cn1.close();
536
cn2.close();
537
});
538
});
539
540
describe("test request/respond from client to server and from server to client", () => {
541
let socket1, socket2, server, cn1, cn2, cn3;
542
const subject = "request-respond-demo";
543
const sockets: any[] = [];
544
545
it("creates a server and two sockets", async () => {
546
cn3 = connect();
547
server = cn3.socket.listen(subject);
548
server.on("connection", (socket) => {
549
sockets.push(socket);
550
socket.on("request", (mesg) => {
551
mesg.respond(`hi ${mesg.data}, from server`);
552
});
553
});
554
555
cn1 = connect();
556
socket1 = cn1.socket.connect(subject);
557
socket1.on("request", (mesg) => {
558
mesg.respond(`hi ${mesg.data}, from socket1`);
559
});
560
561
cn2 = connect();
562
socket2 = cn2.socket.connect(subject);
563
socket2.on("request", (mesg) => {
564
mesg.respond(`hi ${mesg.data}, from socket2`);
565
});
566
});
567
568
it("each socket calls the server", async () => {
569
expect((await socket1.request("socket1")).data).toBe(
570
"hi socket1, from server",
571
);
572
expect((await socket2.request("socket2")).data).toBe(
573
"hi socket2, from server",
574
);
575
});
576
577
it("the server individually calls each socket", async () => {
578
// note that sockets[0] and sockets[1] might be in
579
// either order.
580
const x = (await sockets[0].request("server")).data;
581
const y = (await sockets[1].request("server")).data;
582
expect(x).not.toEqual(y);
583
expect(x).toContain("hi server, from socket");
584
expect(y).toContain("hi server, from socket");
585
});
586
587
it("broadcast a request to all connected sockets", async () => {
588
const v = (await server.request("server")) as any;
589
const w = v.map((y: any) => y.data);
590
const S = new Set(["hi server, from socket1", "hi server, from socket2"]);
591
expect(new Set(w)).toEqual(S);
592
593
// also broadcast and use race, so we get just the first response.
594
const x = await server.request("server", { race: true });
595
expect(S.has(x.data)).toBe(true);
596
});
597
598
it("cleans up", () => {
599
socket1.close();
600
socket2.close();
601
server.close();
602
cn1.close();
603
cn2.close();
604
cn3.close();
605
});
606
});
607
608
describe("test request/respond with headers", () => {
609
let socket1,
610
server,
611
cn1,
612
cn2,
613
sockets: any[] = [];
614
const subject = "request-respond-headers";
615
616
it("creates a server and a socket", async () => {
617
cn2 = connect();
618
server = cn2.socket.listen(subject);
619
server.on("connection", (socket) => {
620
sockets.push(socket);
621
socket.on("request", (mesg) => {
622
mesg.respond(`server: ${mesg.data}`, {
623
headers: { ...mesg.headers, server: true },
624
});
625
});
626
});
627
628
cn1 = connect();
629
socket1 = cn1.socket.connect(subject);
630
socket1.on("request", (mesg) => {
631
mesg.respond(`socket1: ${mesg.data}`, {
632
headers: { ...mesg.headers, socket1: true },
633
});
634
});
635
});
636
637
it("headers work when client calls server", async () => {
638
const x = await socket1.request("hi", { headers: { foo: 10 } });
639
expect(x.data).toBe("server: hi");
640
expect(x.headers).toEqual(
641
expect.objectContaining({ foo: 10, server: true }),
642
);
643
});
644
645
it("headers work when server calls client", async () => {
646
const x = await sockets[0].request("hi", { headers: { foo: 10 } });
647
expect(x.data).toBe("socket1: hi");
648
expect(x.headers).toEqual(
649
expect.objectContaining({ foo: 10, socket1: true }),
650
);
651
});
652
653
it("cleans up", () => {
654
socket1.close();
655
server.close();
656
cn1.close();
657
cn2.close();
658
});
659
});
660
661
describe("test requestMany/respond", () => {
662
let socket1,
663
server,
664
cn1,
665
cn2,
666
sockets: any[] = [];
667
const subject = "requestMany";
668
669
it("creates a server that handles a requestMany, and a client", async () => {
670
cn2 = connect();
671
server = cn2.socket.listen(subject);
672
server.on("connection", (socket) => {
673
sockets.push(socket);
674
socket.on("request", (mesg) => {
675
for (let i = 0; i < mesg.data; i++) {
676
mesg.respond(i);
677
}
678
});
679
});
680
681
cn1 = connect();
682
socket1 = cn1.socket.connect(subject);
683
});
684
685
it("sends a requestMany request and get 3 responses", async () => {
686
const sub = await socket1.requestMany(10);
687
for (let i = 0; i < 10; i++) {
688
expect((await sub.next()).value.data).toBe(i);
689
}
690
sub.close();
691
});
692
693
it("cleans up", () => {
694
socket1.close();
695
server.close();
696
cn1.close();
697
cn2.close();
698
});
699
});
700
701
afterAll(after);
702
703