Path: blob/master/src/packages/backend/conat/test/sync/dstream.test.ts
1451 views
/*
Testing basic ops with *persistent* dstreams.

DEVELOPMENT:

pnpm test ./dstream.test.ts

*/

import { createDstream as create } from "./util";
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
import { once } from "@cocalc/util/async-utils";
import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";

beforeAll(before);

jest.setTimeout(10000);

// Basic lifecycle of one stream: create, push, save, close, reopen by name.
describe("create a dstream and do some basic operations", () => {
  let s;

  it("creates stream", async () => {
    s = await create();
  });

  it("starts out empty", () => {
    expect(s.getAll()).toEqual([]);
    expect(s.length).toEqual(0);
  });

  const mesg = { stdout: "hello" };
  it("publishes a message to the stream and confirms it is there", () => {
    s.push(mesg);
    expect(s.getAll()).toEqual([mesg]);
    expect(s.length).toEqual(1);
    expect(s[0]).toEqual(mesg);
  });

  it("verifies that unsaved changes works properly", async () => {
    expect(s.hasUnsavedChanges()).toBe(true);
    expect(s.unsavedChanges()).toEqual([mesg]);
    await s.save();
    expect(s.hasUnsavedChanges()).toBe(false);
    expect(s.unsavedChanges()).toEqual([]);
  });

  it("confirm persistence: closes and re-opens stream and confirms message is still there", async () => {
    const name = s.name;
    await s.save();
    // close s:
    await s.close();
    // using s fails.  Call through a wrapper so the method keeps its receiver;
    // passing `s.getAll` directly would detach it from `s`.
    expect(() => s.getAll()).toThrow("closed");
    // create new stream with same name
    const t = await createDstream({ name });
    // ensure it is NOT just from the cache
    expect(s === t).toBe(false);
    // make sure it has our message
    expect(t.getAll()).toEqual([mesg]);
  });
});

// Two independent (noCache) stream objects with the same name: writes made
// through one become visible in the other only after a save.
describe("create two dstreams and observe sync between them", () => {
  const name = `test-${Math.random()}`;
  let s1, s2;
  it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
    s1 = await createDstream({ name, noAutosave: true, noCache: true });
    s2 = await createDstream({ name, noAutosave: true, noCache: true });
    // definitely distinct
    expect(s1 === s2).toBe(false);
  });

  it("writes to s1 and observes s2 doesn't see anything until we save", async () => {
    s1.push("hello");
    expect(s1[0]).toEqual("hello");
    expect(s2.length).toEqual(0);
    // save() is not awaited here: the test waits on s2's "change" event instead.
    s1.save();
    await once(s2, "change");
    expect(s2[0]).toEqual("hello");
    expect(s2.getAll()).toEqual(["hello"]);
  });

  it("now write to s2 and save and see that reflected in s1", async () => {
    s2.push("hi from s2");
    s2.save();
    // loop, since a "change" event on s1 need not be the one carrying this message
    while (s1[1] !== "hi from s2") {
      await once(s1, "change");
    }
    expect(s1[1]).toEqual("hi from s2");
  });

  it("write to s1 and s2 and save at the same time and see some 'random choice' of order gets imposed by the server", async () => {
    s1.push("s1");
    s2.push("s2");
    // our changes are reflected locally
    expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1"]);
    expect(s2.getAll()).toEqual(["hello", "hi from s2", "s2"]);
    // now kick off the two saves *in parallel*
    s1.save();
    s2.save();
    await wait({
      until: () => {
        return s1.length === 4 && s2.length === 4;
      },
    });
    // both sides agree on the order, whichever one it is
    expect(s1.getAll()).toEqual(s2.getAll());
    expect(new Set(s1.getAll())).toEqual(
      new Set(["hello", "hi from s2", "s1", "s2"]),
    );
  });
});

// seq(i) and time(i) are undefined for an unsaved message and are populated
// once the message has been saved.
describe("get sequence number and time of message", () => {
  let s;

  it("creates stream and write message", async () => {
    s = await create();
    s.push("hello");
  });

  it("sequence number is initialized undefined because it is server assigned ", async () => {
    const n = s.seq(0);
    expect(n).toBe(undefined);
  });

  it("time also undefined because it is server assigned ", async () => {
    const t = s.time(0);
    expect(t).toBe(undefined);
  });

  it("save and get server assigned sequence number", async () => {
    // save() is not awaited: the test waits for the stream's "change" event.
    s.save();
    await once(s, "change");
    const n = s.seq(0);
    expect(n).toBeGreaterThan(0);
  });

  it("get server assigned time", async () => {
    const t = s.time(0);
    // since testing on the same machine as server, these times should be close.
    // Compare the absolute difference so that a timestamp far in the past
    // fails as well as one far in the future.
    expect(Math.abs(t.getTime() - Date.now())).toBeLessThan(5000);
  });

  it("publish another message and get next server number is bigger", async () => {
    const n = s.seq(0);
    s.push("there");
    await s.save();
    const m = s.seq(1);
    expect(m).toBeGreaterThan(n);
  });

  it("and time is bigger", async () => {
    if (s.time(1) == null) {
      await once(s, "change");
    }
    expect(s.time(0).getTime()).toBeLessThan(s.time(1).getTime());
  });
});

