Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/dstream-ephemeral.test.ts
1451 views
1
/*
2
Testing basic ops with dsteam (distributed streams), but all are ephemeral.
3
4
The first tests are initially similar to those for dstream.test.ts, but with
5
{ephemeral: true}. There are also further tests of the client/server aspects.
6
7
DEVELOPMENT:
8
9
pnpm test ./dstream-ephemeral.test.ts
10
11
*/
12
13
import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";
14
import { createDstreamEphemeral as create } from "./util";
15
import { dstream as createDstream0 } from "@cocalc/backend/conat/sync";
16
//import { delay } from "awaiting";
17
18
beforeAll(before);
19
20
async function createDstream<T>(opts) {
21
return await createDstream0<T>({
22
noCache: true,
23
noAutosave: true,
24
ephemeral: true,
25
...opts,
26
});
27
}
28
29
jest.setTimeout(10000);
30
31
describe("create a dstream and do some basic operations", () => {
32
let s;
33
34
it("creates stream", async () => {
35
s = await create();
36
});
37
38
it("starts out empty", () => {
39
expect(s.getAll()).toEqual([]);
40
expect(s.length).toEqual(0);
41
});
42
43
const mesg = { stdout: "hello" };
44
it("publishes a message to the stream and confirms it is there", () => {
45
s.push(mesg);
46
expect(s.getAll()).toEqual([mesg]);
47
expect(s.length).toEqual(1);
48
expect(s[0]).toEqual(mesg);
49
});
50
51
it("verifies that unsaved changes works properly", async () => {
52
expect(s.hasUnsavedChanges()).toBe(true);
53
expect(s.unsavedChanges()).toEqual([mesg]);
54
await s.save();
55
expect(s.hasUnsavedChanges()).toBe(false);
56
expect(s.unsavedChanges()).toEqual([]);
57
});
58
59
it("confirm ephemeralness: closes and re-opens stream and confirms message is NOT there", async () => {
60
const name = s.name;
61
await s.save();
62
// close s:
63
await s.close();
64
// using s fails
65
expect(s.getAll).toThrow("closed");
66
// wait for server to discard stream data
67
// (it's instant right now!)
68
// create new stream with same name
69
const t = await createDstream({ name });
70
// ensure it is NOT just from the cache
71
expect(s === t).toBe(false);
72
// make sure it does NOT have our message (it should not -- it's ephemeral)
73
expect(t.getAll()).toEqual([]);
74
});
75
});
76
77
describe("create two dstreams and observe sync between them", () => {
78
const name = `test-${Math.random()}`;
79
let s1, s2;
80
let client2;
81
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
82
client2 = connect();
83
s1 = await createDstream({ name });
84
s2 = await createDstream({ client: client2, name });
85
// definitely distinct
86
expect(s1 === s2).toBe(false);
87
});
88
89
it("writes to s1 and observes s2 doesn't see anything until we save", async () => {
90
s1.push("hello");
91
expect(s1[0]).toEqual("hello");
92
expect(s2.length).toEqual(0);
93
await s1.save();
94
await wait({ until: () => s2[0] == "hello" });
95
expect(s2[0]).toEqual("hello");
96
expect(s2.getAll()).toEqual(["hello"]);
97
});
98
99
it("now write to s2 and save and see that reflected in s1", async () => {
100
s2.push("hi from s2");
101
await s2.save();
102
await wait({ until: () => s1[1] == "hi from s2" && s2[1] == "hi from s2" });
103
expect(s1[1]).toEqual("hi from s2");
104
expect(s2[1]).toEqual("hi from s2");
105
});
106
107
it("s1.stream and s2.stream should be the same right now", () => {
108
expect(s1.stream.getAll()).toEqual(["hello", "hi from s2"]);
109
expect(s2.stream.getAll()).toEqual(["hello", "hi from s2"]);
110
});
111
112
it("s1 and s2 should be the same right now", () => {
113
expect(s1.getAll()).toEqual(["hello", "hi from s2"]);
114
expect(s2.getAll()).toEqual(["hello", "hi from s2"]);
115
});
116
117
it("cleans up", () => {
118
s1.close();
119
s2.close();
120
client2.close();
121
});
122
});
123
124
describe("create two dstreams and test sync with parallel save", () => {
125
const name = `test-${Math.random()}`;
126
let s1, s2;
127
let client2;
128
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
129
client2 = connect();
130
s1 = await createDstream({ name });
131
s2 = await createDstream({ client: client2, name });
132
// definitely distinct
133
expect(s1 === s2).toBe(false);
134
});
135
136
it("write to s1 and s2 and save at the same time", async () => {
137
s1.push("s1");
138
s2.push("s2");
139
// our changes are reflected locally
140
expect(s1.getAll()).toEqual(["s1"]);
141
expect(s2.getAll()).toEqual(["s2"]);
142
// now kick off the two saves *in parallel*
143
s1.save();
144
s2.save();
145
await wait({ until: () => s1.length >= 2 && s2.length >= 2 });
146
expect(s1.getAll()).toEqual(s2.getAll());
147
});
148
149
it("cleans up", () => {
150
client2.close();
151
});
152
});
153
154
describe("get sequence number and time of message", () => {
155
let s;
156
157
it("creates stream and write message", async () => {
158
s = await create();
159
s.push("hello");
160
});
161
162
it("sequence number is initialized undefined because it is server assigned ", async () => {
163
const n = s.seq(0);
164
expect(n).toBe(undefined);
165
});
166
167
it("time also undefined because it is server assigned ", async () => {
168
const t = s.time(0);
169
expect(t).toBe(undefined);
170
});
171
172
it("save and get server assigned sequence number", async () => {
173
s.save();
174
await wait({ until: () => s.seq(0) > 0 });
175
const n = s.seq(0);
176
expect(n).toBeGreaterThan(0);
177
});
178
179
it("get server assigned time", async () => {
180
const t = s.time(0);
181
// since testing on the same machine as server, these times should be close:
182
expect(t.getTime() - Date.now()).toBeLessThan(5000);
183
});
184
185
it("publish another message and get next server number is bigger", async () => {
186
const n = s.seq(0);
187
s.push("there");
188
await s.save();
189
const m = s.seq(1);
190
expect(m).toBeGreaterThan(n);
191
});
192
193
it("and time is bigger", async () => {
194
await wait({ until: () => s.time(1) != null });
195
expect(s.time(0).getTime()).toBeLessThanOrEqual(s.time(1).getTime());
196
});
197
});
198
199
describe("testing start_seq", () => {
200
const name = `test-${Math.random()}`;
201
let seq;
202
it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {
203
const s = await createDstream({ name, noAutosave: true });
204
s.push(1, 2, 3);
205
expect(s.getAll()).toEqual([1, 2, 3]);
206
// save, thus getting sequence numbers
207
s.save();
208
await wait({ until: () => s.seq(2) != null });
209
seq = [s.seq(0), s.seq(1), s.seq(2)];
210
// tests partly that these are integers...
211
const n = seq.reduce((a, b) => a + b, 0);
212
expect(typeof n).toBe("number");
213
expect(n).toBeGreaterThan(2);
214
});
215
216
let t;
217
it("it opens another copy of the stream, but starting with the last sequence number, so only one message", async () => {
218
const client = connect();
219
t = await createDstream({
220
client,
221
name,
222
noAutosave: true,
223
start_seq: seq[2],
224
});
225
expect(t.length).toBe(1);
226
expect(t.getAll()).toEqual([3]);
227
expect(t.start_seq).toEqual(seq[2]);
228
});
229
230
it("it then pulls in the previous message, so now two messages are loaded", async () => {
231
await t.load({ start_seq: seq[1] });
232
expect(t.length).toBe(2);
233
expect(t.getAll()).toEqual([2, 3]);
234
expect(t.start_seq).toEqual(seq[1]);
235
});
236
});
237
238
describe("a little bit of a stress test", () => {
239
const name = `test-${Math.random()}`;
240
const count = 100;
241
let s;
242
it(`creates a stream and pushes ${count} messages`, async () => {
243
s = await createDstream({
244
name,
245
noAutosave: true,
246
});
247
for (let i = 0; i < count; i++) {
248
s.push({ i });
249
}
250
expect(s.length).toBe(count);
251
// [ ] TODO rewrite this save to send everything in a single message
252
// which gets chunked, will we be much faster, then change the count
253
// above to 1000 or 10000.
254
await s.save();
255
expect(s.length).toBe(count);
256
});
257
});
258
259
describe("dstream typescript test", () => {
260
it("creates stream", async () => {
261
const name = `test-${Math.random()}`;
262
const s = await createDstream<string>({ name });
263
264
// write a message with the correct type
265
s.push("foo");
266
267
// wrong type -- no way to test this, but if you uncomment
268
// this you should get a typescript error:
269
270
// s.push({ foo: "bar" });
271
});
272
});
273
274
describe("ensure there isn't a really obvious subscription leak", () => {
275
let client;
276
277
it("create a client, which initially has only one subscription (the inbox)", async () => {
278
client = connect();
279
expect(client.numSubscriptions()).toBe(0);
280
await client.getInbox();
281
expect(client.numSubscriptions()).toBe(1);
282
});
283
284
const count = 100;
285
it(`creates and closes ${count} streams and checks there is no leak`, async () => {
286
const before = client.numSubscriptions();
287
// create
288
const a: any = [];
289
for (let i = 0; i < count; i++) {
290
a[i] = await createDstream({
291
name: `${Math.random()}`,
292
});
293
}
294
for (let i = 0; i < count; i++) {
295
await a[i].close();
296
}
297
const after = client.numSubscriptions();
298
expect(after).toBe(before);
299
300
// also check count on server went down.
301
expect((await client.getSubscriptions()).size).toBe(before);
302
});
303
304
it("does another leak test, but with a publish operation each time", async () => {
305
const before = client.numSubscriptions();
306
// create
307
const a: any = [];
308
for (let i = 0; i < count; i++) {
309
a[i] = await createDstream({
310
name: `${Math.random()}`,
311
noAutosave: true,
312
});
313
a[i].publish(i);
314
await a[i].save();
315
}
316
for (let i = 0; i < count; i++) {
317
await a[i].close();
318
}
319
const after = client.numSubscriptions();
320
expect(after).toBe(before);
321
});
322
});
323
324
afterAll(after);
325
326