Path: blob/master/src/packages/backend/conat/test/sync/astream.test.ts
1451 views
/*1Testing basic ops with astream23DEVELOPMENT:45pnpm test ./astream.test.ts67*/89import { astream } from "@cocalc/backend/conat/sync";10import { before, after, connect } from "@cocalc/backend/conat/test/setup";11import { delay } from "awaiting";1213beforeAll(before);1415describe("test basics with an astream", () => {16let client, s, s2;17const name = "test-astream";1819it("creates the astream, then publish and read a value", async () => {20client = connect();21s = astream({ name, client });22const { seq } = await s.publish("x");23expect(seq).toBe(1);24expect(await s.get(1)).toBe("x");25});2627it("use a second astream", async () => {28s2 = astream({ name, client, noCache: true });29expect(await s2.get(1)).toBe("x");30s2.close();31});3233it("publish a message with a header", async () => {34const { seq, time } = await s.publish("has a header", {35headers: { foo: "bar" },36});37expect(await s.get(seq)).toBe("has a header");38expect(await s.headers(seq)).toEqual(39expect.objectContaining({ foo: "bar" }),40);41// note that seq and time are also in the header42expect(await s.headers(seq)).toEqual({ foo: "bar", seq, time });43});4445it("closes, then creates a new astream and sees data is there", async () => {46await s.close();47s = await astream({ name, client });48expect(await s.get(1)).toBe("x");49});5051it("get full message, which has both the data and the headers", async () => {52const mesg = await s.getMessage(2);53expect(mesg.data).toBe("has a header");54expect(mesg.headers).toEqual(expect.objectContaining({ foo: "bar" }));55});5657it("getAll messages", async () => {58const x = await s.getAll();59const { value } = await x.next();60expect(value.mesg).toBe("x");61expect(value.seq).toBe(1);62expect(Math.abs(value.time - Date.now())).toBeLessThan(5000);63const { value: value2 } = await x.next();64expect(value2.mesg).toBe("has a header");65expect(value2.headers).toEqual(expect.objectContaining({ foo: "bar" }));66expect(value2.seq).toBe(2);67expect(Math.abs(value2.time - Date.now())).toBeLessThan(5000);68const { done } = await x.next();69expect(done).toBe(true);70});7172it("getAll messages starting from the second one", async () => {73const x = await s.getAll({ start_seq: 2, end_seq: 2 });74const { value } = await x.next();75expect(value.mesg).toBe("has a header");76expect(value.seq).toBe(2);77const { done } = await x.next();78expect(done).toBe(true);79});8081it("getAll messages starting from the first and ending on the first", async () => {82const x = await s.getAll({ start_seq: 1, end_seq: 1 });83const { value } = await x.next();84expect(value.mesg).toBe("x");85expect(value.seq).toBe(1);86const { done } = await x.next();87expect(done).toBe(true);88});8990it("cleans up", () => {91s.close();92});93});9495const stress1 = 1e4;96describe(`stress test -- write, then read back, ${stress1} messages`, () => {97let client, s;98const name = "stress-test";99100it("creates the astream", async () => {101client = connect();102s = await astream({ name, client });103});104105it(`publishes ${stress1} messages`, async () => {106const v: number[] = [];107for (let i = 0; i < stress1; i++) {108v.push(i);109}110const z = await s.push(...v);111expect(z.length).toBe(stress1);112});113114it(`reads back ${stress1} messages`, async () => {115const v: any[] = [];116for await (const x of await s.getAll()) {117v.push(x);118}119expect(v.length).toBe(stress1);120});121122it("cleans up", () => {123s.close();124});125});126127describe("test a changefeed", () => {128let client, s, s2, cf, cf2, cf2b;129const name = "test-astream";130131it("creates two astreams and three changefeeds on them", async () => {132client = connect();133s = astream({ name, client });134cf = await s.changefeed();135s2 = astream({ name, client, noCache: true });136cf2 = await s2.changefeed();137cf2b = await s2.changefeed();138});139140it("writes to the stream and sees this in the changefeed", async () => {141const first = cf.next();142const first2 = cf2.next();143const first2b = cf2b.next();144await s.publish("hi");145146const { value, done } = await first;147expect(done).toBe(false);148149expect(value.mesg).toBe("hi");150const { value: value2 } = await first2;151expect(value2.mesg).toBe("hi");152const { value: value2b } = await first2b;153expect(value2b.mesg).toBe("hi");154});155156it("verify the three changefeeds are all distinct and do not interfere with each other", async () => {157// write 2 messages and see they are received independently158await s.publish("one");159await s.publish("two");160expect((await cf.next()).value.mesg).toBe("one");161expect((await cf.next()).value.mesg).toBe("two");162expect((await cf2.next()).value.mesg).toBe("one");163expect((await cf2b.next()).value.mesg).toBe("one");164expect((await cf2.next()).value.mesg).toBe("two");165expect((await cf2b.next()).value.mesg).toBe("two");166});167168const stress = 10000;169it(`stress test -- write ${stress} values`, async () => {170const v: number[] = [];171for (let i = 0; i < stress; i++) {172v.push(i);173}174const z = await s.push(...v);175expect(z.length).toBe(v.length);176});177178it(`stress test getting ${stress} values from a changefeed`, async () => {179for (let i = 0; i < stress; i++) {180await cf.next();181}182});183184it("cleans up", () => {185s.close();186s2.close();187});188});189190describe("test setting with key, ttl and msgID", () => {191let client, s;192const name = "test-astream-sets";193194it("creates the astream, then publish and read a value", async () => {195client = connect();196s = astream({ name, client });197const { seq } = await s.publish("x", {198key: "y",199headers: { with: "key" },200});201expect(seq).toBe(1);202expect(await s.get(1)).toBe("x");203expect(await s.get("y")).toBe("x");204expect(await s.headers("y")).toEqual(205expect.objectContaining({ with: "key" }),206);207});208209it("publish a value with msgID twice and sees that it only appears once", async () => {210const { seq } = await s.publish("foo", { msgID: "xx" });211const { seq: seq2 } = await s.publish("foo", { msgID: "xx" });212expect(seq).toEqual(seq2);213});214215it("publish a value with ttl and sees it vanishes as expected", async () => {216await s.config({ allow_msg_ttl: true });217const { seq } = await s.publish("foo", { key: "i-have-ttl", ttl: 25 });218expect(await s.get("i-have-ttl")).toBe("foo");219await delay(50);220// call config to force enforcing limits221await s.config();222expect(await s.get("i-have-ttl")).toBe(undefined);223expect(await s.get(seq)).toBe(undefined);224});225226it("cleans up", () => {227s.close();228});229});230231afterAll(after);232233234