Path: blob/master/src/packages/backend/conat/test/socket/basic.test.ts
1451 views
/*
Basic tests of conat sockets: data in both directions, write queues,
broadcast, channels, disconnect/reconnect handling, sticky routing when
several servers share a subject, wildcard subjects, and request/respond.

pnpm test `pwd`/basic.test.ts
*/

import {
  before,
  after,
  connect,
  wait,
  setDefaultTimeouts,
} from "@cocalc/backend/conat/test/setup";
import { once } from "@cocalc/util/async-utils";
import { delay } from "awaiting";

beforeAll(async () => {
  await before();
  setDefaultTimeouts({ request: 750, publish: 750 });
});

// The server here answers every message with the message repeated twice.
describe("create a server and client, then send a message and get a response", () => {
  let client, server, cn1, cn2;
  const subject = "response.double";

  it("creates the client and server", () => {
    cn1 = connect();
    server = cn1.socket.listen(subject);
    server.on("connection", (socket) => {
      socket.on("data", (data) => {
        socket.write(`${data}`.repeat(2));
      });
    });
  });

  it("connects as client and tests out the server", async () => {
    cn2 = connect();
    client = cn2.socket.connect(subject);
    client.write("cocalc");
    const [data] = await once(client, "data");
    expect(data).toBe("cocalccocalc");
  });

  it("send 3 messages and get 3 responses, in order", async () => {
    client.write("a");
    client.write("b");
    client.write("c");
    expect((await once(client, "data"))[0]).toBe("aa");
    expect((await once(client, "data"))[0]).toBe("bb");
    expect((await once(client, "data"))[0]).toBe("cc");
  });

  const count = 250;
  it(`sends ${count} messages and gets responses, so its obviously not super slow`, async () => {
    const t = Date.now();
    for (let i = 0; i < count; i++) {
      client.write(`${i}`);
    }
    for (let i = 0; i < count; i++) {
      expect((await once(client, "data"))[0]).toBe(`${i}`.repeat(2));
    }
    expect(Date.now() - t).toBeLessThan(5000);
  });

  it("cleans up", () => {
    client.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("create a client first, then the server, and see that write still works (testing the order); also include headers in both directions.", () => {
  let client, server, cn1, cn2, requestPromise;
  const subject = "cocalc-order";

  it("connects as client and writes to the server that doesn't exist yet", async () => {
    cn2 = connect();
    client = cn2.socket.connect(subject);
    client.write("cocalc", { headers: { my: "header" } });
  });

  it("we fire off a request as well, but don't wait for it", () => {
    requestPromise = client.request("foo");
  });

  it("creates the server", () => {
    cn1 = connect();
    server = cn1.socket.listen(subject);
    server.on("connection", (socket) => {
      socket.on("data", (data, headers) => {
        socket.write(`${data}`.repeat(2), { headers });
      });
      socket.on("request", (mesg) => {
        mesg.respondSync("bar", { headers: "x" });
      });
    });
  });

  it("it still works out", async () => {
    const [data, headers] = await once(client, "data");
    expect(data).toBe("cocalccocalc");
    expect(headers).toEqual({ my: "header" });
  });

  it("get back the response from the request we created above", async () => {
    const response = await requestPromise;
    expect(response.data).toBe("bar");
    expect(response.headers).toBe("x");
  });

  it("cleans up", () => {
    client.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("create a client first and write more messages than the queue size results in an error", () => {
  let client, server, cn1, cn2;
  const subject = "conat.too.many.messages";

  const count = 5;
  const maxQueueSize = 3;
  let iter;
  it("connects as client with a small queue and fill it", async () => {
    cn2 = connect();
    let fails = 0;
    client = cn2.socket.connect(subject, { maxQueueSize });
    iter = client.iter();
    for (let i = 0; i < count; i++) {
      try {
        client.write(`${i}`);
      } catch (err) {
        // The first maxQueueSize writes (i=0,1,2) are queued; the remaining
        // ones (i=3,4) must be the ones that fail.
        expect(i).toBeGreaterThanOrEqual(maxQueueSize);
        fails += 1;
      }
    }
    expect(fails).toBe(count - maxQueueSize);
    expect(client.queuedWrites.length).toBe(maxQueueSize);
  });

  const serverRecv: any[] = [];
  let serverSocket;
  it("creates the server", () => {
    cn1 = connect();
    server = cn1.socket.listen(subject, { maxQueueSize });
    server.on("connection", (socket) => {
      serverSocket = socket;
      socket.on("data", (data) => {
        serverRecv.push(data);
        socket.write(`${data}`.repeat(2));
      });
    });
  });

  it(`first ${maxQueueSize} messages do get sent`, async () => {
    for (let i = 0; i < maxQueueSize; i++) {
      const { value } = await iter.next();
      expect(value[0]).toBe(`${i}`.repeat(2));
    }
    expect(serverRecv).toEqual(["0", "1", "2"]);
  });

  it("wait for client to drain; then we can now send another message without an error", async () => {
    await client.waitUntilDrain();
    client.write("foo");
  });

  it("writing too many messages to the server socket also fails", async () => {
    if (serverSocket.tcp.send.unsent > 0) {
      await once(serverSocket, "drain");
    }
    expect(serverSocket.tcp.send.unsent).toBe(0);
    serverSocket.write(0);
    serverSocket.write(1);
    serverSocket.write(2);
    expect(() => serverSocket.write(3)).toThrow("WRITE FAILED");
    // Capture the error code and assert outside the try block, so that the
    // test fails (rather than silently passing) if write does not throw.
    let code;
    try {
      serverSocket.write(4);
    } catch (err) {
      code = err.code;
    }
    expect(code).toBe("ENOBUFS");
  });

  it("cleans up", () => {
    client.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("test having two clients and see that communication is independent and also broadcast to both", () => {
  let client1, client2, server, cn1, cn2, cn3;

  it("creates a server and two clients", async () => {
    cn3 = connect();
    server = cn3.socket.listen("cocalc2");
    server.on("connection", (socket) => {
      socket.on("data", (data) => {
        socket.write(`${data}`.repeat(2));
      });
    });

    cn1 = connect();
    client1 = cn1.socket.connect("cocalc2");
    cn2 = connect();
    client2 = cn2.socket.connect("cocalc2");
  });

  it("each client uses the server separately", async () => {
    // listen before writing so that no response can be missed
    const x1 = once(client1, "data");
    const x2 = once(client2, "data");
    client1.write("one");
    client2.write("two");
    expect((await x1)[0]).toBe("oneone");
    expect((await x2)[0]).toBe("twotwo");
  });

  it("server broadcast to all clients", async () => {
    const x1 = once(client1, "data");
    const x2 = once(client2, "data");
    server.write("broadcast");
    expect((await x1)[0]).toBe("broadcast");
    expect((await x2)[0]).toBe("broadcast");
  });

  it("test with a channel", async () => {
    const s1 = server.channel("one");
    const c1 = client1.channel("one");
    const c2 = client2.channel("one");
    s1.on("connection", (socket) => {
      socket.on("data", (data) => {
        socket.write(`1${data}`);
      });
    });
    const x1 = once(c1, "data");
    const x2 = once(c2, "data");
    c1.write("c1");
    expect((await x1)[0]).toBe("1c1");
    c2.write("c2");
    expect((await x2)[0]).toBe("1c2");

    s1.close();
    c1.close();
    c2.close();
  });

  it("cleans up", () => {
    client1.close();
    client2.close();
    server.close();
    cn1.close();
    cn2.close();
    cn3.close();
  });
});

describe("create a server and client. Disconnect the client and see from the server point of view that it disconnected.", () => {
  // cn1 carries the server and cn2 the clients; they are kept separate so
  // that both connections can be closed at the end.
  let server, cn1, cn2;

  it("creates the server", () => {
    cn1 = connect();
    server = cn1.socket.listen("disconnect.io");
    server.on("connection", (socket) => {
      socket.on("data", () => {
        socket.write(`clients=${Object.keys(server.sockets).length}`);
      });
    });
    expect(Object.keys(server.sockets).length).toBe(0);
  });

  let client;
  it("connects with a client", async () => {
    cn2 = connect();
    client = cn2.socket.connect("disconnect.io");
    const r = once(client, "data");
    client.write("hello");
    expect((await r)[0]).toBe("clients=1");
  });

  it("disconnects and sees the count of clients goes back to 0", async () => {
    client.close();
    await wait({
      until: () => {
        return Object.keys(server.sockets).length === 0;
      },
    });
  });

  it("creates a new client, connects to server, then closes the server and the client sees that and closes.", async () => {
    client = cn2.socket.connect("disconnect.io");
    const iter = client.iter();
    // confirm working:
    client.write("hello");
    const { value } = await iter.next();
    expect(value[0]).toBe("clients=1");

    expect(client.state).toBe("ready");
    const closed = once(client, "closed");
    // now close server and wait for state to quickly automatically
    // switch to not ready anymore
    const t0 = Date.now();
    server.close();
    await closed;
    expect(Date.now() - t0).toBeLessThan(250);
  });

  it("cleans up", () => {
    client.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("create two socket servers with the same subject to test that sockets are sticky", () => {
  const subject = "a.sticks.place";
  let c1, c2, s1, s2;
  it("creates two distinct socket servers with the same subject", () => {
    c1 = connect();
    c2 = connect();
    s1 = c1.socket.listen(subject);
    s1.on("connection", (socket) => {
      // console.log("s1 got connection");
      socket.on("data", () => {
        // console.log("s1 got data");
        socket.write("s1");
      });
      socket.on("request", (mesg) => mesg.respond("s1"));
    });
    s2 = c2.socket.listen(subject);
    s2.on("connection", (socket) => {
      // console.log("s2 got connection");
      socket.on("data", () => {
        // console.log("s2 got data");
        socket.write("s2");
      });
      socket.on("request", (mesg) => mesg.respond("s2"));
    });
  });

  // resp records which server ("s1" or "s2") answered the first write.
  let c3, client, resp;
  it("creates a client and verifies writes all go to the same server", async () => {
    c3 = connect();
    client = c3.socket.connect(subject);
    const iter = client.iter();
    client.write(null);
    const { value } = await iter.next();
    resp = value[0];
    // all additional messages end up going to the same server, because
    // of "sticky" subscriptions :-)
    for (let i = 0; i < 25; i++) {
      client.write(null);
      const { value: value1 } = await iter.next();
      expect(resp).toBe(value1[0]);
    }
  });

  let c3b, s3;
  it("add one more server and verify that messages still all go to the right place", async () => {
    c3b = connect();
    s3 = c3b.socket.listen(subject);
    let newServerGotConnection = false;
    s3.on("connection", (socket) => {
      //console.log("s3 got a connection");
      newServerGotConnection = true;
      socket.on("data", () => {
        //console.log("s3 got data", { data });
        socket.write("s3");
      });
    });
    const iter = client.iter();
    for (let i = 0; i < 25; i++) {
      client.write(null);
      const { value: value1 } = await iter.next();
      if (resp !== value1[0]) {
        throw Error("sticky load balancing failed!?");
      }
    }
    expect(newServerGotConnection).toBe(false); // redundant...
  });

  it("also verify that request/reply messaging go to the right place (stickiness works the same way)", async () => {
    for (let i = 0; i < 25; i++) {
      const x = await client.request(null);
      expect(x.data).toBe(resp);
    }
  });

  it("remove the server we're connected to and see that the client socket closes, since all state on the other end is gone (this is the only possible thing that should happen!)", async () => {
    if (resp === "s1") {
      s1.close();
    } else if (resp === "s2") {
      s2.close();
    }
    await once(client, "closed");
  });

  it("cleans up", () => {
    s1.close();
    s2.close();
    s3.close();
    c1.close();
    c2.close();
    c3.close();
    c3b.close();
    client.close();
  });
});

describe("create a server where the subject has a wildcard, so clients can e.g., authentication themselves by having permission to write to the subject", () => {
  let client, server, cn1, cn2;
  it("creates the client and server", () => {
    cn1 = connect();
    server = cn1.socket.listen("changefeeds.*");
    server.on("connection", (socket) => {
      socket.on("data", () => {
        // reply with the second segment of the subject this socket is on
        socket.write(socket.subject.split(".")[1]);
      });
    });
  });

  it("connects as client on different matching subjects", async () => {
    cn2 = connect();
    client = cn2.socket.connect("changefeeds.account-5077");
    const x = once(client, "data");
    client.write(null);
    const [data] = await x;
    expect(data).toBe("account-5077");
    client.close();

    client = cn2.socket.connect("changefeeds.account-389");
    const x2 = once(client, "data");
    client.write(null);
    const [data2] = await x2;
    expect(data2).toBe("account-389");
  });

  it("cleans up", () => {
    client.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("Check that the automatic reconnection parameter works", () => {
  let server, cn1;
  it("creates the server", () => {
    cn1 = connect();
    server = cn1.socket.listen("recon");
    server.on("connection", (socket) => {
      socket.on("data", (data) => {
        socket.write(data);
      });
    });
  });

  it("create a client with reconnection (the default) and confirm it works (all states hit)", async () => {
    const socket = cn1.socket.connect("recon");
    expect(socket.reconnection).toBe(true); // the default
    await once(socket, "ready");
    // have to listen before we trigger it:
    const y = once(socket, "disconnected");
    const x = once(socket, "connecting");
    socket.disconnect();
    const z = once(socket, "data");

    // write when not connected -- this should get sent
    // when we connect:
    socket.write("hi");

    await once(socket, "ready");
    await y;
    await x;
    expect((await z)[0]).toBe("hi");
    socket.close();
  });

  it("creates a client without reconnection", async () => {
    const socket = cn1.socket.connect("recon", { reconnection: false });
    expect(socket.reconnection).toBe(false);
    await once(socket, "ready");
    socket.disconnect();
    await delay(50);
    // still disconnected
    expect(socket.state).toBe("disconnected");
    // but we can manually connect
    socket.connect();
    await once(socket, "ready");
    socket.close();
  });

  it("cleans up", () => {
    server.close();
    cn1.close();
  });
});

describe("creating multiple sockets from the one client to one server works (they should be distinct)", () => {
  let server, cn1, cn2;
  const subject = "multiple.sockets.edu";
  it("creates the client and server", () => {
    cn1 = connect();
    server = cn1.socket.listen(subject);
    server.on("connection", (socket) => {
      socket.on("data", (data) => {
        socket.write(`${data}-${socket.id}`);
      });
    });
  });

  it("creates two client sockets", async () => {
    cn2 = connect();
    const socket1 = cn2.socket.connect(subject);
    const socket2 = cn2.socket.connect(subject);
    expect(socket1.id).not.toEqual(socket2.id);
    const x = once(socket1, "data");
    const y = once(socket2, "data");
    socket1.write("cocalc");
    socket2.write("conat");
    const [data] = await x;
    expect(data).toBe(`cocalc-${socket1.id}`);
    const [data2] = await y;
    expect(data2).toBe(`conat-${socket2.id}`);
    const x1 = once(socket1, "data");
    const y1 = once(socket2, "data");

    // also test broadcast
    server.write("hello");
    expect((await x1)[0]).toBe("hello");
    expect((await y1)[0]).toBe("hello");

    socket1.close();
    socket2.close();
  });

  it("cleans up", () => {
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("test request/respond from client to server and from server to client", () => {
  let socket1, socket2, server, cn1, cn2, cn3;
  const subject = "request-respond-demo";
  // server-side sockets, in the order the connections arrived
  const sockets: any[] = [];

  it("creates a server and two sockets", async () => {
    cn3 = connect();
    server = cn3.socket.listen(subject);
    server.on("connection", (socket) => {
      sockets.push(socket);
      socket.on("request", (mesg) => {
        mesg.respond(`hi ${mesg.data}, from server`);
      });
    });

    cn1 = connect();
    socket1 = cn1.socket.connect(subject);
    socket1.on("request", (mesg) => {
      mesg.respond(`hi ${mesg.data}, from socket1`);
    });

    cn2 = connect();
    socket2 = cn2.socket.connect(subject);
    socket2.on("request", (mesg) => {
      mesg.respond(`hi ${mesg.data}, from socket2`);
    });
  });

  it("each socket calls the server", async () => {
    expect((await socket1.request("socket1")).data).toBe(
      "hi socket1, from server",
    );
    expect((await socket2.request("socket2")).data).toBe(
      "hi socket2, from server",
    );
  });

  it("the server individually calls each socket", async () => {
    // note that sockets[0] and sockets[1] might be in
    // either order.
    const x = (await sockets[0].request("server")).data;
    const y = (await sockets[1].request("server")).data;
    expect(x).not.toEqual(y);
    expect(x).toContain("hi server, from socket");
    expect(y).toContain("hi server, from socket");
  });

  it("broadcast a request to all connected sockets", async () => {
    const v = (await server.request("server")) as any;
    const w = v.map((y: any) => y.data);
    const S = new Set(["hi server, from socket1", "hi server, from socket2"]);
    expect(new Set(w)).toEqual(S);

    // also broadcast and use race, so we get just the first response.
    const x = await server.request("server", { race: true });
    expect(S.has(x.data)).toBe(true);
  });

  it("cleans up", () => {
    socket1.close();
    socket2.close();
    server.close();
    cn1.close();
    cn2.close();
    cn3.close();
  });
});

describe("test request/respond with headers", () => {
  let socket1, server, cn1, cn2;
  const sockets: any[] = [];
  const subject = "request-respond-headers";

  it("creates a server and a socket", async () => {
    cn2 = connect();
    server = cn2.socket.listen(subject);
    server.on("connection", (socket) => {
      sockets.push(socket);
      socket.on("request", (mesg) => {
        // echo the request headers back, plus a marker for who responded
        mesg.respond(`server: ${mesg.data}`, {
          headers: { ...mesg.headers, server: true },
        });
      });
    });

    cn1 = connect();
    socket1 = cn1.socket.connect(subject);
    socket1.on("request", (mesg) => {
      mesg.respond(`socket1: ${mesg.data}`, {
        headers: { ...mesg.headers, socket1: true },
      });
    });
  });

  it("headers work when client calls server", async () => {
    const x = await socket1.request("hi", { headers: { foo: 10 } });
    expect(x.data).toBe("server: hi");
    expect(x.headers).toEqual(
      expect.objectContaining({ foo: 10, server: true }),
    );
  });

  it("headers work when server calls client", async () => {
    const x = await sockets[0].request("hi", { headers: { foo: 10 } });
    expect(x.data).toBe("socket1: hi");
    expect(x.headers).toEqual(
      expect.objectContaining({ foo: 10, socket1: true }),
    );
  });

  it("cleans up", () => {
    socket1.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

describe("test requestMany/respond", () => {
  let socket1, server, cn1, cn2;
  const sockets: any[] = [];
  const subject = "requestMany";
  // number of responses requested; the server responds mesg.data times
  const count = 10;

  it("creates a server that handles a requestMany, and a client", async () => {
    cn2 = connect();
    server = cn2.socket.listen(subject);
    server.on("connection", (socket) => {
      sockets.push(socket);
      socket.on("request", (mesg) => {
        for (let i = 0; i < mesg.data; i++) {
          mesg.respond(i);
        }
      });
    });

    cn1 = connect();
    socket1 = cn1.socket.connect(subject);
  });

  it(`sends a requestMany request and get ${count} responses`, async () => {
    const sub = await socket1.requestMany(count);
    for (let i = 0; i < count; i++) {
      expect((await sub.next()).value.data).toBe(i);
    }
    sub.close();
  });

  it("cleans up", () => {
    socket1.close();
    server.close();
    cn1.close();
    cn2.close();
  });
});

afterAll(after);