Path: blob/master/src/packages/backend/conat/test/sync/limits.test.ts
1451 views
/*1Testing the limits.23DEVELOPMENT:45pnpm test ./limits.test.ts67*/89import { dkv as createDkv } from "@cocalc/backend/conat/sync";10import { dstream as createDstream } from "@cocalc/backend/conat/sync";11import { delay } from "awaiting";12import { once } from "@cocalc/util/async-utils";13import {14before,15after,16wait,17connect,18client,19} from "@cocalc/backend/conat/test/setup";2021beforeAll(before);2223describe("create a dkv with limit on the total number of keys, and confirm auto-delete works", () => {24let kv;25const name = `test-${Math.random()}`;2627it("creates the dkv", async () => {28kv = await createDkv({ client, name, config: { max_msgs: 2 } });29expect(kv.getAll()).toEqual({});30});3132it("adds 2 keys, then a third, and sees first is gone", async () => {33kv.a = 10;34kv.b = 20;35expect(kv.a).toEqual(10);36expect(kv.b).toEqual(20);37kv.c = 30;38expect(kv.c).toEqual(30);39// have to wait until it's all saved and acknowledged before enforcing limit40if (!kv.isStable()) {41await once(kv, "stable");42}43// next change is the enforcement happening44if (kv.has("a")) {45await once(kv, "change", 500);46}47// and confirm it48expect(kv.a).toBe(undefined);49expect(kv.getAll()).toEqual({ b: 20, c: 30 });50});5152it("closes the kv", async () => {53await kv.clear();54await kv.close();55});56});5758describe("create a dkv with limit on age of keys, and confirm auto-delete works", () => {59let kv;60const name = `test-${Math.random()}`;6162it("creates the dkv", async () => {63kv = await createDkv({ client, name, config: { max_age: 50 } });64expect(kv.getAll()).toEqual({});65});6667it("adds 2 keys, then a third, and sees first two are gone due to aging out", async () => {68kv.a = 10;69kv.b = 20;70expect(kv.a).toEqual(10);71expect(kv.b).toEqual(20);72await kv.save();73await kv.config();74await delay(50);75await kv.config();76await delay(10);77expect(kv.has("a")).toBe(false);78expect(kv.has("b")).toBe(false);79});8081it("closes the kv", async () => {82await kv.clear();83await kv.close();84});85});8687describe("create a dkv with limit on total bytes of keys, and confirm auto-delete works", () => {88let kv;89const name = `test-${Math.random()}`;9091it("creates the dkv", async () => {92kv = await createDkv({ client, name, config: { max_bytes: 100 } });93expect(kv.getAll()).toEqual({});94});9596it("adds a key, then a second, and sees first one is gone due to bytes", async () => {97kv.a = "x".repeat(50);98kv.b = "x".repeat(55);99expect(kv.getAll()).toEqual({ a: "x".repeat(50), b: "x".repeat(55) });100await kv.save();101expect(kv.has("b")).toBe(true);102await wait({103until: async () => {104await kv.config();105return !kv.has("a");106},107});108expect(kv.getAll()).toEqual({ b: "x".repeat(55) });109});110111it("closes the kv", async () => {112await kv.clear();113await kv.close();114});115});116117describe("create a dkv with limit on max_msg_size, and confirm writing small messages works but writing a big one result in a 'reject' event", () => {118let kv;119const name = `test-${Math.random()}`;120121it("creates the dkv", async () => {122kv = await createDkv({ client, name, config: { max_msg_size: 100 } });123expect(kv.getAll()).toEqual({});124});125126it("adds a key, then a second big one results in a 'reject' event", async () => {127const rejects: { key: string; value: string }[] = [];128kv.once("reject", (x) => {129rejects.push(x);130});131kv.a = "x".repeat(50);132await kv.save();133kv.b = "x".repeat(150);134await kv.save();135expect(rejects).toEqual([{ key: "b", value: "x".repeat(150) }]);136expect(kv.has("b")).toBe(false);137});138139it("closes the kv", async () => {140await kv.clear();141await kv.close();142});143});144145describe("create a dstream with limit on the total number of messages, and confirm max_msgs, max_age works", () => {146let s, s2;147const name = `test-${Math.random()}`;148149it("creates the dstream and another with a different client", async () => {150s = await createDstream({ client, name, config: { max_msgs: 2 } });151s2 = await createDstream({152client: connect(),153name,154config: { max_msgs: 2 },155noCache: true,156});157expect(s.get()).toEqual([]);158expect((await s.config()).max_msgs).toBe(2);159expect((await s2.config()).max_msgs).toBe(2);160});161162it("push 2 messages, then a third, and see first is gone and that this is reflected on both clients", async () => {163expect((await s.config()).max_msgs).toBe(2);164expect((await s2.config()).max_msgs).toBe(2);165s.push("a");166s.push("b");167await wait({ until: () => s.length == 2 && s2.length == 2 });168expect(s2.get()).toEqual(["a", "b"]);169s.push("c");170await wait({171until: () =>172s.get(0) != "a" &&173s.get(1) == "c" &&174s2.get(0) != "a" &&175s2.get(1) == "c",176});177expect(s.getAll()).toEqual(["b", "c"]);178expect(s2.getAll()).toEqual(["b", "c"]);179180// also check limits ar enforced if we close, then open new one:181await s.close();182s = await createDstream({ client, name, config: { max_msgs: 2 } });183expect(s.getAll()).toEqual(["b", "c"]);184185await s.config({ max_msgs: -1 });186});187188it("verifies that max_age works", async () => {189await s.save();190expect(s.hasUnsavedChanges()).toBe(false);191await delay(300);192s.push("new");193await s.config({ max_age: 20 }); // anything older than 20ms should be deleted194await wait({ until: () => s.length == 1 });195expect(s.getAll()).toEqual(["new"]);196await s.config({ max_age: -1 });197});198199it("verifies that ttl works", async () => {200const conf = await s.config();201expect(conf.allow_msg_ttl).toBe(false);202const conf2 = await s.config({ max_age: -1, allow_msg_ttl: true });203expect(conf2.allow_msg_ttl).toBe(true);204205s.publish("ttl-message", { ttl: 50 });206await s.save();207await wait({208until: async () => {209await s.config();210return s.length == 1;211},212});213expect(s.get()).toEqual(["new"]);214});215216it("verifies that max_bytes works -- publishing something too large causes everything to end up gone", async () => {217const conf = await s.config({ max_bytes: 100 });218expect(conf.max_bytes).toBe(100);219s.publish("x".repeat(1000));220await s.config();221await wait({ until: () => s.length == 0 });222expect(s.length).toBe(0);223});224225it("max_bytes -- publish something then another thing that causes the first to get deleted", async () => {226s.publish("x".repeat(75));227s.publish("y".repeat(90));228await wait({229until: async () => {230await s.config();231return s.length == 1;232},233});234expect(s.get()).toEqual(["y".repeat(90)]);235await s.config({ max_bytes: -1 });236});237238it("verifies that max_msg_size rejects messages that are too big", async () => {239await s.config({ max_msg_size: 100 });240expect((await s.config()).max_msg_size).toBe(100);241s.publish("x".repeat(70));242await expect(async () => {243await s.stream.publish("x".repeat(150));244}).rejects.toThrowError("max_msg_size");245await s.config({ max_msg_size: 200 });246s.publish("x".repeat(150));247await s.config({ max_msg_size: -1 });248expect((await s.config()).max_msg_size).toBe(-1);249});250251it("closes the stream", async () => {252await s.close();253await s2.close();254});255});256257describe("create a dstream with limit on max_age, and confirm auto-delete works", () => {258let s;259const name = `test-${Math.random()}`;260261it("creates the dstream", async () => {262s = await createDstream({ client, name, config: { max_age: 50 } });263});264265it("push a message, then another and see first disappears", async () => {266s.push({ a: 10 });267await delay(75);268s.push({ b: 20 });269expect(s.get()).toEqual([{ a: 10 }, { b: 20 }]);270await wait({271until: async () => {272await s.config();273return s.length == 1;274},275});276expect(s.getAll()).toEqual([{ b: 20 }]);277});278279it("closes the stream", async () => {280await s.delete({ all: true });281await s.close();282});283});284285describe("create a dstream with limit on max_bytes, and confirm auto-delete works", () => {286let s;287const name = `test-${Math.random()}`;288289it("creates the dstream", async () => {290// note: 60 and not 40 due to slack for headers291s = await createDstream({ client, name, config: { max_bytes: 60 } });292});293294it("push a message, then another and see first disappears", async () => {295s.push("x".repeat(40));296s.push("x".repeat(45));297s.push("x");298if (!s.isStable()) {299await once(s, "stable");300}301expect(s.getAll()).toEqual(["x".repeat(45), "x"]);302});303304it("closes the stream", async () => {305await s.delete({ all: true });306await s.close();307});308});309310describe("create a dstream with limit on max_msg_size, and confirm auto-delete works", () => {311let s;312const name = `test-${Math.random()}`;313314it("creates the dstream", async () => {315s = await createDstream({ client, name, config: { max_msg_size: 50 } });316});317318it("push a message, then another and see first disappears", async () => {319const rejects: any[] = [];320s.on("reject", ({ mesg }) => {321rejects.push(mesg);322});323s.push("x".repeat(40));324s.push("y".repeat(60)); // silently vanishes (well a reject event is emitted)325s.push("x");326await wait({327until: async () => {328await s.config();329return s.length == 2;330},331});332expect(s.getAll()).toEqual(["x".repeat(40), "x"]);333expect(rejects).toEqual(["y".repeat(60)]);334});335336it("closes the stream", async () => {337await s.close();338});339});340341describe("test discard_policy 'new' where writes are rejected rather than old data being deleted, for max_bytes and max_msgs", () => {342let s;343const name = `test-${Math.random()}`;344345it("creates the dstream", async () => {346s = await createDstream({347client,348name,349// we can write at most 300 bytes and 3 messages. beyond that we350// get reject events.351config: {352max_bytes: 300,353max_msgs: 3,354discard_policy: "new",355desc: { example: "config" },356},357});358const rejects: any[] = [];359s.on("reject", ({ mesg }) => {360rejects.push(mesg);361});362s.publish("x");363s.publish("y");364s.publish("w");365s.publish("foo");366367await wait({368until: async () => {369await s.config();370return rejects.length == 1;371},372});373expect(s.getAll()).toEqual(["x", "y", "w"]);374expect(rejects).toEqual(["foo"]);375376s.publish("x".repeat(299));377await wait({378until: async () => {379await s.config();380return rejects.length == 2;381},382});383expect(s.getAll()).toEqual(["x", "y", "w"]);384expect(rejects).toEqual(["foo", "x".repeat(299)]);385});386387it("check the config is persisted", async () => {388const lastConfig = await s.config();389s.close();390s = await createDstream({391client,392name,393noCache: true,394});395const config = await s.config();396expect(lastConfig).toEqual(config);397expect(lastConfig.desc).toEqual({ example: "config" });398});399400it("closes the stream", async () => {401s.close();402});403});404405describe("test rate limiting", () => {406let s;407const name = `test-${Math.random()}`;408409it("creates the dstream", async () => {410s = await createDstream({411client,412name,413// we can write at most 300 bytes and 3 messages. beyond that we414// get reject events.415config: {416max_bytes_per_second: 300,417max_msgs_per_second: 3,418discard_policy: "new",419},420});421const rejects: any[] = [];422s.on("reject", ({ mesg }) => {423rejects.push(mesg);424});425});426427it("closes the stream", async () => {428await s.close();429});430});431432import { EPHEMERAL_MAX_BYTES } from "@cocalc/conat/persist/storage";433describe(`ephemeral streams always have a hard cap of ${EPHEMERAL_MAX_BYTES} on max_bytes `, () => {434let s;435it("creates a non-ephemeral dstream and checks no automatic max_bytes set", async () => {436const s1 = await createDstream({437client,438name: "test-NON-ephemeral",439ephemeral: false,440});441expect((await s1.config()).max_bytes).toBe(-1);442s1.close();443});444445it("creates an ephemeral dstream and checks max bytes automatically set", async () => {446s = await createDstream({447client,448name: "test-ephemeral",449ephemeral: true,450});451expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);452});453454it("trying to set larger doesn't work", async () => {455expect(456(await s.config({ max_bytes: 2 * EPHEMERAL_MAX_BYTES })).max_bytes,457).toBe(EPHEMERAL_MAX_BYTES);458expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);459});460461it("setting it smaller is allowed", async () => {462expect(463(await s.config({ max_bytes: EPHEMERAL_MAX_BYTES / 2 })).max_bytes,464).toBe(EPHEMERAL_MAX_BYTES / 2);465expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES / 2);466});467});468469afterAll(after);470471472