Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/astream.test.ts
1451 views
1
/*
2
Testing basic ops with astream
3
4
DEVELOPMENT:
5
6
pnpm test ./astream.test.ts
7
8
*/
9
10
import { astream } from "@cocalc/backend/conat/sync";
11
import { before, after, connect } from "@cocalc/backend/conat/test/setup";
12
import { delay } from "awaiting";
13
14
// Register the shared `before` hook, imported from the conat test setup
// module, so that it runs once before any test in this file.
beforeAll(before);
15
16
/**
 * Basic astream operations: publishing, reading by sequence number, message
 * headers, reopening a stream by name, and iterating with getAll().
 *
 * The tests share `client` and `s` and depend on declaration order: later
 * tests read back the two messages published by the earlier ones.
 */
describe("test basics with an astream", () => {
  let client, s, s2;
  const name = "test-astream";

  it("creates the astream, then publish and read a value", async () => {
    client = connect();
    s = astream({ name, client });
    const { seq } = await s.publish("x");
    expect(seq).toBe(1);
    expect(await s.get(1)).toBe("x");
  });

  it("use a second astream", async () => {
    // NOTE(review): noCache presumably yields an instance distinct from `s`
    // even though the name and client are the same -- confirm.
    s2 = astream({ name, client, noCache: true });
    expect(await s2.get(1)).toBe("x");
    // Awaited like the close() in the reopen test below, so this test cannot
    // finish while a close is still in flight. Awaiting is harmless if
    // close() turns out to be synchronous.
    await s2.close();
  });

  it("publish a message with a header", async () => {
    const { seq, time } = await s.publish("has a header", {
      headers: { foo: "bar" },
    });
    expect(await s.get(seq)).toBe("has a header");
    expect(await s.headers(seq)).toEqual(
      expect.objectContaining({ foo: "bar" }),
    );
    // note that seq and time are also in the header
    expect(await s.headers(seq)).toEqual({ foo: "bar", seq, time });
  });

  it("closes, then creates a new astream and sees data is there", async () => {
    await s.close();
    // astream() hands back the stream object directly (the first test calls
    // publish on its result without awaiting), so no await is needed here.
    s = astream({ name, client });
    expect(await s.get(1)).toBe("x");
  });

  it("get full message, which has both the data and the headers", async () => {
    const mesg = await s.getMessage(2);
    expect(mesg.data).toBe("has a header");
    expect(mesg.headers).toEqual(expect.objectContaining({ foo: "bar" }));
  });

  it("getAll messages", async () => {
    const x = await s.getAll();
    const { value } = await x.next();
    expect(value.mesg).toBe("x");
    expect(value.seq).toBe(1);
    // the recorded time should be within 5 seconds of the local clock
    expect(Math.abs(value.time - Date.now())).toBeLessThan(5000);
    const { value: value2 } = await x.next();
    expect(value2.mesg).toBe("has a header");
    expect(value2.headers).toEqual(expect.objectContaining({ foo: "bar" }));
    expect(value2.seq).toBe(2);
    expect(Math.abs(value2.time - Date.now())).toBeLessThan(5000);
    // only two messages were published, so the iterator is now exhausted
    const { done } = await x.next();
    expect(done).toBe(true);
  });

  it("getAll messages starting from the second one", async () => {
    const x = await s.getAll({ start_seq: 2, end_seq: 2 });
    const { value } = await x.next();
    expect(value.mesg).toBe("has a header");
    expect(value.seq).toBe(2);
    const { done } = await x.next();
    expect(done).toBe(true);
  });

  it("getAll messages starting from the first and ending on the first", async () => {
    const x = await s.getAll({ start_seq: 1, end_seq: 1 });
    const { value } = await x.next();
    expect(value.mesg).toBe("x");
    expect(value.seq).toBe(1);
    const { done } = await x.next();
    expect(done).toBe(true);
  });

  it("cleans up", async () => {
    // Awaited so teardown has finished before the next suite starts.
    await s.close();
  });
});
95
96
// Number of messages written and then read back by the stress test below.
const stress1 = 1e4;

/**
 * Bulk round trip: push `stress1` numbers into one astream with a single
 * push() call, then iterate over the whole stream and check the count.
 */
describe(`stress test -- write, then read back, ${stress1} messages`, () => {
  const name = "stress-test";
  let client, s;

  it("creates the astream", async () => {
    client = connect();
    s = await astream({ name, client });
  });

  it(`publishes ${stress1} messages`, async () => {
    // The payloads are simply 0, 1, ..., stress1 - 1.
    const payloads: number[] = Array.from({ length: stress1 }, (_, i) => i);
    const results = await s.push(...payloads);
    expect(results.length).toBe(stress1);
  });

  it(`reads back ${stress1} messages`, async () => {
    const received: unknown[] = [];
    const everything = await s.getAll();
    for await (const entry of everything) {
      received.push(entry);
    }
    expect(received.length).toBe(stress1);
  });

  it("cleans up", () => {
    s.close();
  });
});
127
128
/**
 * Changefeeds: three feeds (one on `s`, two on `s2`) over the same named
 * stream must each receive every published message, independently of the
 * others. Ends with a bulk write that is drained through one feed.
 */