// close() persists pending messages when autosave is on (the default) and
// discards them when the stream was created with noAutosave: true.
describe("closing also saves by default, but not if autosave is off", () => {
  let s;
  const name = `test-${Math.random()}`;

  it("creates stream and write a message", async () => {
    // noAutosave: false is the default:
    s = await createDstream({ name, noAutosave: false });
    s.push(389);
  });

  it("closes then opens and message is there, since autosave is on", async () => {
    await s.close();
    const t = await createDstream({ name });
    expect(t[0]).toEqual(389);
  });

  it("make another stream with autosave off, and close which causes LOSS OF DATA", async () => {
    // distinct local names so the describe-level `name` and `s` are not shadowed
    const lossyName = `test-${Math.random()}`;
    const lossy = await createDstream({ name: lossyName, noAutosave: true });
    lossy.push(389);
    // wait for the close to finish before reopening the same name
    await lossy.close();
    const t = await createDstream({ name: lossyName, noAutosave: true });
    // data is gone forever!
    expect(t.length).toBe(0);
  });
});

// Opening a stream with start_seq loads only messages from that sequence
// number on; load({ start_seq }) then pulls in older messages.
describe("testing start_seq", () => {
  const name = `test-${Math.random()}`;
  let seq;
  it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {
    const s = await createDstream({ name, noAutosave: true });
    s.push(1, 2, 3);
    expect(s.getAll()).toEqual([1, 2, 3]);
    // save, thus getting sequence numbers
    s.save();
    while (s.seq(2) == null) {
      s.save();
      await once(s, "change");
    }
    seq = [s.seq(0), s.seq(1), s.seq(2)];
    // tests partly that these are integers...
    const n = seq.reduce((a, b) => a + b, 0);
    expect(typeof n).toBe("number");
    expect(n).toBeGreaterThan(2);
    await s.close();
  });

  let s;
  it("it opens the stream but starting with the last sequence number, so only one message", async () => {
    s = await createDstream({
      name,
      noAutosave: true,
      start_seq: seq[2],
    });
    expect(s.length).toBe(1);
    expect(s.getAll()).toEqual([3]);
    expect(s.start_seq).toEqual(seq[2]);
  });

  it("it then pulls in the previous message, so now two messages are loaded", async () => {
    await s.load({ start_seq: seq[1] });
    expect(s.length).toBe(2);
    expect(s.getAll()).toEqual([2, 3]);
    expect(s.start_seq).toEqual(seq[1]);
  });

  it("a bigger example involving loading older messages", async () => {
    // the stream already holds 1, 2, 3 on the server; append 4..99
    for (let i = 4; i < 100; i++) {
      s.push(i);
    }
    await s.save();
    const last = s.seq(s.length - 1);
    const mid = s.seq(s.length - 50);
    await s.close();
    s = await createDstream({
      name,
      noAutosave: true,
      start_seq: last,
    });
    expect(s.length).toBe(1);
    expect(s.getAll()).toEqual([99]);
    expect(s.start_seq).toEqual(last);

    // load the last 50 messages, i.e. the values 50..99
    await s.load({ start_seq: mid });
    expect(s.length).toEqual(50);
    expect(s.start_seq).toEqual(mid);
    for (let i = 0; i < 50; i++) {
      expect(s.get(i)).toBe(i + 50);
    }

    // load everything: the values 1..99
    await s.load({ start_seq: 0 });
    for (let i = 0; i < 99; i++) {
      expect(s.get(i)).toBe(i + 1);
    }
  });
});

describe("a little bit of a stress test", () => {
  const name = `test-${Math.random()}`;
  const count = 100;
  let s;
  it(`creates a stream and pushes ${count} messages`, async () => {
    s = await createDstream({
      name,
      noAutosave: true,
    });
    for (let i = 0; i < count; i++) {
      s.push({ i });
    }
    expect(s.length).toBe(count);
    // NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,
    // running under jest, hence why count is small.
    await s.save();
    expect(s.length).toBe(count);
  });
});

