Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/persist/persist-client.test.ts
1451 views
1
/*
2
Tests of persist client.
3
4
pnpm test ./persist-client.test.ts
5
6
*/
7
8
import {
9
before,
10
after,
11
connect,
12
restartServer,
13
restartPersistServer,
14
wait,
15
} from "@cocalc/backend/conat/test/setup";
16
import { stream } from "@cocalc/conat/persist/client";
17
import { messageData } from "@cocalc/conat/core/client";
18
import { delay } from "awaiting";
19
20
// Register the shared `before` hook (imported from the conat test setup module)
// so that Jest runs it once before any test in this file.
// NOTE(review): presumably this starts the test conat/persist servers -- confirm in the setup module.
beforeAll(before);
21
22
// Use a 10 second (10000 ms) Jest timeout; several tests below restart a
// server and then poll until requests succeed again.
jest.setTimeout(10000);
23
// Basic round trip against the persist stream stored at "hub/foo": open the
// stream, write a message, then read it back by sequence number and by key.
// The tests share state through the variables below, so they rely on running
// in the order they are declared.
describe("create a persist client stream and test the basic operations", () => {
  let conn, persist;
  // sequence number reported by the first write; used by the read-back test
  let firstSeq;

  it("creates a client and stream", () => {
    conn = connect();
    persist = stream({
      client: conn,
      user: { hub_id: "x" },
      storage: { path: "hub/foo" },
    });
  });

  it("write a value to the stream", async () => {
    const payload = messageData("hi", { headers: { foo: "bar" } });
    const written = await persist.set({ messageData: payload });
    // the reported write time must be within one second of the local clock
    const drift = Math.abs(written.time - Date.now());
    expect(drift).toBeLessThan(1000);
    firstSeq = written.seq;
  });

  it("get the value back", async () => {
    const { data, headers } = await persist.get({ seq: firstSeq });
    expect(data).toBe("hi");
    expect(headers.foo).toBe("bar");
  });

  it("writes a value with a key", async () => {
    const key = "my-key";
    const payload = messageData("value", { headers: { foo: "bar" } });
    await persist.set({ key, messageData: payload });
    const { data } = await persist.get({ key });
    expect(data).toBe("value");
  });
});
59
60
// Writes a keyed message to "hub/bar", restarts the persist server, and checks
// that reads first fail with "no subscribers" and then succeed again with the
// originally written data.
describe("restarting persist server", () => {
  let conn, persist;

  it("creates a client and stream and write test data", async () => {
    conn = connect();
    persist = stream({
      client: conn,
      user: { hub_id: "x" },
      storage: { path: "hub/bar" },
    });
    const payload = messageData("data", { headers: { foo: "bar" } });
    await persist.set({ key: "test", messageData: payload });
  });

  it("restart the persist server", async () => {
    await restartPersistServer();
  });

  it("first attempt to read the data written above fails because persist server hasn't started yet", async () => {
    // a single read with a short timeout, issued right after the restart
    const readOnce = async () => {
      await persist.get({ key: "test", timeout: 500 });
    };
    await expect(readOnce).rejects.toThrow("no subscribers");
  });

  // same value as the file-level timeout set near the top of this file
  jest.setTimeout(10000);
  it("it does start working relatively quickly though", async () => {
    // resolves to true once a read succeeds; a failed read is swallowed and
    // yields undefined so that `wait` polls again
    const readSucceeds = async () => {
      try {
        await persist.get({ key: "test", timeout: 1500 });
        return true;
      } catch {}
    };
    await wait({ until: readSucceeds });

    const { data } = await persist.get({ key: "test" });
    expect(data).toBe("data");
  });
});
101
102
// Checks that a message written to a stream opened with `ephemeral: true` is
// no longer readable after the persist server has been restarted.
describe("restarting persist server with an ephemeral stream", () => {
  // Storage path shared by the write and by the post-restart read. Keeping it
  // in one constant guarantees both steps address the same stream.
  const path = "hub/in-memory-only";
  let client, s1;

  it("creates a client and an ephemeral stream and write test data", async () => {
    client = connect();
    s1 = stream({
      client,
      user: { hub_id: "x" },
      storage: { path, ephemeral: true },
    });
    await s1.set({
      key: "test",
      messageData: messageData("data", { headers: { foo: "bar" } }),
    });
  });

  it("restart the persist server", async () => {
    await restartPersistServer();
  });

  it("our data is gone - it's ephemeral", async () => {
    // Reopen the SAME path that was written above. This step used to open the
    // misspelled path "hub/in-memory-onl", i.e. a stream that never held the
    // key, so the assertions below passed no matter what happened to the data.
    s1 = stream({
      client,
      user: { hub_id: "x" },
      storage: { path, ephemeral: true },
    });
    // Poll until a read completes and reports that the key is absent; reads
    // that throw (e.g. while the server is still coming back) are retried.
    await wait({
      until: async () => {
        try {
          const mesg = await s1.get({ key: "test", timeout: 500 });
          return mesg === undefined;
        } catch {}
      },
    });

    expect(await s1.get({ key: "test" })).toBe(undefined);
  });
});
140
141
// Writes a keyed message to "hub/network", restarts the conat server via
// `restartServer` (the persist server is not restarted here), and checks that
// the message becomes readable again.
describe("restarting the network but not the persist server", () => {
  let conn, persist;

  it("creates a client and stream and write test data", async () => {
    conn = connect();
    persist = stream({
      client: conn,
      user: { hub_id: "x" },
      storage: { path: "hub/network" },
    });
    const payload = messageData("data", { headers: { foo: "bar" } });
    await persist.set({ key: "test", messageData: payload });
  });

  it("restart conat networking", async () => {
    await restartServer();
  });

  it("it does start working eventually", async () => {
    // resolves to true once a read succeeds; a failed read is swallowed and
    // yields undefined so that `wait` polls again
    const readSucceeds = async () => {
      try {
        await persist.get({ key: "test", timeout: 1000 });
        return true;
      } catch {}
    };
    await wait({ until: readSucceeds });
    const { data } = await persist.get({ key: "test" });
    expect(data).toBe("data");
  });
});
174
175
// Changefeed checks on the stream stored at "hub/changefeed": updates written
// by the same client and by a second client must show up on the feed, and the
// feed must keep delivering after the conat server and the persist server are
// restarted. The tests share state and rely on running in declaration order.
describe("test a changefeed", () => {
  // first client, its stream, and the changefeed opened on that stream
  let clientA, streamA, feed;
  // second client and stream on the same storage path
  let clientB, streamB;

  it("creates a client, stream and changefeed", async () => {
    clientA = connect();
    streamA = stream({
      client: clientA,
      user: { hub_id: "x" },
      storage: { path: "hub/changefeed" },
    });
    feed = await streamA.changefeed();
  });

  it("write and see result via changefeed", async () => {
    const payload = messageData("data", { headers: { foo: "bar" } });
    await streamA.set({ key: "test", messageData: payload });
    const batch = await feed.next();
    expect(batch.done).toBe(false);
    expect(batch.value[0]).toEqual(
      expect.objectContaining({
        seq: 1,
        key: "test",
        headers: { foo: "bar" },
      }),
    );
  });

  it("write via another client and see result via changefeed", async () => {
    clientB = connect();
    streamB = stream({
      client: clientB,
      user: { hub_id: "x" },
      storage: { path: "hub/changefeed" },
    });
    expect(streamA).not.toBe(streamB);
    const payload = messageData("data2", { headers: { foo: "bar2" } });
    await streamB.set({ key: "test2", messageData: payload });

    const batch = await feed.next();
    expect(batch.done).toBe(false);
    const updates = batch.value;
    expect(updates[0]).toEqual(
      expect.objectContaining({
        seq: 2,
        key: "test2",
        headers: { foo: "bar2" },
      }),
    );
    expect(updates[0].seq).toBe(2);
    expect(updates.length).toBe(1);
  });

  // this takes a while due to it having to deal with the network restart
  it("restart conat socketio server, and verify changefeed still works", async () => {
    // one more write before the restart
    await streamB.set({
      key: "test3",
      messageData: messageData("data3", { headers: { foo: "bar3" } }),
    });
    await restartServer();

    // Retry the write until it goes through; attempts are expected to fail
    // while networking is restarting.
    const writeTest4 = async () => {
      try {
        await streamA.set({
          key: "test4",
          messageData: messageData("data4", { headers: { foo: "bar4" } }),
          timeout: 1000,
        });
        return true;
      } catch {
        return false;
      }
    };
    await wait({ until: writeTest4, start: 500 });

    // the updates with seq 3 and seq 4 must both arrive, in that order
    const first = await feed.next();
    expect(first.done).toBe(false);
    const firstBatch = first.value;
    expect(firstBatch[0].seq).toBe(3);
    // Whether test4 arrives in the same batch as test3 is random, so take it
    // from the first batch when present and from the next batch otherwise.
    const fourth =
      firstBatch.length >= 2 ? firstBatch[1] : (await feed.next()).value[0];
    expect(fourth.seq).toBe(4);
  });

  it("restart the persist server -- this is pretty brutal", async () => {
    await restartPersistServer();
  });

  it("set still works (with error) after restarting persist server", async () => {
    // The write is expected to fail at first, while the persist server is
    // down and the client has to connect again; keep retrying until it works.
    const writeTest4 = async () => {
      try {
        await streamB.set({
          key: "test4",
          messageData: messageData("data4", { headers: { foo: "bar4" } }),
          timeout: 500,
        });

        return true;
      } catch {
        return false;
      }
    };
    await wait({ until: writeTest4 });
    const { data } = await streamB.get({ key: "test4" });
    expect(data).toBe("data4");
  });

  it("changefeed still works after restarting persist server, though what gets received is somewhat random -- the persist server doesn't have its own state so can't guarantee continguous changefeeds when it restarts", async () => {
    await delay(1000);
    const payload = messageData("data5", { headers: { foo: "bar5" } });
    await streamB.set({ key: "test5", messageData: payload, timeout: 1000 });
    const batch = await feed.next();
    expect(batch.done).toBe(false);
    // Depending on timing the feed may have dropped a message, so only check
    // that the first update carries one of the "bar..." header values.
    expect(batch.value[0].headers?.foo?.startsWith("bar")).toBe(true);
  });
});
310
311
// Register the shared `after` hook (imported from the conat test setup module)
// so that Jest runs it once after all tests in this file have finished.
// NOTE(review): presumably this shuts down what `before` started -- confirm in the setup module.
afterAll(after);
312
313