Path: blob/master/src/packages/backend/conat/test/persist/persist-client.test.ts
1451 views
/*1Tests of persist client.23pnpm test ./persist-client.test.ts45*/67import {8before,9after,10connect,11restartServer,12restartPersistServer,13wait,14} from "@cocalc/backend/conat/test/setup";15import { stream } from "@cocalc/conat/persist/client";16import { messageData } from "@cocalc/conat/core/client";17import { delay } from "awaiting";1819beforeAll(before);2021jest.setTimeout(10000);22describe("create a persist client stream and test the basic operations", () => {23let client, s1;2425it("creates a client and stream", () => {26client = connect();27s1 = stream({28client,29user: { hub_id: "x" },30storage: { path: "hub/foo" },31});32});3334let seq0;35it("write a value to the stream", async () => {36const { seq, time } = await s1.set({37messageData: messageData("hi", { headers: { foo: "bar" } }),38});39expect(Math.abs(time - Date.now())).toBeLessThan(1000);40seq0 = seq;41});4243it("get the value back", async () => {44const mesg = await s1.get({ seq: seq0 });45expect(mesg.data).toBe("hi");46expect(mesg.headers.foo).toBe("bar");47});4849it("writes a value with a key", async () => {50await s1.set({51key: "my-key",52messageData: messageData("value", { headers: { foo: "bar" } }),53});54const mesg = await s1.get({ key: "my-key" });55expect(mesg.data).toBe("value");56});57});5859describe("restarting persist server", () => {60let client, s1;6162it("creates a client and stream and write test data", async () => {63client = connect();64s1 = stream({65client,66user: { hub_id: "x" },67storage: { path: "hub/bar" },68});69await s1.set({70key: "test",71messageData: messageData("data", { headers: { foo: "bar" } }),72});73});7475it("restart the persist server", async () => {76await restartPersistServer();77});7879it("first attempt to read the data written above fails because persist server hasn't started yet", async () => {80await expect(async () => {81await s1.get({ key: "test", timeout: 500 });82}).rejects.toThrow("no subscribers");83});8485jest.setTimeout(10000);86it("it does start working relatively quickly though", async () => {87await wait({88until: async () => {89try {90await s1.get({ key: "test", timeout: 1500 });91return true;92} catch {}93},94});9596const mesg = await s1.get({ key: "test" });97expect(mesg.data).toBe("data");98});99});100101describe("restarting persist server with an ephemeral stream", () => {102let client, s1;103104it("creates a client and an ephemeral stream and write test data", async () => {105client = connect();106s1 = stream({107client,108user: { hub_id: "x" },109storage: { path: "hub/in-memory-only", ephemeral: true },110});111await s1.set({112key: "test",113messageData: messageData("data", { headers: { foo: "bar" } }),114});115});116117it("restart the persist server", async () => {118await restartPersistServer();119});120121it("our data is gone - it's ephemeral", async () => {122s1 = stream({123client,124user: { hub_id: "x" },125storage: { path: "hub/in-memory-onl", ephemeral: true },126});127await wait({128until: async () => {129try {130const mesg = await s1.get({ key: "test", timeout: 500 });131return mesg === undefined;132} catch {}133},134});135136expect(await s1.get({ key: "test" })).toBe(undefined);137});138});139140describe("restarting the network but not the persist server", () => {141let client, s1;142143it("creates a client and stream and write test data", async () => {144client = connect();145s1 = stream({146client,147user: { hub_id: "x" },148storage: { path: "hub/network" },149});150await s1.set({151key: "test",152messageData: messageData("data", { headers: { foo: "bar" } }),153});154});155156it("restart conat networking", async () => {157await restartServer();158});159160it("it does start working eventually", async () => {161await wait({162until: async () => {163try {164await s1.get({ key: "test", timeout: 1000 });165return true;166} catch {}167},168});169const mesg = await s1.get({ key: "test" });170expect(mesg.data).toBe("data");171});172});173174describe("test a changefeed", () => {175let client, s1, cf;176177it("creates a client, stream and changefeed", async () => {178client = connect();179s1 = stream({180client,181user: { hub_id: "x" },182storage: { path: "hub/changefeed" },183});184cf = await s1.changefeed();185});186187it("write and see result via changefeed", async () => {188await s1.set({189key: "test",190messageData: messageData("data", { headers: { foo: "bar" } }),191});192const { value: updates, done } = await cf.next();193expect(done).toBe(false);194expect(updates[0]).toEqual(195expect.objectContaining({196seq: 1,197key: "test",198headers: { foo: "bar" },199}),200);201});202203let s2, client2;204it("write via another client and see result via changefeed", async () => {205client2 = connect();206s2 = stream({207client: client2,208user: { hub_id: "x" },209storage: { path: "hub/changefeed" },210});211expect(s1).not.toBe(s2);212await s2.set({213key: "test2",214messageData: messageData("data2", { headers: { foo: "bar2" } }),215});216217const { value: updates, done } = await cf.next();218expect(done).toBe(false);219expect(updates[0]).toEqual(220expect.objectContaining({221seq: 2,222key: "test2",223headers: { foo: "bar2" },224}),225);226expect(updates[0].seq).toBe(2);227expect(updates.length).toBe(1);228});229230// this takes a while due to it having to deal with the network restart231it("restart conat socketio server, and verify changefeed still works", async () => {232// send one more233await s2.set({234key: "test3",235messageData: messageData("data3", { headers: { foo: "bar3" } }),236});237await restartServer();238await wait({239until: async () => {240// this set is expected to fail while networking is restarting241try {242await s1.set({243key: "test4",244messageData: messageData("data4", { headers: { foo: "bar4" } }),245timeout: 1000,246});247return true;248} catch {249return false;250}251},252start: 500,253});254255// all three updates must get through, and in the correct order256const { value: updates0, done: done0 } = await cf.next();257expect(done0).toBe(false);258expect(updates0[0].seq).toBe(3);259// its random whether or not test4 comes through as part of the260// first group or not. The ones sent when offline always come261// together in a group.262if (updates0.length >= 2) {263expect(updates0[1].seq).toBe(4);264} else {265const { value: updates1 } = await cf.next();266expect(updates1[0].seq).toBe(4);267}268});269270it("restart the persist server -- this is pretty brutal", async () => {271await restartPersistServer();272});273274it("set still works (with error) after restarting persist server", async () => {275// doing this set should fail due to persist for a second due server being276// off and having to connect again.277await wait({278until: async () => {279try {280await s2.set({281key: "test4",282messageData: messageData("data4", { headers: { foo: "bar4" } }),283timeout: 500,284});285286return true;287} catch {288return false;289}290},291});292const mesg = await s2.get({ key: "test4" });293expect(mesg.data).toBe("data4");294});295296it("changefeed still works after restarting persist server, though what gets received is somewhat random -- the persist server doesn't have its own state so can't guarantee continguous changefeeds when it restarts", async () => {297await delay(1000);298await s2.set({299key: "test5",300messageData: messageData("data5", { headers: { foo: "bar5" } }),301timeout: 1000,302});303const { value: updates, done } = await cf.next();304expect(done).toBe(false);305// changefeed may or may not have dropped a message, depending on timing306expect(updates[0].headers?.foo?.startsWith("bar")).toBe(true);307});308});309310afterAll(after);311312313