describe("test a changefeed", () => {
  const name = "test-astream";
  let client, s, s2, cf, cf2, cf2b;

  it("creates two astreams and three changefeeds on them", async () => {
    client = connect();
    // one feed on the first stream object ...
    s = astream({ name, client });
    cf = await s.changefeed();
    // ... and two feeds on a second stream object opened with noCache
    s2 = astream({ name, client, noCache: true });
    cf2 = await s2.changefeed();
    cf2b = await s2.changefeed();
  });

  it("writes to the stream and sees this in the changefeed", async () => {
    // Ask every feed for its next update *before* publishing, then publish
    // and check that each pending request resolves with the new message.
    const pending = cf.next();
    const pending2 = cf2.next();
    const pending2b = cf2b.next();
    await s.publish("hi");

    const update = await pending;
    expect(update.done).toBe(false);
    expect(update.value.mesg).toBe("hi");

    const update2 = await pending2;
    expect(update2.value.mesg).toBe("hi");

    const update2b = await pending2b;
    expect(update2b.value.mesg).toBe("hi");
  });

  it("verify the three changefeeds are all distinct and do not interfere with each other", async () => {
    // write 2 messages and see they are received independently
    const texts = ["one", "two"];
    for (const text of texts) {
      await s.publish(text);
    }
    // the feed on `s` is drained first ...
    for (const expected of texts) {
      expect((await cf.next()).value.mesg).toBe(expected);
    }
    // ... then the two feeds on `s2` are read alternately, message by message
    for (const expected of texts) {
      for (const feed of [cf2, cf2b]) {
        expect((await feed.next()).value.mesg).toBe(expected);
      }
    }
  });

  // Number of values written and then drained in the stress tests below.
  const total = 10000;

  it(`stress test -- write ${total} values`, async () => {
    const values: number[] = Array.from({ length: total }, (_, i) => i);
    const results = await s.push(...values);
    expect(results.length).toBe(values.length);
  });

  it(`stress test getting ${total} values from a changefeed`, async () => {
    // Only checks that `total` updates can be pulled from the feed on `s`;
    // the content of each update is not inspected.
    let remaining = total;
    while (remaining > 0) {
      await cf.next();
      remaining -= 1;
    }
  });

  it("cleans up", () => {
    s.close();
    s2.close();
  });
});
190
191
/**
 * Publish options: addressing a message by `key`, de-duplicating with
 * `msgID`, and expiring a message with `ttl`.
 */
describe("test setting with key, ttl and msgID", () => {
  const name = "test-astream-sets";
  let client, s;

  it("creates the astream, then publish and read a value", async () => {
    client = connect();
    s = astream({ name, client });
    const published = await s.publish("x", {
      key: "y",
      headers: { with: "key" },
    });
    expect(published.seq).toBe(1);
    // the message can be fetched either by sequence number or by its key
    expect(await s.get(1)).toBe("x");
    expect(await s.get("y")).toBe("x");
    const headersByKey = await s.headers("y");
    expect(headersByKey).toEqual(expect.objectContaining({ with: "key" }));
  });

  it("publish a value with msgID twice and sees that it only appears once", async () => {
    // Publishing again with the same msgID must report the same seq.
    const msgID = "xx";
    const firstPublish = await s.publish("foo", { msgID });
    const secondPublish = await s.publish("foo", { msgID });
    expect(firstPublish.seq).toEqual(secondPublish.seq);
  });

  it("publish a value with ttl and sees it vanishes as expected", async () => {
    const key = "i-have-ttl";
    await s.config({ allow_msg_ttl: true });
    const { seq } = await s.publish("foo", { key, ttl: 25 });
    expect(await s.get(key)).toBe("foo");
    // NOTE(review): ttl is presumably in milliseconds, so waiting 50 outlasts
    // it -- confirm the unit against the publish() options.
    await delay(50);
    // call config to force enforcing limits
    await s.config();
    // after expiry the message is gone under both its key and its seq
    expect(await s.get(key)).toBe(undefined);
    expect(await s.get(seq)).toBe(undefined);
  });

  it("cleans up", () => {
    s.close();
  });
});
231
232
// Register the shared `after` hook, imported from the conat test setup
// module, so that it runs once after every test in this file has finished.
afterAll(after);
233
234