describe("dstream typescript test", () => {
  it("creates stream", async () => {
    const name = `test-${Math.random()}`;
    const s = await createDstream<string>({ name });

    // write a message with the correct type
    s.push("foo");

    // wrong type -- no way to test this, but if you uncomment
    // this you should get a typescript error:

    // s.push({ foo: "bar" });
  });
});

// Creates and closes many streams and checks that the client's subscription
// count is unchanged afterwards.
//
// NOTE(review): the streams below are created without passing `client`,
// whereas the delete tests at the end of this file pass `client` explicitly.
// Confirm that the default client used by createDstream is the one whose
// subscriptions are being counted here; otherwise these checks are vacuous.
describe("ensure there isn't a really obvious subscription leak", () => {
  let client;

  it("create a client, which initially has only one subscription (the inbox)", async () => {
    client = connect();
    await client.getInbox();
    expect(client.numSubscriptions()).toBe(1);
  });

  const count = 100;
  it(`creates and closes ${count} streams and checks there is no leak`, async () => {
    // named numBefore/numAfter so the imported before/after hooks are not shadowed
    const numBefore = client.numSubscriptions();
    // create
    const streams: any[] = [];
    for (let i = 0; i < count; i++) {
      streams.push(
        await createDstream({
          name: `${Math.random()}`,
        }),
      );
    }
    for (const stream of streams) {
      await stream.close();
    }
    const numAfter = client.numSubscriptions();
    expect(numAfter).toBe(numBefore);

    // also check count on server went down.
    expect((await client.getSubscriptions()).size).toBe(numBefore);
  });

  it("does another leak test, but with a publish operation each time", async () => {
    const numBefore = client.numSubscriptions();
    // create
    const streams: any[] = [];
    for (let i = 0; i < count; i++) {
      const stream = await createDstream({
        name: `${Math.random()}`,
        noAutosave: true,
      });
      streams.push(stream);
      stream.publish(i);
      await stream.save();
    }
    for (const stream of streams) {
      await stream.close();
    }
    const numAfter = client.numSubscriptions();
    expect(numAfter).toBe(numBefore);
  });
});

// Two clients share one named stream; deletes issued on either side are
// observed on both.  The tests in this describe build on each other's state.
describe("test delete of messages from stream", () => {
  let client1, client2, s1, s2;
  const name = "test-delete";
  it("create two clients", async () => {
    client1 = connect();
    client2 = connect();
    s1 = await createDstream({
      client: client1,
      name,
      noAutosave: true,
      noCache: true,
    });
    s2 = await createDstream({
      client: client2,
      name,
      noAutosave: true,
      noCache: true,
    });
  });

  it("writes message one, confirm seen by other, then delete and confirm works", async () => {
    s1.push("hello");
    await s1.save();
    await wait({ until: () => s2.length > 0 });
    // not awaited: the wait() below polls until both sides are empty
    s1.delete({ all: true });
    await wait({ until: () => s2.length === 0 && s1.length === 0 });
  });

  it("same delete test as above but with a few more items and delete on s2 instead", async () => {
    for (let i = 0; i < 10; i++) {
      s1.push(i);
    }
    await s1.save();
    await wait({ until: () => s2.length === 10 });
    s2.delete({ all: true });
    await wait({ until: () => s2.length === 0 && s1.length === 0 });
  });

  it("delete specific index", async () => {
    s1.push("x", "y", "z");
    await s1.save();
    await wait({ until: () => s2.length === 3 });
    // removes "x" and "y" (indexes 0 and 1), leaving only "z"
    s2.delete({ last_index: 1 });
    await wait({ until: () => s2.length === 1 && s1.length === 1 });
    expect(s1.get()).toEqual(["z"]);
  });

  it("delete specific seq number", async () => {
    s1.push("x", "y");
    await s1.save();
    expect(s1.get()).toEqual(["z", "x", "y"]);
    const seq = s1.seq(1);
    const { seqs } = await s1.delete({ seq });
    expect(seqs).toEqual([seq]);
    await wait({ until: () => s2.length === 2 && s1.length === 2 });
    expect(s1.get()).toEqual(["z", "y"]);
  });

  it("delete up to a sequence number", async () => {
    s1.push("x", "y");
    await s1.save();
    expect(s1.get()).toEqual(["z", "y", "x", "y"]);
    const seq = s1.seq(1);
    // deletes everything up to and including seq, i.e. the first two messages
    const { seqs } = await s1.delete({ last_seq: seq });
    expect(seqs.length).toBe(2);
    expect(seqs[1]).toBe(seq);
    await wait({ until: () => s1.length === 2 });
    expect(s1.get()).toEqual(["x", "y"]);
  });
});

afterAll(after);