Path: blob/master/src/packages/backend/conat/test/sync/dstream-ephemeral.test.ts
1451 views
/*1Testing basic ops with dsteam (distributed streams), but all are ephemeral.23The first tests are initially similar to those for dstream.test.ts, but with4{ephemeral: true}. There are also further tests of the client/server aspects.56DEVELOPMENT:78pnpm test ./dstream-ephemeral.test.ts910*/1112import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";13import { createDstreamEphemeral as create } from "./util";14import { dstream as createDstream0 } from "@cocalc/backend/conat/sync";15//import { delay } from "awaiting";1617beforeAll(before);1819async function createDstream<T>(opts) {20return await createDstream0<T>({21noCache: true,22noAutosave: true,23ephemeral: true,24...opts,25});26}2728jest.setTimeout(10000);2930describe("create a dstream and do some basic operations", () => {31let s;3233it("creates stream", async () => {34s = await create();35});3637it("starts out empty", () => {38expect(s.getAll()).toEqual([]);39expect(s.length).toEqual(0);40});4142const mesg = { stdout: "hello" };43it("publishes a message to the stream and confirms it is there", () => {44s.push(mesg);45expect(s.getAll()).toEqual([mesg]);46expect(s.length).toEqual(1);47expect(s[0]).toEqual(mesg);48});4950it("verifies that unsaved changes works properly", async () => {51expect(s.hasUnsavedChanges()).toBe(true);52expect(s.unsavedChanges()).toEqual([mesg]);53await s.save();54expect(s.hasUnsavedChanges()).toBe(false);55expect(s.unsavedChanges()).toEqual([]);56});5758it("confirm ephemeralness: closes and re-opens stream and confirms message is NOT there", async () => {59const name = s.name;60await s.save();61// close s:62await s.close();63// using s fails64expect(s.getAll).toThrow("closed");65// wait for server to discard stream data66// (it's instant right now!)67// create new stream with same name68const t = await createDstream({ name });69// ensure it is NOT just from the cache70expect(s === t).toBe(false);71// make sure it does NOT have our message (it should not -- it's ephemeral)72expect(t.getAll()).toEqual([]);73});74});7576describe("create two dstreams and observe sync between them", () => {77const name = `test-${Math.random()}`;78let s1, s2;79let client2;80it("creates two distinct dstream objects s1 and s2 with the same name", async () => {81client2 = connect();82s1 = await createDstream({ name });83s2 = await createDstream({ client: client2, name });84// definitely distinct85expect(s1 === s2).toBe(false);86});8788it("writes to s1 and observes s2 doesn't see anything until we save", async () => {89s1.push("hello");90expect(s1[0]).toEqual("hello");91expect(s2.length).toEqual(0);92await s1.save();93await wait({ until: () => s2[0] == "hello" });94expect(s2[0]).toEqual("hello");95expect(s2.getAll()).toEqual(["hello"]);96});9798it("now write to s2 and save and see that reflected in s1", async () => {99s2.push("hi from s2");100await s2.save();101await wait({ until: () => s1[1] == "hi from s2" && s2[1] == "hi from s2" });102expect(s1[1]).toEqual("hi from s2");103expect(s2[1]).toEqual("hi from s2");104});105106it("s1.stream and s2.stream should be the same right now", () => {107expect(s1.stream.getAll()).toEqual(["hello", "hi from s2"]);108expect(s2.stream.getAll()).toEqual(["hello", "hi from s2"]);109});110111it("s1 and s2 should be the same right now", () => {112expect(s1.getAll()).toEqual(["hello", "hi from s2"]);113expect(s2.getAll()).toEqual(["hello", "hi from s2"]);114});115116it("cleans up", () => {117s1.close();118s2.close();119client2.close();120});121});122123describe("create two dstreams and test sync with parallel save", () => {124const name = `test-${Math.random()}`;125let s1, s2;126let client2;127it("creates two distinct dstream objects s1 and s2 with the same name", async () => {128client2 = connect();129s1 = await createDstream({ name });130s2 = await createDstream({ client: client2, name });131// definitely distinct132expect(s1 === s2).toBe(false);133});134135it("write to s1 and s2 and save at the same time", async () => {136s1.push("s1");137s2.push("s2");138// our changes are reflected locally139expect(s1.getAll()).toEqual(["s1"]);140expect(s2.getAll()).toEqual(["s2"]);141// now kick off the two saves *in parallel*142s1.save();143s2.save();144await wait({ until: () => s1.length >= 2 && s2.length >= 2 });145expect(s1.getAll()).toEqual(s2.getAll());146});147148it("cleans up", () => {149client2.close();150});151});152153describe("get sequence number and time of message", () => {154let s;155156it("creates stream and write message", async () => {157s = await create();158s.push("hello");159});160161it("sequence number is initialized undefined because it is server assigned ", async () => {162const n = s.seq(0);163expect(n).toBe(undefined);164});165166it("time also undefined because it is server assigned ", async () => {167const t = s.time(0);168expect(t).toBe(undefined);169});170171it("save and get server assigned sequence number", async () => {172s.save();173await wait({ until: () => s.seq(0) > 0 });174const n = s.seq(0);175expect(n).toBeGreaterThan(0);176});177178it("get server assigned time", async () => {179const t = s.time(0);180// since testing on the same machine as server, these times should be close:181expect(t.getTime() - Date.now()).toBeLessThan(5000);182});183184it("publish another message and get next server number is bigger", async () => {185const n = s.seq(0);186s.push("there");187await s.save();188const m = s.seq(1);189expect(m).toBeGreaterThan(n);190});191192it("and time is bigger", async () => {193await wait({ until: () => s.time(1) != null });194expect(s.time(0).getTime()).toBeLessThanOrEqual(s.time(1).getTime());195});196});197198describe("testing start_seq", () => {199const name = `test-${Math.random()}`;200let seq;201it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {202const s = await createDstream({ name, noAutosave: true });203s.push(1, 2, 3);204expect(s.getAll()).toEqual([1, 2, 3]);205// save, thus getting sequence numbers206s.save();207await wait({ until: () => s.seq(2) != null });208seq = [s.seq(0), s.seq(1), s.seq(2)];209// tests partly that these are integers...210const n = seq.reduce((a, b) => a + b, 0);211expect(typeof n).toBe("number");212expect(n).toBeGreaterThan(2);213});214215let t;216it("it opens another copy of the stream, but starting with the last sequence number, so only one message", async () => {217const client = connect();218t = await createDstream({219client,220name,221noAutosave: true,222start_seq: seq[2],223});224expect(t.length).toBe(1);225expect(t.getAll()).toEqual([3]);226expect(t.start_seq).toEqual(seq[2]);227});228229it("it then pulls in the previous message, so now two messages are loaded", async () => {230await t.load({ start_seq: seq[1] });231expect(t.length).toBe(2);232expect(t.getAll()).toEqual([2, 3]);233expect(t.start_seq).toEqual(seq[1]);234});235});236237describe("a little bit of a stress test", () => {238const name = `test-${Math.random()}`;239const count = 100;240let s;241it(`creates a stream and pushes ${count} messages`, async () => {242s = await createDstream({243name,244noAutosave: true,245});246for (let i = 0; i < count; i++) {247s.push({ i });248}249expect(s.length).toBe(count);250// [ ] TODO rewrite this save to send everything in a single message251// which gets chunked, will we be much faster, then change the count252// above to 1000 or 10000.253await s.save();254expect(s.length).toBe(count);255});256});257258describe("dstream typescript test", () => {259it("creates stream", async () => {260const name = `test-${Math.random()}`;261const s = await createDstream<string>({ name });262263// write a message with the correct type264s.push("foo");265266// wrong type -- no way to test this, but if you uncomment267// this you should get a typescript error:268269// s.push({ foo: "bar" });270});271});272273describe("ensure there isn't a really obvious subscription leak", () => {274let client;275276it("create a client, which initially has only one subscription (the inbox)", async () => {277client = connect();278expect(client.numSubscriptions()).toBe(0);279await client.getInbox();280expect(client.numSubscriptions()).toBe(1);281});282283const count = 100;284it(`creates and closes ${count} streams and checks there is no leak`, async () => {285const before = client.numSubscriptions();286// create287const a: any = [];288for (let i = 0; i < count; i++) {289a[i] = await createDstream({290name: `${Math.random()}`,291});292}293for (let i = 0; i < count; i++) {294await a[i].close();295}296const after = client.numSubscriptions();297expect(after).toBe(before);298299// also check count on server went down.300expect((await client.getSubscriptions()).size).toBe(before);301});302303it("does another leak test, but with a publish operation each time", async () => {304const before = client.numSubscriptions();305// create306const a: any = [];307for (let i = 0; i < count; i++) {308a[i] = await createDstream({309name: `${Math.random()}`,310noAutosave: true,311});312a[i].publish(i);313await a[i].save();314}315for (let i = 0; i < count; i++) {316await a[i].close();317}318const after = client.numSubscriptions();319expect(after).toBe(before);320});321});322323afterAll(after);324325326