Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/dstream.test.ts
1451 views
1
/*
2
Testing basic ops with *persistent* dstreams.
3
4
DEVELOPMENT:
5
6
pnpm test ./dstream.test.ts
7
8
*/
9
10
import { createDstream as create } from "./util";
11
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12
import { once } from "@cocalc/util/async-utils";
13
import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";
14
15
beforeAll(before);
16
17
jest.setTimeout(10000);
18
19
describe("create a dstream and do some basic operations", () => {
20
let s;
21
22
it("creates stream", async () => {
23
s = await create();
24
});
25
26
it("starts out empty", () => {
27
expect(s.getAll()).toEqual([]);
28
expect(s.length).toEqual(0);
29
});
30
31
const mesg = { stdout: "hello" };
32
it("publishes a message to the stream and confirms it is there", () => {
33
s.push(mesg);
34
expect(s.getAll()).toEqual([mesg]);
35
expect(s.length).toEqual(1);
36
expect(s[0]).toEqual(mesg);
37
});
38
39
it("verifies that unsaved changes works properly", async () => {
40
expect(s.hasUnsavedChanges()).toBe(true);
41
expect(s.unsavedChanges()).toEqual([mesg]);
42
await s.save();
43
expect(s.hasUnsavedChanges()).toBe(false);
44
expect(s.unsavedChanges()).toEqual([]);
45
});
46
47
it("confirm persistence: closes and re-opens stream and confirms message is still there", async () => {
48
const name = s.name;
49
await s.save();
50
// close s:
51
await s.close();
52
// using s fails
53
expect(s.getAll).toThrow("closed");
54
// create new stream with same name
55
const t = await createDstream({ name });
56
// ensure it is NOT just from the cache
57
expect(s === t).toBe(false);
58
// make sure it has our message
59
expect(t.getAll()).toEqual([mesg]);
60
});
61
});
62
63
describe("create two dstreams and observe sync between them", () => {
64
const name = `test-${Math.random()}`;
65
let s1, s2;
66
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
67
s1 = await createDstream({ name, noAutosave: true, noCache: true });
68
s2 = await createDstream({ name, noAutosave: true, noCache: true });
69
// definitely distinct
70
expect(s1 === s2).toBe(false);
71
});
72
73
it("writes to s1 and observes s2 doesn't see anything until we save", async () => {
74
s1.push("hello");
75
expect(s1[0]).toEqual("hello");
76
expect(s2.length).toEqual(0);
77
s1.save();
78
await once(s2, "change");
79
expect(s2[0]).toEqual("hello");
80
expect(s2.getAll()).toEqual(["hello"]);
81
});
82
83
it("now write to s2 and save and see that reflected in s1", async () => {
84
s2.push("hi from s2");
85
s2.save();
86
while (s1[1] != "hi from s2") {
87
await once(s1, "change");
88
}
89
expect(s1[1]).toEqual("hi from s2");
90
});
91
92
it("write to s1 and s2 and save at the same time and see some 'random choice' of order gets imposed by the server", async () => {
93
s1.push("s1");
94
s2.push("s2");
95
// our changes are reflected locally
96
expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1"]);
97
expect(s2.getAll()).toEqual(["hello", "hi from s2", "s2"]);
98
// now kick off the two saves *in parallel*
99
s1.save();
100
s2.save();
101
await wait({
102
until: () => {
103
return s1.length == 4 && s2.length == 4;
104
},
105
});
106
expect(s1.getAll()).toEqual(s2.getAll());
107
expect(new Set(s1.getAll())).toEqual(
108
new Set(["hello", "hi from s2", "s1", "s2"]),
109
);
110
});
111
});
112
113
describe("get sequence number and time of message", () => {
114
let s;
115
116
it("creates stream and write message", async () => {
117
s = await create();
118
s.push("hello");
119
});
120
121
it("sequence number is initialized undefined because it is server assigned ", async () => {
122
const n = s.seq(0);
123
expect(n).toBe(undefined);
124
});
125
126
it("time also undefined because it is server assigned ", async () => {
127
const t = s.time(0);
128
expect(t).toBe(undefined);
129
});
130
131
it("save and get server assigned sequence number", async () => {
132
s.save();
133
await once(s, "change");
134
const n = s.seq(0);
135
expect(n).toBeGreaterThan(0);
136
});
137
138
it("get server assigned time", async () => {
139
const t = s.time(0);
140
// since testing on the same machine as server, these times should be close:
141
expect(t.getTime() - Date.now()).toBeLessThan(5000);
142
});
143
144
it("publish another message and get next server number is bigger", async () => {
145
const n = s.seq(0);
146
s.push("there");
147
await s.save();
148
const m = s.seq(1);
149
expect(m).toBeGreaterThan(n);
150
});
151
152
it("and time is bigger", async () => {
153
if (s.time(1) == null) {
154
await once(s, "change");
155
}
156
expect(s.time(0).getTime()).toBeLessThan(s.time(1).getTime());
157
});
158
});
159
160
describe("closing also saves by default, but not if autosave is off", () => {
161
let s;
162
const name = `test-${Math.random()}`;
163
164
it("creates stream and write a message", async () => {
165
// noAutosave: false is the default:
166
s = await createDstream({ name, noAutosave: false });
167
s.push(389);
168
});
169
170
it("closes then opens and message is there, since autosave is on", async () => {
171
await s.close();
172
const t = await createDstream({ name });
173
expect(t[0]).toEqual(389);
174
});
175
176
it("make another stream with autosave off, and close which causes LOSS OF DATA", async () => {
177
const name = `test-${Math.random()}`;
178
const s = await createDstream({ name, noAutosave: true });
179
s.push(389);
180
s.close();
181
const t = await createDstream({ name, noAutosave: true });
182
// data is gone forever!
183
expect(t.length).toBe(0);
184
});
185
});
186
187
describe("testing start_seq", () => {
188
const name = `test-${Math.random()}`;
189
let seq;
190
it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {
191
const s = await createDstream({ name, noAutosave: true });
192
s.push(1, 2, 3);
193
expect(s.getAll()).toEqual([1, 2, 3]);
194
// save, thus getting sequence numbers
195
s.save();
196
while (s.seq(2) == null) {
197
s.save();
198
await once(s, "change");
199
}
200
seq = [s.seq(0), s.seq(1), s.seq(2)];
201
// tests partly that these are integers...
202
const n = seq.reduce((a, b) => a + b, 0);
203
expect(typeof n).toBe("number");
204
expect(n).toBeGreaterThan(2);
205
await s.close();
206
});
207
208
let s;
209
it("it opens the stream but starting with the last sequence number, so only one message", async () => {
210
s = await createDstream({
211
name,
212
noAutosave: true,
213
start_seq: seq[2],
214
});
215
expect(s.length).toBe(1);
216
expect(s.getAll()).toEqual([3]);
217
expect(s.start_seq).toEqual(seq[2]);
218
});
219
220
it("it then pulls in the previous message, so now two messages are loaded", async () => {
221
await s.load({ start_seq: seq[1] });
222
expect(s.length).toBe(2);
223
expect(s.getAll()).toEqual([2, 3]);
224
expect(s.start_seq).toEqual(seq[1]);
225
});
226
227
it("a bigger example involving loading older messages", async () => {
228
for (let i = 4; i < 100; i++) {
229
s.push(i);
230
}
231
await s.save();
232
const last = s.seq(s.length - 1);
233
const mid = s.seq(s.length - 50);
234
await s.close();
235
s = await createDstream({
236
name,
237
noAutosave: true,
238
start_seq: last,
239
});
240
expect(s.length).toBe(1);
241
expect(s.getAll()).toEqual([99]);
242
expect(s.start_seq).toEqual(last);
243
244
await s.load({ start_seq: mid });
245
expect(s.length).toEqual(50);
246
expect(s.start_seq).toEqual(mid);
247
for (let i = 0; i < 50; i++) {
248
expect(s.get(i)).toBe(i + 50);
249
}
250
251
await s.load({ start_seq: 0 });
252
for (let i = 0; i < 99; i++) {
253
expect(s.get(i)).toBe(i + 1);
254
}
255
});
256
});
257
258
describe("a little bit of a stress test", () => {
259
const name = `test-${Math.random()}`;
260
const count = 100;
261
let s;
262
it(`creates a stream and pushes ${count} messages`, async () => {
263
s = await createDstream({
264
name,
265
noAutosave: true,
266
});
267
for (let i = 0; i < count; i++) {
268
s.push({ i });
269
}
270
expect(s.length).toBe(count);
271
// NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,
272
// running under jest, hence why count is small.
273
await s.save();
274
expect(s.length).toBe(count);
275
});
276
});
277
278
describe("dstream typescript test", () => {
279
it("creates stream", async () => {
280
const name = `test-${Math.random()}`;
281
const s = await createDstream<string>({ name });
282
283
// write a message with the correct type
284
s.push("foo");
285
286
// wrong type -- no way to test this, but if you uncomment
287
// this you should get a typescript error:
288
289
// s.push({ foo: "bar" });
290
});
291
});
292
293
describe("ensure there isn't a really obvious subscription leak", () => {
294
let client;
295
296
it("create a client, which initially has only one subscription (the inbox)", async () => {
297
client = connect();
298
await client.getInbox();
299
expect(client.numSubscriptions()).toBe(1);
300
});
301
302
const count = 100;
303
it(`creates and closes ${count} streams and checks there is no leak`, async () => {
304
const before = client.numSubscriptions();
305
// create
306
const a: any = [];
307
for (let i = 0; i < count; i++) {
308
a[i] = await createDstream({
309
name: `${Math.random()}`,
310
});
311
}
312
for (let i = 0; i < count; i++) {
313
await a[i].close();
314
}
315
const after = client.numSubscriptions();
316
expect(after).toBe(before);
317
318
// also check count on server went down.
319
expect((await client.getSubscriptions()).size).toBe(before);
320
});
321
322
it("does another leak test, but with a publish operation each time", async () => {
323
const before = client.numSubscriptions();
324
// create
325
const a: any = [];
326
for (let i = 0; i < count; i++) {
327
a[i] = await createDstream({
328
name: `${Math.random()}`,
329
noAutosave: true,
330
});
331
a[i].publish(i);
332
await a[i].save();
333
}
334
for (let i = 0; i < count; i++) {
335
await a[i].close();
336
}
337
const after = client.numSubscriptions();
338
expect(after).toBe(before);
339
});
340
});
341
342
describe("test delete of messages from stream", () => {
343
let client1, client2, s1, s2;
344
const name = "test-delete";
345
it("create two clients", async () => {
346
client1 = connect();
347
client2 = connect();
348
s1 = await createDstream({
349
client: client1,
350
name,
351
noAutosave: true,
352
noCache: true,
353
});
354
s2 = await createDstream({
355
client: client2,
356
name,
357
noAutosave: true,
358
noCache: true,
359
});
360
});
361
362
it("writes message one, confirm seen by other, then delete and confirm works", async () => {
363
s1.push("hello");
364
await s1.save();
365
await wait({ until: () => s2.length > 0 });
366
s1.delete({ all: true });
367
await wait({ until: () => s2.length == 0 && s1.length == 0 });
368
});
369
370
it("same delete test as above but with a few more items and delete on s2 instead", async () => {
371
for (let i = 0; i < 10; i++) {
372
s1.push(i);
373
}
374
await s1.save();
375
await wait({ until: () => s2.length == 10 });
376
s2.delete({ all: true });
377
await wait({ until: () => s2.length == 0 && s1.length == 0 });
378
});
379
380
it("delete specific index", async () => {
381
s1.push("x", "y", "z");
382
await s1.save();
383
await wait({ until: () => s2.length == 3 });
384
s2.delete({ last_index: 1 });
385
await wait({ until: () => s2.length == 1 && s1.length == 1 });
386
expect(s1.get()).toEqual(["z"]);
387
});
388
389
it("delete specific seq number", async () => {
390
s1.push("x", "y");
391
await s1.save();
392
expect(s1.get()).toEqual(["z", "x", "y"]);
393
const seq = s1.seq(1);
394
const { seqs } = await s1.delete({ seq });
395
expect(seqs).toEqual([seq]);
396
await wait({ until: () => s2.length == 2 && s1.length == 2 });
397
expect(s1.get()).toEqual(["z", "y"]);
398
});
399
400
it("delete up to a sequence number", async () => {
401
s1.push("x", "y");
402
await s1.save();
403
expect(s1.get()).toEqual(["z", "y", "x", "y"]);
404
const seq = s1.seq(1);
405
const { seqs } = await s1.delete({ last_seq: seq });
406
expect(seqs.length).toBe(2);
407
expect(seqs[1]).toBe(seq);
408
await wait({ until: () => s1.length == 2 });
409
expect(s1.get()).toEqual(["x", "y"]);
410
});
411
});
412
413
afterAll(after);
414
415