Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/limits.test.ts
1454 views
1
/*
2
Testing the limits.
3
4
DEVELOPMENT:
5
6
pnpm test ./limits.test.ts
7
8
*/
9
10
import { dkv as createDkv } from "@cocalc/backend/conat/sync";
11
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12
import { delay } from "awaiting";
13
import { once } from "@cocalc/util/async-utils";
14
import {
15
before,
16
after,
17
wait,
18
connect,
19
client,
20
} from "@cocalc/backend/conat/test/setup";
21
22
// Register the shared `before` hook (imported from the conat test setup module)
// so it runs once ahead of every test in this file.
beforeAll(before);
23
24
describe("create a dkv with limit on the total number of keys, and confirm auto-delete works", () => {
  // Fresh random name so this suite never shares a store with another one.
  const storeName = `test-${Math.random()}`;
  const limits = { max_msgs: 2 };
  let store;

  it("creates the dkv", async () => {
    store = await createDkv({ client, name: storeName, config: limits });
    const contents = store.getAll();
    expect(contents).toEqual({});
  });

  it("adds 2 keys, then a third, and sees first is gone", async () => {
    // Fill the store up to the configured limit of two keys.
    store.a = 10;
    store.b = 20;
    expect(store.a).toEqual(10);
    expect(store.b).toEqual(20);

    // A third key pushes the store over the limit.
    store.c = 30;
    expect(store.c).toEqual(30);

    // The limit is only enforced once everything has been saved and
    // acknowledged, so wait for that first.
    const alreadyStable = store.isStable();
    if (!alreadyStable) {
      await once(store, "stable");
    }

    // If "a" is still around, the next change event is the enforcement.
    const oldestStillPresent = store.has("a");
    if (oldestStillPresent) {
      await once(store, "change", 500);
    }

    // The oldest key must be gone and the two newest must remain.
    expect(store.a).toBe(undefined);
    const remaining = store.getAll();
    expect(remaining).toEqual({ b: 20, c: 30 });
  });

  it("closes the kv", async () => {
    await store.clear();
    await store.close();
  });
});
58
59
describe("create a dkv with limit on age of keys, and confirm auto-delete works", () => {
  let kv;
  const name = `test-${Math.random()}`;

  it("creates the dkv", async () => {
    // Keys older than 50ms are eligible for automatic deletion.
    kv = await createDkv({ client, name, config: { max_age: 50 } });
    expect(kv.getAll()).toEqual({});
  });

  it("adds 2 keys and sees both are gone due to aging out", async () => {
    kv.a = 10;
    kv.b = 20;
    expect(kv.a).toEqual(10);
    expect(kv.b).toEqual(20);
    await kv.save();
    // Poll instead of sleeping for a fixed time: a sleep equal to max_age
    // plus a few ms for propagation is timing-sensitive on a loaded machine.
    // Calling config() on each poll mirrors the max_bytes dkv test in this
    // file (presumably it prompts limit enforcement -- confirm against the
    // dkv implementation).
    await wait({
      until: async () => {
        await kv.config();
        return !kv.has("a") && !kv.has("b");
      },
    });
    expect(kv.has("a")).toBe(false);
    expect(kv.has("b")).toBe(false);
  });

  it("closes the kv", async () => {
    await kv.clear();
    await kv.close();
  });
});
87
88
describe("create a dkv with limit on total bytes of keys, and confirm auto-delete works", () => {
  const storeName = `test-${Math.random()}`;
  let store;

  it("creates the dkv", async () => {
    store = await createDkv({
      client,
      name: storeName,
      config: { max_bytes: 100 },
    });
    const contents = store.getAll();
    expect(contents).toEqual({});
  });

  it("adds a key, then a second, and sees first one is gone due to bytes", async () => {
    // 50 + 55 characters together exceed the 100 byte budget.
    const first = "x".repeat(50);
    const second = "x".repeat(55);

    store.a = first;
    store.b = second;
    expect(store.getAll()).toEqual({ a: first, b: second });

    await store.save();
    expect(store.has("b")).toBe(true);

    // Poll, calling config() on every attempt, until the older key is gone.
    const firstKeyEvicted = async () => {
      await store.config();
      return !store.has("a");
    };
    await wait({ until: firstKeyEvicted });

    expect(store.getAll()).toEqual({ b: second });
  });

  it("closes the kv", async () => {
    await store.clear();
    await store.close();
  });
});
117
118
describe("create a dkv with limit on max_msg_size, and confirm writing small messages works but writing a big one result in a 'reject' event", () => {
  const storeName = `test-${Math.random()}`;
  let store;

  it("creates the dkv", async () => {
    store = await createDkv({
      client,
      name: storeName,
      config: { max_msg_size: 100 },
    });
    const contents = store.getAll();
    expect(contents).toEqual({});
  });

  it("adds a key, then a second big one results in a 'reject' event", async () => {
    // Collects the payload of the (single) reject event we listen for.
    const rejected: Array<{ key: string; value: string }> = [];
    store.once("reject", (info) => {
      rejected.push(info);
    });

    // 50 characters fits under the 100 limit.
    store.a = "x".repeat(50);
    await store.save();

    // 150 characters does not, so saving it must trigger a reject.
    const oversized = "x".repeat(150);
    store.b = oversized;
    await store.save();

    expect(rejected).toEqual([{ key: "b", value: oversized }]);
    expect(store.has("b")).toBe(false);
  });

  it("closes the kv", async () => {
    await store.clear();
    await store.close();
  });
});
145
146
describe("create a dstream with limit on the total number of messages, and confirm max_msgs, max_age works", () => {
  const streamName = `test-${Math.random()}`;
  // `primary` uses the shared test client; `mirror` opens the same stream
  // through a separate connection so we can check both views agree.
  let primary, mirror;

  // Builds a wait() predicate that calls config() on the primary stream and
  // then reports whether it currently holds exactly `count` messages.
  const primaryHasLength = (count: number) => async () => {
    await primary.config();
    return primary.length === count;
  };

  it("creates the dstream and another with a different client", async () => {
    primary = await createDstream({
      client,
      name: streamName,
      config: { max_msgs: 2 },
    });
    mirror = await createDstream({
      client: connect(),
      name: streamName,
      config: { max_msgs: 2 },
      noCache: true,
    });
    expect(primary.get()).toEqual([]);
    const primaryConfig = await primary.config();
    expect(primaryConfig.max_msgs).toBe(2);
    const mirrorConfig = await mirror.config();
    expect(mirrorConfig.max_msgs).toBe(2);
  });

  it("push 2 messages, then a third, and see first is gone and that this is reflected on both clients", async () => {
    const primaryConfig = await primary.config();
    expect(primaryConfig.max_msgs).toBe(2);
    const mirrorConfig = await mirror.config();
    expect(mirrorConfig.max_msgs).toBe(2);

    primary.push("a");
    primary.push("b");
    const bothHaveTwo = () => primary.length === 2 && mirror.length === 2;
    await wait({ until: bothHaveTwo });
    expect(mirror.get()).toEqual(["a", "b"]);

    // The third message exceeds max_msgs, so "a" should drop out on both sides.
    primary.push("c");
    const oldestDroppedEverywhere = () =>
      primary.get(0) !== "a" &&
      primary.get(1) === "c" &&
      mirror.get(0) !== "a" &&
      mirror.get(1) === "c";
    await wait({ until: oldestDroppedEverywhere });
    expect(primary.getAll()).toEqual(["b", "c"]);
    expect(mirror.getAll()).toEqual(["b", "c"]);

    // also check limits are enforced if we close, then open a new one:
    await primary.close();
    primary = await createDstream({
      client,
      name: streamName,
      config: { max_msgs: 2 },
    });
    expect(primary.getAll()).toEqual(["b", "c"]);

    // Lift the message-count limit for the tests that follow.
    await primary.config({ max_msgs: -1 });
  });

  it("verifies that max_age works", async () => {
    await primary.save();
    expect(primary.hasUnsavedChanges()).toBe(false);
    // Let the existing messages get well past the age limit set below.
    await delay(300);
    primary.push("new");
    // anything older than 20ms should be deleted
    await primary.config({ max_age: 20 });
    const onlyOneLeft = () => primary.length === 1;
    await wait({ until: onlyOneLeft });
    expect(primary.getAll()).toEqual(["new"]);
    await primary.config({ max_age: -1 });
  });

  it("verifies that ttl works", async () => {
    const before = await primary.config();
    expect(before.allow_msg_ttl).toBe(false);
    const after = await primary.config({ max_age: -1, allow_msg_ttl: true });
    expect(after.allow_msg_ttl).toBe(true);

    // Publish with a 50ms ttl, then wait until only "new" is left again.
    primary.publish("ttl-message", { ttl: 50 });
    await primary.save();
    await wait({ until: primaryHasLength(1) });
    expect(primary.get()).toEqual(["new"]);
  });

  it("verifies that max_bytes works -- publishing something too large causes everything to end up gone", async () => {
    const updated = await primary.config({ max_bytes: 100 });
    expect(updated.max_bytes).toBe(100);
    primary.publish("x".repeat(1000));
    await primary.config();
    const isEmpty = () => primary.length === 0;
    await wait({ until: isEmpty });
    expect(primary.length).toBe(0);
  });

  it("max_bytes -- publish something then another thing that causes the first to get deleted", async () => {
    const survivor = "y".repeat(90);
    primary.publish("x".repeat(75));
    primary.publish(survivor);
    await wait({ until: primaryHasLength(1) });
    expect(primary.get()).toEqual([survivor]);
    await primary.config({ max_bytes: -1 });
  });

  it("verifies that max_msg_size rejects messages that are too big", async () => {
    await primary.config({ max_msg_size: 100 });
    const limited = await primary.config();
    expect(limited.max_msg_size).toBe(100);

    primary.publish("x".repeat(70));

    // Publishing directly on the underlying stream surfaces the rejection
    // as a thrown error mentioning max_msg_size.
    const publishOversized = async () => {
      await primary.stream.publish("x".repeat(150));
    };
    await expect(publishOversized).rejects.toThrowError("max_msg_size");

    await primary.config({ max_msg_size: 200 });
    primary.publish("x".repeat(150));

    await primary.config({ max_msg_size: -1 });
    const unlimited = await primary.config();
    expect(unlimited.max_msg_size).toBe(-1);
  });

  it("closes the stream", async () => {
    await primary.close();
    await mirror.close();
  });
});
257
258
describe("create a dstream with limit on max_age, and confirm auto-delete works", () => {
  const streamName = `test-${Math.random()}`;
  let stream;

  it("creates the dstream", async () => {
    stream = await createDstream({
      client,
      name: streamName,
      config: { max_age: 50 },
    });
  });

  it("push a message, then another and see first disappears", async () => {
    stream.push({ a: 10 });
    // Sleep longer than max_age (50) before pushing the second message.
    await delay(75);
    stream.push({ b: 20 });
    // Both messages are still visible locally right after the push.
    expect(stream.get()).toEqual([{ a: 10 }, { b: 20 }]);

    // Poll, calling config() each time, until just one message is left.
    const onlyOneLeft = async () => {
      await stream.config();
      return stream.length === 1;
    };
    await wait({ until: onlyOneLeft });

    expect(stream.getAll()).toEqual([{ b: 20 }]);
  });

  it("closes the stream", async () => {
    await stream.delete({ all: true });
    await stream.close();
  });
});
285
286
describe("create a dstream with limit on max_bytes, and confirm auto-delete works", () => {
  const streamName = `test-${Math.random()}`;
  let stream;

  it("creates the dstream", async () => {
    // note: 60 and not 40 due to slack for headers
    const limits = { max_bytes: 60 };
    stream = await createDstream({ client, name: streamName, config: limits });
  });

  it("push a message, then another and see first disappears", async () => {
    const second = "x".repeat(45);
    stream.push("x".repeat(40));
    stream.push(second);
    stream.push("x");

    // Wait for the stream to settle unless it already has.
    const settled = stream.isStable();
    if (!settled) {
      await once(stream, "stable");
    }

    // The first (40 character) message has been dropped.
    expect(stream.getAll()).toEqual([second, "x"]);
  });

  it("closes the stream", async () => {
    await stream.delete({ all: true });
    await stream.close();
  });
});
310
311
describe("create a dstream with limit on max_msg_size, and confirm oversized messages are rejected", () => {
  let s;
  const name = `test-${Math.random()}`;

  it("creates the dstream", async () => {
    // Individual messages larger than 50 are not accepted.
    s = await createDstream({ client, name, config: { max_msg_size: 50 } });
  });

  it("push three messages and see the oversized one is rejected while the others are kept", async () => {
    // Record the payload of every rejected message.
    const rejects: any[] = [];
    s.on("reject", ({ mesg }) => {
      rejects.push(mesg);
    });
    s.push("x".repeat(40));
    s.push("y".repeat(60)); // silently vanishes (well a reject event is emitted)
    s.push("x");
    // Poll, calling config() each time, until only the two accepted
    // messages remain in the stream.
    await wait({
      until: async () => {
        await s.config();
        return s.length == 2;
      },
    });
    expect(s.getAll()).toEqual(["x".repeat(40), "x"]);
    expect(rejects).toEqual(["y".repeat(60)]);
  });

  it("closes the stream", async () => {
    await s.close();
  });
});
341
342
describe("test discard_policy 'new' where writes are rejected rather than old data being deleted, for max_bytes and max_msgs", () => {
  let s;
  const name = `test-${Math.random()}`;

  it("creates the dstream", async () => {
    s = await createDstream({
      client,
      name,
      // we can write at most 300 bytes and 3 messages. beyond that we
      // get reject events.
      config: {
        max_bytes: 300,
        max_msgs: 3,
        discard_policy: "new",
        desc: { example: "config" },
      },
    });
    // Record the payload of every rejected message.
    const rejects: any[] = [];
    s.on("reject", ({ mesg }) => {
      rejects.push(mesg);
    });
    s.publish("x");
    s.publish("y");
    s.publish("w");
    // The fourth message exceeds max_msgs, so it should be rejected.
    s.publish("foo");

    await wait({
      until: async () => {
        await s.config();
        return rejects.length == 1;
      },
    });
    expect(s.getAll()).toEqual(["x", "y", "w"]);
    expect(rejects).toEqual(["foo"]);

    // This message would exceed max_bytes, so it should be rejected as well.
    s.publish("x".repeat(299));
    await wait({
      until: async () => {
        await s.config();
        return rejects.length == 2;
      },
    });
    expect(s.getAll()).toEqual(["x", "y", "w"]);
    expect(rejects).toEqual(["foo", "x".repeat(299)]);
  });

  it("check the config is persisted", async () => {
    const lastConfig = await s.config();
    // Await the close (as every other suite in this file does) so the old
    // handle is fully shut down before the same stream is reopened below.
    await s.close();
    s = await createDstream({
      client,
      name,
      noCache: true,
    });
    const config = await s.config();
    expect(lastConfig).toEqual(config);
    expect(lastConfig.desc).toEqual({ example: "config" });
  });

  it("closes the stream", async () => {
    // Awaited so a failure during close is reported by this test.
    await s.close();
  });
});
405
406
describe("test rate limiting", () => {
  const streamName = `test-${Math.random()}`;
  let stream;

  it("creates the dstream", async () => {
    // Configure the per-second limits (300 for bytes, 3 for messages) with
    // the "new" discard policy.
    const limits = {
      max_bytes_per_second: 300,
      max_msgs_per_second: 3,
      discard_policy: "new",
    };
    stream = await createDstream({ client, name: streamName, config: limits });

    // Collect rejected messages.
    // NOTE(review): nothing is published or asserted yet, so this suite only
    // checks that a stream with these options can be created and closed.
    const rejected: any[] = [];
    stream.on("reject", ({ mesg }) => {
      rejected.push(mesg);
    });
  });

  it("closes the stream", async () => {
    await stream.close();
  });
});
432
433
import { EPHEMERAL_MAX_BYTES } from "@cocalc/conat/persist/storage";
434
describe(`ephemeral streams always have a hard cap of ${EPHEMERAL_MAX_BYTES} on max_bytes `, () => {
  // The ephemeral stream shared by the tests below; closed in the last test.
  let s;

  it("creates a non-ephemeral dstream and checks no automatic max_bytes set", async () => {
    const s1 = await createDstream({
      client,
      name: "test-NON-ephemeral",
      ephemeral: false,
    });
    try {
      // -1 is the value this file uses everywhere to mean "no limit".
      expect((await s1.config()).max_bytes).toBe(-1);
    } finally {
      // Always release the stream, even if the assertion above fails.
      await s1.close();
    }
  });

  it("creates an ephemeral dstream and checks max bytes automatically set", async () => {
    s = await createDstream({
      client,
      name: "test-ephemeral",
      ephemeral: true,
    });
    expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);
  });

  it("trying to set larger doesn't work", async () => {
    // Asking for twice the cap must leave the value at the cap.
    expect(
      (await s.config({ max_bytes: 2 * EPHEMERAL_MAX_BYTES })).max_bytes,
    ).toBe(EPHEMERAL_MAX_BYTES);
    expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);
  });

  it("setting it smaller is allowed", async () => {
    expect(
      (await s.config({ max_bytes: EPHEMERAL_MAX_BYTES / 2 })).max_bytes,
    ).toBe(EPHEMERAL_MAX_BYTES / 2);
    expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES / 2);
  });

  it("closes the stream", async () => {
    // The ephemeral stream was previously never closed; release it like
    // every other suite in this file does.
    await s.close();
  });
});
469
470
// Register the shared `after` hook (imported from the conat test setup module)
// so it runs once after every test in this file has finished.
afterAll(after);
471
472