// Path: blob/master/src/packages/backend/conat/test/persist/multiple-servers.test.ts
// 1451 views
/*
Unit tests of multiple persist servers at once:

- making numerous distinct clients and seeing that they are distributed across persist servers
- stopping a persist server and seeing that failover happens without data loss

pnpm test `pwd`/multiple-servers.test.ts

*/

import {
  before,
  after,
  connect,
  persistServer as defaultPersistServer,
  once,
} from "@cocalc/backend/conat/test/setup";
import { stream } from "@cocalc/conat/persist/client";
import { delay } from "awaiting";
import { server as createPersistServer } from "@cocalc/backend/conat/persist";
import { messageData } from "@cocalc/conat/core/client";
import { uuid } from "@cocalc/util/misc";

beforeAll(before);

describe("multiple clients using multiple persist servers", () => {
  // Every persist server taking part in the test, so they can all be
  // closed in the final "cleans up" step.
  const persistServers: any[] = [];

  // Size of the initial pool: the default server from the test setup
  // plus (numServers - 1) servers created below.
  const numServers = 4;

  it(`create ${numServers} persist servers`, async () => {
    // The server provided by the shared test setup counts as the first one.
    persistServers.push(defaultPersistServer);
    for (let i = 0; i < numServers - 1; i++) {
      // Each additional server is given its own connection.
      const client = connect();
      const persistServer = createPersistServer({ client });
      await once(persistServer, "ready");
      persistServers.push(persistServer);
    }
  });

  // Every persist client opened by any test below; closed in "cleans up".
  const persistClients: any[] = [];

  // NOTE: count must be below about 40 to avoid hitting the default
  // per-user connection limit of 100.
  const count = 30;

  // projects[i] is the project_id used by the i-th client, remembered so a
  // later test can reconnect with exactly the same user.
  const projects: string[] = [];

  it(`creates ${count} persist clients`, async () => {
    // Distinct server ids reported by the clients created in this test.
    const ids = new Set<string>([]);
    for (let i = 0; i < count; i++) {
      const client = connect();
      const project_id = uuid();
      projects.push(project_id);
      const persistClient = stream({
        client,
        user: { project_id },
        storage: { path: `projects/${project_id}/foo-${i}` },
      });
      ids.add(await persistClient.serverId());
      persistClients.push(persistClient);
      const { seq } = await persistClient.set({
        messageData: messageData(i, { headers: { [i]: i } }),
      });
      // First write to a brand new stream.
      expect(seq).toBe(1);
    }
    // given that we're randomly distributing so many clients (what really matters
    // is the user field above)
    // it's highly likely we hit all servers.
    expect(ids.size).toBe(persistServers.length);
  });

  // The loop below adds numServers - 1 servers, so the title reports that
  // number rather than numServers.
  it(`add ${numServers - 1} more persist servers`, async () => {
    for (let i = 0; i < numServers - 1; i++) {
      const client = connect();
      const persistServer = createPersistServer({ client });
      await once(persistServer, "ready");
      persistServers.push(persistServer);
    }
  });

  it("read data we wrote above (so having new servers doesn't mess with existing connections)", async () => {
    // persistClients[0..count-1] are the clients from the first round; each
    // wrote the value i with a header keyed by i at seq 1.
    for (let i = 0; i < count; i++) {
      const mesg = await persistClients[i].get({ seq: 1 });
      expect(mesg.data).toBe(i);
      expect(mesg.headers[`${i}`]).toBe(i);
    }
  });

  it("new clients use exactly the same servers, since the assignment was already made above", async () => {
    const ids = new Set<string>([]);
    for (let i = 0; i < count; i++) {
      const client = connect();
      // Reuse the project_id (and storage path) of the i-th original client.
      const project_id = projects[i];
      const persistClient = stream({
        client,
        user: { project_id },
        storage: { path: `projects/${project_id}/foo-${i}` },
      });
      ids.add(await persistClient.serverId());
      persistClients.push(persistClient);
      const { seq } = await persistClient.set({
        messageData: messageData(i, { headers: { [i]: i } }),
      });
      // Second write to the same stream as in the first round.
      expect(seq).toBe(2);
    }
    // Compared against numServers (the size of the initial pool), not
    // persistServers.length, which by now also includes the servers added later.
    expect(ids.size).toBe(numServers);
  });

  it("cleans up", () => {
    for (const client of persistClients) {
      client.close();
    }
    for (const server of persistServers) {
      server.close();
    }
  });
});

afterAll(async () => {
  // slight delay so all the sqlites of the persist servers can finish
  // writing to disk, so we can delete the temp directory
  await delay(100);
  await after();
});