Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/cluster.ts
1542 views
1
import { type Client, connect } from "./client";
2
import { Patterns } from "./patterns";
3
import {
4
updateInterest,
5
updateSticky,
6
type InterestUpdate,
7
type StickyUpdate,
8
} from "@cocalc/conat/core/server";
9
import type { DStream } from "@cocalc/conat/sync/dstream";
10
import { once } from "@cocalc/util/async-utils";
11
import { server as createPersistServer } from "@cocalc/conat/persist/server";
12
import { getLogger } from "@cocalc/conat/client";
13
import { hash_string } from "@cocalc/util/misc";
14
const CREATE_LINK_TIMEOUT = 45_000;
15
16
const logger = getLogger("conat:core:cluster");
17
18
export async function clusterLink(
19
address: string,
20
systemAccountPassword: string,
21
timeout = CREATE_LINK_TIMEOUT,
22
) {
23
const client = connect({ address, systemAccountPassword });
24
if (client.info == null) {
25
try {
26
await client.waitUntilSignedIn({
27
timeout: timeout ?? CREATE_LINK_TIMEOUT,
28
});
29
} catch (err) {
30
client.close();
31
throw err;
32
}
33
if (client.info == null) {
34
// this is impossible
35
throw Error("BUG -- failed to sign in");
36
}
37
}
38
const { id, clusterName } = client.info;
39
if (!id) {
40
throw Error("id must be specified");
41
}
42
if (!clusterName) {
43
throw Error("clusterName must be specified");
44
}
45
const link = new ClusterLink(client, id, clusterName, address);
46
await link.init();
47
return link;
48
}
49
50
export type Sticky = { [pattern: string]: { [subject: string]: string } };
51
export type Interest = Patterns<{ [queue: string]: Set<string> }>;
52
53
export { type ClusterLink };
54
55
class ClusterLink {
56
public interest: Interest = new Patterns();
57
public sticky: Sticky = {};
58
private streams: ClusterStreams;
59
private state: "init" | "ready" | "closed" = "init";
60
private clientStateChanged = Date.now(); // when client status last changed
61
62
constructor(
63
public readonly client: Client,
64
public readonly id: string,
65
public readonly clusterName: string,
66
public readonly address: string,
67
) {
68
if (!client) {
69
throw Error("client must be specified");
70
}
71
if (!clusterName) {
72
throw Error("clusterName must be specified");
73
}
74
if (!id) {
75
throw Error("id must be specified");
76
}
77
}
78
79
init = async () => {
80
this.client.on("connected", this.handleClientStateChanged);
81
this.client.on("disconnected", this.handleClientStateChanged);
82
this.streams = await clusterStreams({
83
client: this.client,
84
id: this.id,
85
clusterName: this.clusterName,
86
});
87
for (const update of this.streams.interest.getAll()) {
88
updateInterest(update, this.interest, this.sticky);
89
}
90
for (const update of this.streams.sticky.getAll()) {
91
updateSticky(update, this.sticky);
92
}
93
// I have a slight concern about this because updates might not
94
// arrive in order during automatic failover. That said, maybe
95
// automatic failover doesn't matter with these streams, since
96
// it shouldn't really happen -- each stream is served from the server
97
// it is about, and when that server goes down none of this state
98
// matters anymore.
99
this.streams.interest.on("change", this.handleInterestUpdate);
100
this.streams.sticky.on("change", this.handleStickyUpdate);
101
this.state = "ready";
102
};
103
104
isConnected = () => {
105
return this.client.state == "connected";
106
};
107
108
handleInterestUpdate = (update: InterestUpdate) => {
109
updateInterest(update, this.interest, this.sticky);
110
};
111
112
handleStickyUpdate = (update: StickyUpdate) => {
113
updateSticky(update, this.sticky);
114
};
115
116
private handleClientStateChanged = () => {
117
this.clientStateChanged = Date.now();
118
};
119
120
howLongDisconnected = () => {
121
if (this.isConnected()) {
122
return 0;
123
}
124
return Date.now() - this.clientStateChanged;
125
};
126
127
close = () => {
128
if (this.state == "closed") {
129
return;
130
}
131
this.state = "closed";
132
this.client.removeListener("connected", this.handleClientStateChanged);
133
this.client.removeListener("disconnected", this.handleClientStateChanged);
134
if (this.streams != null) {
135
this.streams.interest.removeListener("change", this.handleInterestUpdate);
136
this.streams.interest.close();
137
this.streams.sticky.close();
138
// @ts-ignore
139
delete this.streams;
140
}
141
this.client.close();
142
// @ts-ignore
143
delete this.client;
144
};
145
146
hasInterest = (subject) => {
147
return this.interest.hasMatch(subject);
148
};
149
150
waitForInterest = async (
151
subject: string,
152
timeout: number,
153
signal?: AbortSignal,
154
) => {
155
const hasMatch = this.interest.hasMatch(subject);
156
157
if (hasMatch || !timeout) {
158
// NOTE: we never return the actual matches, since this is a
159
// potential security vulnerability.
160
// it could make it very easy to figure out private inboxes, etc.
161
return hasMatch;
162
}
163
const start = Date.now();
164
while (this.state != "closed" && !signal?.aborted) {
165
if (Date.now() - start >= timeout) {
166
throw Error("timeout");
167
}
168
await once(this.interest, "change");
169
if ((this.state as any) == "closed" || signal?.aborted) {
170
return false;
171
}
172
const hasMatch = this.interest.hasMatch(subject);
173
if (hasMatch) {
174
return true;
175
}
176
}
177
178
return false;
179
};
180
181
hash = (): { interest: number; sticky: number } => {
182
return {
183
interest: hashInterest(this.interest),
184
sticky: hashSticky(this.sticky),
185
};
186
};
187
}
188
189
function clusterStreamNames({
190
clusterName,
191
id,
192
}: {
193
clusterName: string;
194
id: string;
195
}) {
196
return {
197
interest: `cluster/${clusterName}/${id}/interest`,
198
sticky: `cluster/${clusterName}/${id}/sticky`,
199
};
200
}
201
202
export function clusterService({
203
id,
204
clusterName,
205
}: {
206
id: string;
207
clusterName: string;
208
}) {
209
return `persist:${clusterName}:${id}`;
210
}
211
212
export async function createClusterPersistServer({
213
client,
214
id,
215
clusterName,
216
}: {
217
client: Client;
218
id: string;
219
clusterName: string;
220
}) {
221
const service = clusterService({ clusterName, id });
222
logger.debug("createClusterPersistServer: ", { service });
223
return await createPersistServer({ client, service });
224
}
225
226
export interface ClusterStreams {
227
interest: DStream<InterestUpdate>;
228
sticky: DStream<StickyUpdate>;
229
}
230
231
export async function clusterStreams({
232
client,
233
clusterName,
234
id,
235
}: {
236
client: Client;
237
clusterName: string;
238
id: string;
239
}): Promise<ClusterStreams> {
240
logger.debug("clusterStream: ", { clusterName, id });
241
if (!clusterName) {
242
throw Error("clusterName must be set");
243
}
244
const names = clusterStreamNames({ clusterName, id });
245
const opts = {
246
service: clusterService({ clusterName, id }),
247
noCache: true,
248
ephemeral: true,
249
};
250
const interest = await client.sync.dstream<InterestUpdate>({
251
noInventory: true,
252
name: names.interest,
253
...opts,
254
});
255
const sticky = await client.sync.dstream<StickyUpdate>({
256
noInventory: true,
257
name: names.sticky,
258
...opts,
259
});
260
logger.debug("clusterStreams: got them", { clusterName });
261
return { interest, sticky };
262
}
263
264
// Periodically delete not-necessary updates from the interest stream
265
export async function trimClusterStreams(
266
streams: ClusterStreams,
267
data: {
268
interest: Patterns<{ [queue: string]: Set<string> }>;
269
sticky: { [pattern: string]: { [subject: string]: string } };
270
links: { interest: Patterns<{ [queue: string]: Set<string> }> }[];
271
},
272
// don't delete anything that isn't at lest minAge ms old.
273
minAge: number,
274
): Promise<{ seqsInterest: number[]; seqsSticky: number[] }> {
275
const { interest, sticky } = streams;
276
// First deal with interst
277
// we iterate over the interest stream checking for subjects
278
// with no current interest at all; in such cases it is safe
279
// to purge them entirely from the stream.
280
const seqs: number[] = [];
281
const now = Date.now();
282
for (let n = 0; n < interest.length; n++) {
283
const time = interest.time(n);
284
if (time == null) continue;
285
if (now - time.valueOf() <= minAge) {
286
break;
287
}
288
const update = interest.get(n) as InterestUpdate;
289
if (!data.interest.hasPattern(update.subject)) {
290
const seq = interest.seq(n);
291
if (seq != null) {
292
seqs.push(seq);
293
}
294
}
295
}
296
if (seqs.length > 0) {
297
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
298
logger.debug("trimClusterStream: trimming interest", { seqs });
299
await interest.delete({ seqs });
300
logger.debug("trimClusterStream: successfully trimmed interest", { seqs });
301
}
302
303
// Next deal with sticky -- trim ones where the pattern is no longer of interest.
304
// There could be other reasons to trim but it gets much trickier. This one is more
305
// obvious, except we have to check for any interest in the whole cluster, not
306
// just this node.
307
const seqs2: number[] = [];
308
function noInterest(pattern: string) {
309
if (data.interest.hasPattern(pattern)) {
310
return false;
311
}
312
for (const link of data.links) {
313
if (link.interest.hasPattern(pattern)) {
314
return false;
315
}
316
}
317
// nobody cares
318
return true;
319
}
320
for (let n = 0; n < sticky.length; n++) {
321
const time = sticky.time(n);
322
if (time == null) continue;
323
if (now - time.valueOf() <= minAge) {
324
break;
325
}
326
const update = sticky.get(n) as StickyUpdate;
327
if (noInterest(update.pattern)) {
328
const seq = sticky.seq(n);
329
if (seq != null) {
330
seqs2.push(seq);
331
}
332
}
333
}
334
if (seqs2.length > 0) {
335
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
336
logger.debug("trimClusterStream: trimming sticky", { seqs2 });
337
await sticky.delete({ seqs: seqs2 });
338
logger.debug("trimClusterStream: successfully trimmed sticky", { seqs2 });
339
}
340
341
return { seqsInterest: seqs, seqsSticky: seqs2 };
342
}
343
344
function hashSet(X: Set<string>): number {
345
let h = 0;
346
for (const a of X) {
347
h += hash_string(a); // integers, and not too many, so should commute
348
}
349
return h;
350
}
351
352
function hashInterestValue(X: { [queue: string]: Set<string> }): number {
353
let h = 0;
354
for (const queue in X) {
355
h += hashSet(X[queue]); // integers, and not too many, so should commute
356
}
357
return h;
358
}
359
360
export function hashInterest(
361
interest: Patterns<{ [queue: string]: Set<string> }>,
362
): number {
363
return interest.hash(hashInterestValue);
364
}
365
366
export function hashSticky(sticky: Sticky): number {
367
let h = 0;
368
for (const pattern in sticky) {
369
h += hash_string(pattern);
370
const x = sticky[pattern];
371
for (const subject in x) {
372
h += hash_string(x[subject]);
373
}
374
}
375
return h;
376
}
377
378