Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/client.ts
1452 views
1
/*
2
core/client.s -- core conat client
3
4
This is a client that has a similar API to NATS / Socket.io, but is much,
5
much better in so many ways:
6
7
- It has global pub/sub just like with NATS. This uses the server to
8
rebroadcast messages, and for authentication.
9
Better than NATS: Authentication is done for a subject *as
10
needed* instead of at connection time.
11
12
- Message can be arbitrarily large and they are *automatically* divided
13
into chunks and reassembled. Better than both NATS and socket.io.
14
15
- There are multiple supported ways of encoding messages, and
16
no coordination is required with the server or other clients! E.g.,
17
one message can be sent with one encoding and the next with a different
18
encoding and that's fine.
19
- MsgPack: https://msgpack.org/ -- a very compact encoding that handles
20
dates nicely and small numbers efficiently. This also works
21
well with binary Buffer objects, which is nice.
22
- JsonCodec: uses JSON.stringify and TextEncoder. This does not work
23
with Buffer or Date and is less compact, but can be very fast.
24
25
26
THE CORE API
27
28
This section contains the crucial information you have to know to build a distributed
29
system using Conat. It's our take on the NATS primitives (it's not exactly the
30
same, but it is close). It's basically a symmetrical pub/sub/reqest/respond model
31
for messaging on which you can build distributed systems. The tricky part, which
32
NATS.js gets wrong (in my opinion), is implementing this in a way that is robust
33
and scalable, in terms for authentication, real world browser connectivity and
34
so on. Our approach is to use proven mature technology like socket.io, sqlite
35
and valkey, instead of writing everything from scratch.
36
37
Clients: We view all clients as plugged into a common "dial tone",
38
except for optional permissions that are configured when starting the server.
39
The methods you call on the client to build everything are:
40
41
- subscribe, subscribeSync - subscribe to a subject which returns an
42
async iterator over all messages that match the subject published by
43
anyone with permission to do so. If you provide the same optional
44
queue parameter for multiple subscribers, then one subscriber in each queue
45
group receives each message. The async form of this functino confirms
46
the subscription was created before returning. If a client creates multiple
47
subscriptions at the same time, the queue group must be the same.
48
Subscriptions are guaranteed to stay valid until the client ends them;
49
they do not stop working due to client or server reconnects or restarts.
50
(If you need more subscriptions with different queue groups, make another
51
client object.)
52
53
- publish, publishSync - publish to a subject. The async version returns
54
a count of the number of recipients, whereas the sync version is
55
fire-and-forget.
56
**There is no a priori size limit on messages since chunking
57
is automatic. However, we have to impose some limit, but
58
it can be much larger than the socketio message size limit.**
59
60
- request - send a message to a subject, and if there is at least one
61
subscriber listening, it may respond. If there are no subscribers,
62
it throws a 503 error. To create a microservice, subscribe
63
to a subject pattern and called mesg.respond(...) on each message you
64
receive.
65
66
- requestMany - send a message to a subject, and receive many
67
messages in reply. Typically you end the response stream by sending
68
a null message, but what you do is up to you. This is very useful
69
for streaming arbitrarily large data, long running changefeeds, LLM
70
responses, etc.
71
72
73
Messages: A message mesg is:
74
75
- Data:
76
- subject - the subject the message was sent to
77
- encoding - usually MessagePack
78
- raw - encoded binary data
79
- headers - a JSON-able Javascript object.
80
81
- Methods:
82
- data: this is a property, so if you do mesg.data, then it decodes raw
83
and returns the resulting Javascript object.
84
- respond, respondSync: if REPLY_HEADER is set, calling this publishes a
85
respond message to the original sender of the message.
86
87
88
Persistence:
89
90
We also implement persistent streams, where you can also set a key. This can
91
be used to build the analogue of Jetstream's streams and kv stores. The object
92
store isn't necessary since there is no limit on message size. Conat's persistent
93
streams are compressed by default and backed by individual sqlite files, which
94
makes them very memory efficient and it is easy to tier storage to cloud storage.
95
96
UNIT TESTS: See packages/server/conat/test/core
97
98
MISC NOTES:
99
100
NOTE: There is a socketio msgpack parser, but it just doesn't
101
work at all, which is weird. Also, I think it's impossible to
102
do the sort of chunking we want at the level of a socket.io
103
parser -- it's just not possible in that the encoding. We customize
104
things purely client side without using a parser, and get a much
105
simpler and better result, inspired by how NATS approaches things
106
with opaque messages.
107
108
109
SUBSCRIPTION ROBUSTNESS: When you call client.subscribe(...) you get back an async iterator.
110
It ONLY ends when you explicitly do the standard ways of terminating
111
such an iterator, including calling .close() on it. It is a MAJOR BUG
112
if it were to terminate for any other reason. In particular, the subscription
113
MUST NEVER throw an error or silently end when the connection is dropped
114
then resumed, or the server is restarted, or the client connects to
115
a different server! These situations can, of course, result in missing
116
some messages, but that's understood. There are no guarantees at all with
117
a subscription that every message is received. That said, we have enabled
118
connectionStateRecovery (and added special conat support for it) so no messages
119
are dropped for temporary disconnects, even up to several minutes,
120
and even in valkey cluster mode! Finally, any time a client disconnects
121
and reconnects, the client ensures that all subscriptions exist for it on the server
122
via a sync process.
123
124
Subscription robustness is a major difference with NATS.js, which would
125
mysteriously terminate subscriptions for a variety of reasons, meaning that any
126
code using subscriptions had to be wrapped in ugly complexity to be
127
usable in production.
128
129
USAGE:
130
131
The following should mostly work to interactively play around with this
132
code and develop it. It's NOT automatically tested and depends on your
133
environment though, so may break. See the unit tests in
134
135
packages/server/conat/test/core/
136
137
for something that definitely works perfectly.
138
139
140
For developing at the command line, cd to packages/backend, then in node:
141
142
c = require('@cocalc/backend/conat/conat').connect()
143
144
or
145
146
c = require('@cocalc/conat/core/client').connect('http://localhost:3000')
147
148
c.watch('a')
149
150
s = await c.subscribe('a'); for await (const x of s) { console.log(x.length)}
151
152
// in another console
153
154
c = require('@cocalc/backend/conat/conat').connect()
155
c.publish('a', 'hello there')
156
157
// in browser (right now)
158
159
cc.conat.conat()
160
161
// client server:
162
163
s = await c.subscribe('eval'); for await(const x of s) { x.respond(eval(x.data)) }
164
165
then in another console
166
167
f = async () => (await c.request('eval', '2+3')).data
168
await f()
169
170
t = Date.now(); for(i=0;i<1000;i++) { await f()} ; Date.now()-t
171
172
// slower, but won't silently fail due to errors, etc.
173
174
f2 = async () => (await c.request('eval', '2+3', {confirm:true})).data
175
176
Wildcard subject:
177
178
179
c = require('@cocalc/conat/core/client').connect(); c.watch('a.*');
180
181
182
c = require('@cocalc/conat/core/client').connect(); c.publish('a.x', 'foo')
183
184
185
Testing disconnect
186
187
c.sub('>')
188
c.conn.io.engine.close();0;
189
190
other:
191
192
a=0; setInterval(()=>c.pub('a',a++), 250)
193
194
*/
195
196
import {
197
connect as connectToSocketIO,
198
type SocketOptions,
199
type ManagerOptions,
200
} from "socket.io-client";
201
import { EventIterator } from "@cocalc/util/event-iterator";
202
import type { ConnectionStats, ServerInfo } from "./types";
203
import * as msgpack from "@msgpack/msgpack";
204
import { randomId } from "@cocalc/conat/names";
205
import type { JSONValue } from "@cocalc/util/types";
206
import { EventEmitter } from "events";
207
import { callback } from "awaiting";
208
import {
209
isValidSubject,
210
isValidSubjectWithoutWildcards,
211
} from "@cocalc/conat/util";
212
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
213
import { once, until } from "@cocalc/util/async-utils";
214
import { delay } from "awaiting";
215
import { getLogger } from "@cocalc/conat/client";
216
import { refCacheSync } from "@cocalc/util/refcache";
217
import { join } from "path";
218
import { dko, type DKO } from "@cocalc/conat/sync/dko";
219
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
220
import {
221
dstream,
222
type DStreamOptions,
223
type DStream,
224
} from "@cocalc/conat/sync/dstream";
225
import { akv, type AKV } from "@cocalc/conat/sync/akv";
226
import { astream, type AStream } from "@cocalc/conat/sync/astream";
227
import TTL from "@isaacs/ttlcache";
228
import {
229
ConatSocketServer,
230
ConatSocketClient,
231
ServerSocket,
232
type SocketConfiguration,
233
} from "@cocalc/conat/socket";
234
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
235
import {
236
type SyncTableOptions,
237
type ConatSyncTable,
238
createSyncTable,
239
} from "@cocalc/conat/sync/synctable";
240
241
export const MAX_INTEREST_TIMEOUT = 90000;
242
243
const MSGPACK_ENCODER_OPTIONS = {
244
// ignoreUndefined is critical so database queries work properly, and
245
// also we have a lot of api calls with tons of wasted undefined values.
246
ignoreUndefined: true,
247
};
248
249
export const STICKY_QUEUE_GROUP = "sticky";
250
251
export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
252
// A major problem if we allow long polling is that we must always use at most
253
// half the chunk size... because there is no way to know if recipients will be
254
// using long polling to RECEIVE messages. Not insurmountable.
255
transports: ["websocket"],
256
257
// nodejs specific for project/compute server in some settings
258
rejectUnauthorized: false,
259
260
reconnection: true,
261
reconnectionDelay: 1000,
262
reconnectionDelayMax: 15000,
263
reconnectionAttempts: 9999999999, // infinite
264
};
265
266
type State = "disconnected" | "connected" | "closed";
267
268
const logger = getLogger("core/client");
269
270
interface Options {
271
// address = the address of a cocalc server, including the base url, e.g.,
272
//
273
// https://cocalc.com
274
//
275
// or for a dev server running locally with a base url:
276
//
277
// http://localhost:4043/3fa218e5-7196-4020-8b30-e2127847cc4f/port/5002
278
//
279
// The socketio path is always /conat (after the base url) and is set automatically.
280
//
281
address?: string;
282
inboxPrefix?: string;
283
}
284
285
export type ClientOptions = Options & {
286
noCache?: boolean;
287
} & Partial<SocketOptions> &
288
Partial<ManagerOptions>;
289
290
const INBOX_PREFIX = "_INBOX";
291
const REPLY_HEADER = "CN-Reply";
292
const MAX_HEADER_SIZE = 100000;
293
294
const STATS_LOOP = 5000;
295
296
// fairly long since this is to avoid leaks, not for responsiveness in the UI.
297
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60000;
298
299
export let DEFAULT_REQUEST_TIMEOUT = 7500;
300
export let DEFAULT_PUBLISH_TIMEOUT = 7500;
301
302
export function setDefaultTimeouts({
303
request = DEFAULT_REQUEST_TIMEOUT,
304
publish = DEFAULT_PUBLISH_TIMEOUT,
305
}: {
306
request?: number;
307
publish?: number;
308
}) {
309
DEFAULT_REQUEST_TIMEOUT = request;
310
DEFAULT_PUBLISH_TIMEOUT = publish;
311
}
312
313
export enum DataEncoding {
314
MsgPack = 0,
315
JsonCodec = 1,
316
}
317
318
interface SubscriptionOptions {
319
maxWait?: number;
320
mesgLimit?: number;
321
queue?: string;
322
// sticky: when a choice from a queue group is made, the same choice is always made
323
// in the future for any message with subject matching subject with the last segment
324
// replaced by a *, until the target goes away. Setting this just sets the queue
325
// option to STICKY_QUEUE_GROUP.
326
//
327
// Examples of subjects matching except possibly last segment are
328
// foo.bar.lJcBSieLn and foo.bar.ZzsDC376ge
329
// You can put anything random in the last segment and all messages
330
// that match foo.bar.* get the same choice from the queue group.
331
// The idea is that *when* the message with subject foo.bar.lJcBSieLn gets
332
// sent, the backend server selects a target from the queue group to receive
333
// that message. It remembers the choice, and so long as that target is subscribed,
334
// it sends any message matching foo.bar.* to that same target.
335
// This is used in our implementation of persistent socket connections that
336
// are built on pub/sub. The underlying implementation uses consistent
337
// hashing and messages to sync state of the servers.
338
sticky?: boolean;
339
respond?: Function;
340
// timeout to create the subscription -- this may wait *until* you connect before
341
// it starts ticking.
342
timeout?: number;
343
}
344
345
// WARNING! This is the default and you can't just change it!
346
// Yes, for specific messages you can, but in general DO NOT. The reason is because, e.g.,
347
// JSON will turn Dates into strings, and we no longer fix that. So unless you modify the
348
// JsonCodec to handle Date's properly, don't change this!!
349
const DEFAULT_ENCODING = DataEncoding.MsgPack;
350
351
function cocalcServerToSocketioAddress(url?: string): {
352
address: string;
353
path: string;
354
} {
355
url = url ?? process.env.CONAT_SERVER;
356
if (!url) {
357
throw Error(
358
"Must give Conat server address or set CONAT_SERVER environment variable",
359
);
360
}
361
const u = new URL(url, "http://dummy.org");
362
const address = u.origin;
363
const path = join(u.pathname, "conat");
364
return { address, path };
365
}
366
367
const cache = refCacheSync<ClientOptions, Client>({
368
name: "conat-client",
369
createObject: (opts: ClientOptions) => {
370
return new Client(opts);
371
},
372
});
373
374
export function connect(opts: ClientOptions = {}) {
375
if (!opts.address) {
376
const x = cache.one();
377
if (x != null) {
378
return x;
379
}
380
}
381
return cache(opts);
382
}
383
384
// Get any cached client, if there is one; otherwise make one
385
// with default options.
386
export function getClient() {
387
return cache.one() ?? connect();
388
}
389
390
export class Client extends EventEmitter {
391
public conn: ReturnType<typeof connectToSocketIO>;
392
// queueGroups is a map from subject to the queue group for the subscription to that subject
393
private queueGroups: { [subject: string]: string } = {};
394
private subs: { [subject: string]: SubscriptionEmitter } = {};
395
private sockets: {
396
// all socket servers created using this Client
397
servers: { [subject: string]: ConatSocketServer };
398
// all client connections created using this Client.
399
clients: { [subject: string]: { [id: string]: ConatSocketClient } };
400
} = { servers: {}, clients: {} };
401
public readonly options: ClientOptions;
402
private inboxSubject: string;
403
private inbox?: EventEmitter;
404
private permissionError = {
405
pub: new TTL<string, string>({ ttl: 1000 * 60 }),
406
sub: new TTL<string, string>({ ttl: 1000 * 60 }),
407
};
408
public info: ServerInfo | undefined = undefined;
409
// total number of
410
public readonly stats: ConnectionStats & {
411
recv0: { messages: number; bytes: number };
412
} = {
413
send: { messages: 0, bytes: 0 },
414
recv: { messages: 0, bytes: 0 },
415
// recv0 = count since last connect
416
recv0: { messages: 0, bytes: 0 },
417
subs: 0,
418
};
419
420
public readonly id: string = randomId();
421
public state: State = "disconnected";
422
423
constructor(options: ClientOptions) {
424
super();
425
this.setMaxListeners(1000);
426
this.options = options;
427
428
// for socket.io the address has no base url
429
const { address, path } = cocalcServerToSocketioAddress(
430
this.options.address,
431
);
432
logger.debug(`Conat: Connecting to ${this.options.address}...`);
433
// if (options.extraHeaders == null) {
434
// console.trace("WARNING: no auth set");
435
// }
436
this.conn = connectToSocketIO(address, {
437
...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
438
...options,
439
path,
440
});
441
442
this.conn.on("info", (info) => {
443
const firstTime = this.info == null;
444
this.info = info;
445
this.emit("info", info);
446
setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
447
});
448
this.conn.on("permission", ({ message, type, subject }) => {
449
logger.debug(message);
450
this.permissionError[type]?.set(subject, message);
451
});
452
this.conn.on("connect", async () => {
453
logger.debug(`Conat: Connected to ${this.options.address}`);
454
if (this.conn.connected) {
455
this.setState("connected");
456
}
457
});
458
this.conn.io.on("error", (...args) => {
459
logger.debug(
460
`Conat: Error connecting to ${this.options.address} -- `,
461
...args,
462
);
463
});
464
this.conn.on("disconnect", () => {
465
this.stats.recv0 = { messages: 0, bytes: 0 }; // reset on disconnect
466
this.setState("disconnected");
467
this.disconnectAllSockets();
468
});
469
this.initInbox();
470
this.statsLoop();
471
}
472
473
disconnect = () => {
474
this.disconnectAllSockets();
475
// @ts-ignore
476
setTimeout(() => this.conn.io.disconnect(), 1);
477
};
478
479
waitUntilSignedIn = reuseInFlight(async () => {
480
// not "signed in" if --
481
// - not connected, or
482
// - no info at all (which gets sent on sign in)
483
// - or the user is {error:....}, which is what happens when sign in fails
484
// e.g., do to an expired cookie
485
if (
486
this.info == null ||
487
this.state != "connected" ||
488
this.info?.user?.error
489
) {
490
await once(this, "info");
491
}
492
});
493
494
private statsLoop = async () => {
495
await until(
496
async () => {
497
if (this.isClosed()) {
498
return true;
499
}
500
try {
501
await this.waitUntilConnected();
502
if (this.isClosed()) {
503
return true;
504
}
505
this.conn.emit("stats", { recv0: this.stats.recv0 });
506
} catch {}
507
return false;
508
},
509
{ start: STATS_LOOP, max: STATS_LOOP },
510
);
511
};
512
513
interest = async (subject: string): Promise<boolean> => {
514
return await this.waitForInterest(subject, { timeout: 0 });
515
};
516
517
waitForInterest = async (
518
subject: string,
519
{
520
timeout = MAX_INTEREST_TIMEOUT,
521
}: {
522
timeout?: number;
523
} = {},
524
) => {
525
if (!isValidSubjectWithoutWildcards(subject)) {
526
throw Error(
527
`subject ${subject} must be a valid subject without wildcards`,
528
);
529
}
530
timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
531
const f = (cb) => {
532
this.conn
533
.timeout(timeout ? timeout : 10000)
534
.emit("wait-for-interest", { subject, timeout }, (err, response) => {
535
if (err) {
536
cb(err);
537
} else if (response.error) {
538
cb(new ConatError(response.error, { code: response.code }));
539
} else {
540
cb(undefined, response);
541
}
542
});
543
};
544
return await callback(f);
545
};
546
547
recvStats = (bytes: number) => {
548
this.stats.recv.messages += 1;
549
this.stats.recv.bytes += bytes;
550
this.stats.recv0.messages += 1;
551
this.stats.recv0.bytes += bytes;
552
};
553
554
// There should usually be no reason to call this because socket.io
555
// is so good at abstracting this away. It's useful for unit testing.
556
waitUntilConnected = reuseInFlight(async () => {
557
if (this.conn.connected) {
558
return;
559
}
560
// @ts-ignore
561
await once(this.conn, "connect");
562
});
563
564
waitUntilReady = reuseInFlight(async () => {
565
await this.waitUntilSignedIn();
566
await this.waitUntilConnected();
567
});
568
569
private setState = (state: State) => {
570
if (this.isClosed() || this.state == state) {
571
return;
572
}
573
this.state = state;
574
this.emit(state);
575
};
576
577
private temporaryInboxSubject = () => {
578
if (!this.inboxSubject) {
579
throw Error("inbox not setup properly");
580
}
581
return `${this.inboxSubject}.${randomId()}`;
582
};
583
584
private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
585
if (this.inbox == null) {
586
if (this.isClosed()) {
587
throw Error("closed");
588
}
589
await once(this, "inbox");
590
}
591
if (this.inbox == null) {
592
throw Error("bug");
593
}
594
return this.inbox;
595
});
596
597
private initInbox = async () => {
598
// For request/respond instead of setting up one
599
// inbox *every time there is a request*, we setup a single
600
// inbox once and for all for all responses. We listen for
601
// everything to inbox...Prefix.* and emit it via this.inbox.
602
// The request sender then listens on this.inbox for the response.
603
// We *could* use a regular subscription for each request,
604
// but (1) that massively increases the load on the server for
605
// every single request (having to create and destroy subscriptions)
606
// and (2) there is a race condition between creating that subscription
607
// and getting the response; it's fine with one server, but with
608
// multiple servers solving the race condition would slow everything down
609
// due to having to wait for so many acknowledgements. Instead, we
610
// remove all those problems by just using a single inbox subscription.
611
const inboxPrefix = this.options.inboxPrefix ?? INBOX_PREFIX;
612
if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
613
throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
614
}
615
this.inboxSubject = `${inboxPrefix}.${randomId()}`;
616
let sub;
617
await until(
618
async () => {
619
try {
620
await this.waitUntilSignedIn();
621
sub = await this.subscribe(this.inboxSubject + ".*");
622
return true;
623
} catch (err) {
624
if (this.isClosed()) {
625
return true;
626
}
627
// this should only fail due to permissions issues, at which point
628
// request can't work, but pub/sub can.
629
if (!process.env.COCALC_TEST_MODE) {
630
console.log(`WARNING: inbox not available -- ${err}`);
631
}
632
}
633
return false;
634
},
635
{ start: 1000, max: 15000 },
636
);
637
if (this.isClosed()) {
638
return;
639
}
640
641
this.inbox = new EventEmitter();
642
(async () => {
643
for await (const mesg of sub) {
644
if (this.inbox == null) {
645
return;
646
}
647
this.inbox.emit(mesg.subject, mesg);
648
}
649
})();
650
this.emit("inbox", this.inboxSubject);
651
};
652
653
private isClosed = () => {
654
return this.state == "closed";
655
};
656
657
close = () => {
658
if (this.isClosed()) {
659
return;
660
}
661
this.setState("closed");
662
this.removeAllListeners();
663
this.closeAllSockets();
664
// @ts-ignore
665
delete this.sockets;
666
for (const subject in this.queueGroups) {
667
this.conn.emit("unsubscribe", { subject });
668
delete this.queueGroups[subject];
669
}
670
for (const sub of Object.values(this.subs)) {
671
sub.refCount = 0;
672
sub.close();
673
// @ts-ignore
674
delete this.subs;
675
}
676
// @ts-ignore
677
delete this.queueGroups;
678
// @ts-ignore
679
delete this.inboxSubject;
680
delete this.inbox;
681
// @ts-ignore
682
delete this.options;
683
// @ts-ignore
684
delete this.info;
685
// @ts-ignore
686
delete this.permissionError;
687
688
try {
689
this.conn.close();
690
} catch {}
691
};
692
693
private syncSubscriptions = reuseInFlight(async () => {
694
let fails = 0;
695
await until(
696
async () => {
697
if (this.isClosed()) return true;
698
try {
699
if (this.info == null) {
700
// no point in trying until we are signed in and connected
701
await once(this, "info");
702
}
703
if (this.isClosed()) return true;
704
await this.waitUntilConnected();
705
if (this.isClosed()) return true;
706
const stable = await this.syncSubscriptions0(10000);
707
if (stable) {
708
return true;
709
}
710
} catch (err) {
711
fails++;
712
if (fails >= 3) {
713
console.log(
714
`WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
715
);
716
}
717
}
718
return false;
719
},
720
{ start: 1000, max: 15000 },
721
);
722
});
723
724
// syncSubscriptions0 ensures that we're subscribed on server
725
// to what we think we're subscribed to, or throws an error.
726
private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
727
if (this.isClosed()) return true;
728
if (this.info == null) {
729
throw Error("not signed in");
730
}
731
const subs = await this.getSubscriptions(timeout);
732
// console.log("syncSubscriptions", {
733
// server: subs,
734
// client: Object.keys(this.queueGroups),
735
// });
736
const missing: { subject: string; queue: string }[] = [];
737
for (const subject in this.queueGroups) {
738
// subscribe on backend to all subscriptions we think we should have that
739
// the server does not have
740
if (!subs.has(subject)) {
741
missing.push({
742
subject,
743
queue: this.queueGroups[subject],
744
});
745
}
746
}
747
let stable = true;
748
if (missing.length > 0) {
749
stable = false;
750
const resp = await callback(
751
this.conn.timeout(timeout).emit.bind(this.conn),
752
"subscribe",
753
missing,
754
);
755
// some subscription could fail due to permissions changes, e.g., user got
756
// removed from a project.
757
for (let i = 0; i < missing.length; i++) {
758
if (resp[i].error) {
759
const sub = this.subs[missing[i].subject];
760
if (sub != null) {
761
sub.close(true);
762
}
763
}
764
}
765
}
766
const extra: { subject: string }[] = [];
767
for (const subject in subs) {
768
if (this.queueGroups[subject] != null) {
769
// server thinks we're subscribed but we do not think so, so cancel that
770
extra.push({ subject });
771
}
772
}
773
if (extra.length > 0) {
774
await callback(
775
this.conn.timeout(timeout).emit.bind(this.conn),
776
"unsubscribe",
777
extra,
778
);
779
stable = false;
780
}
781
return stable;
782
};
783
784
numSubscriptions = () => Object.keys(this.queueGroups).length;
785
786
private getSubscriptions = async (
787
timeout = DEFAULT_REQUEST_TIMEOUT,
788
): Promise<Set<string>> => {
789
const subs = await callback(
790
this.conn.timeout(timeout).emit.bind(this.conn),
791
"subscriptions",
792
null,
793
);
794
return new Set(subs);
795
};
796
797
// returns EventEmitter that emits 'message', mesg: Message
798
private subscriptionEmitter = (
799
subject: string,
800
{
801
closeWhenOffCalled,
802
queue,
803
sticky,
804
confirm,
805
timeout,
806
}: {
807
// if true, when the off method of the event emitter is called, then
808
// the entire subscription is closed. This is very useful when we wrap the
809
// EvenEmitter in an async iterator.
810
closeWhenOffCalled?: boolean;
811
812
// the queue group -- if not given, then one is randomly assigned.
813
queue?: string;
814
815
// if true, sets queue to "sticky"
816
sticky?: boolean;
817
818
// confirm -- get confirmation back from server that subscription was created
819
confirm?: boolean;
820
821
// how long to wait to confirm creation of the subscription;
822
// only explicitly *used* when confirm=true, but always must be set.
823
timeout?: number;
824
} = {},
825
): { sub: SubscriptionEmitter; promise? } => {
826
// Having timeout set at all is absolutely critical because if the connection
827
// goes down while making the subscription, having some timeout causes
828
// socketio to throw an error, which avoids a huge potential subscription
829
// leak. We set this by default to DEFAULT_SUBSCRIPTION_TIMEOUT.
830
if (!timeout) {
831
timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
832
}
833
if (this.isClosed()) {
834
throw Error("closed");
835
}
836
if (!isValidSubject(subject)) {
837
throw Error(`invalid subscribe subject '${subject}'`);
838
}
839
if (this.permissionError.sub.has(subject)) {
840
const message = this.permissionError.sub.get(subject)!;
841
logger.debug(message);
842
throw new ConatError(message, { code: 403 });
843
}
844
if (sticky) {
845
if (queue) {
846
throw Error("must not specify queue group if sticky is true");
847
}
848
queue = STICKY_QUEUE_GROUP;
849
}
850
let sub = this.subs[subject];
851
if (sub != null) {
852
if (queue && this.queueGroups[subject] != queue) {
853
throw Error(
854
`client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
855
);
856
}
857
if (queue == STICKY_QUEUE_GROUP) {
858
throw Error(
859
`can only have one sticky subscription per client -- subject='${subject}'`,
860
);
861
}
862
sub.refCount += 1;
863
return { sub, promise: undefined };
864
}
865
if (this.queueGroups[subject] != null) {
866
throw Error(`already subscribed to '${subject}'`);
867
}
868
if (!queue) {
869
queue = randomId();
870
}
871
this.queueGroups[subject] = queue;
872
sub = new SubscriptionEmitter({
873
client: this,
874
subject,
875
closeWhenOffCalled,
876
});
877
this.subs[subject] = sub;
878
this.stats.subs++;
879
let promise;
880
if (confirm) {
881
const f = (cb) => {
882
const handle = (response) => {
883
if (response?.error) {
884
cb(new ConatError(response.error, { code: response.code }));
885
} else {
886
cb(response?.error, response);
887
}
888
};
889
if (timeout) {
890
this.conn
891
.timeout(timeout)
892
.emit("subscribe", { subject, queue }, (err, response) => {
893
if (err) {
894
handle({ error: `${err}`, code: 408 });
895
} else {
896
handle(response);
897
}
898
});
899
} else {
900
// this should never be used -- see above
901
this.conn.emit("subscribe", { subject, queue }, handle);
902
}
903
};
904
promise = callback(f);
905
} else {
906
this.conn.emit("subscribe", { subject, queue });
907
promise = undefined;
908
}
909
sub.once("closed", () => {
910
if (this.isClosed()) {
911
return;
912
}
913
this.conn.emit("unsubscribe", { subject });
914
delete this.queueGroups[subject];
915
if (this.subs[subject] != null) {
916
this.stats.subs--;
917
delete this.subs[subject];
918
}
919
});
920
return { sub, promise };
921
};
922
923
private subscriptionIterator = (
924
sub,
925
opts?: SubscriptionOptions,
926
): Subscription => {
927
// @ts-ignore
928
const iter = new EventIterator<Message>(sub, "message", {
929
idle: opts?.maxWait,
930
limit: opts?.mesgLimit,
931
map: (args) => args[0],
932
});
933
return iter;
934
};
935
936
subscribeSync = (
937
subject: string,
938
opts?: SubscriptionOptions,
939
): Subscription => {
940
const { sub } = this.subscriptionEmitter(subject, {
941
confirm: false,
942
closeWhenOffCalled: true,
943
sticky: opts?.sticky,
944
queue: opts?.queue,
945
});
946
return this.subscriptionIterator(sub, opts);
947
};
948
949
subscribe = async (
950
subject: string,
951
opts?: SubscriptionOptions,
952
): Promise<Subscription> => {
953
await this.waitUntilSignedIn();
954
const { sub, promise } = this.subscriptionEmitter(subject, {
955
confirm: true,
956
closeWhenOffCalled: true,
957
queue: opts?.queue,
958
sticky: opts?.sticky,
959
timeout: opts?.timeout,
960
});
961
try {
962
await promise;
963
} catch (err) {
964
sub.close();
965
throw err;
966
}
967
return this.subscriptionIterator(sub, opts);
968
};
969
970
sub = this.subscribe;
971
972
/*
973
A service is a subscription with a function to respond to requests by name.
974
Call service with an implementation:
975
976
service = await client1.service('arith', {mul : async (a,b)=>{a*b}, add : async (a,b)=>a+b})
977
978
Use the service:
979
980
arith = await client2.call('arith')
981
await arith.mul(2,3)
982
await arith.add(2,3)
983
984
There's by default a single queue group '0', so if you create multiple services on various
985
computers, then requests are load balanced across them automatically. Explicitly set
986
a random queue group (or something else) and use callMany if you don't want this behavior.
987
988
Close the service when done:
989
990
service.close();
991
992
See backend/conat/test/core/services.test.ts for a tested and working example
993
that involves typescript and shows how to use wildcard subjects and get the
994
specific subject used for a call by using that this is bound to the calling mesg.
995
*/
996
service: <T = any>(
997
subject: string,
998
impl: T,
999
opts?: SubscriptionOptions,
1000
) => Promise<Subscription> = async (subject, impl, opts) => {
1001
const sub = await this.subscribe(subject, {
1002
...opts,
1003
queue: opts?.queue ?? "0",
1004
});
1005
const respond = async (mesg: Message) => {
1006
try {
1007
const [name, args] = mesg.data;
1008
// call impl[name], but with 'this' set to the object {subject:...},
1009
// so inside the service, it is possible to know what subject was used
1010
// in the request, in case subject is a wildcard subject.
1011
// const result = await impl[name].apply(
1012
// { subject: mesg.subject },
1013
// ...args,
1014
// );
1015
// const result = await impl[name].apply(
1016
// { subject: mesg.subject },
1017
// ...args,
1018
// );
1019
// mesg.respondSync(result);
1020
let f = impl[name];
1021
if (f == null) {
1022
throw Error(`${name} not defined`);
1023
}
1024
const result = await f.apply(mesg, args);
1025
mesg.respondSync(result);
1026
} catch (err) {
1027
mesg.respondSync(null, { headers: { error: `${err}` } });
1028
}
1029
};
1030
const loop = async () => {
1031
// todo -- param to set max number of responses at once.
1032
for await (const mesg of sub) {
1033
respond(mesg);
1034
}
1035
};
1036
loop();
1037
return sub;
1038
};
1039
1040
// Call a service as defined above.
1041
call<T = any>(subject: string, opts?: PublishOptions): T {
1042
const call = async (name: string, args: any[]) => {
1043
const resp = await this.request(subject, [name, args], opts);
1044
if (resp.headers?.error) {
1045
throw Error(`${resp.headers.error}`);
1046
} else {
1047
return resp.data;
1048
}
1049
};
1050
1051
return new Proxy(
1052
{},
1053
{
1054
get: (_, name) => {
1055
if (typeof name !== "string") {
1056
return undefined;
1057
}
1058
return async (...args) => await call(name, args);
1059
},
1060
},
1061
) as T;
1062
}
1063
1064
callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
1065
const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
1066
const self = this;
1067
async function* callMany(name: string, args: any[]) {
1068
const sub = await self.requestMany(subject, [name, args], {
1069
...opts,
1070
maxWait,
1071
});
1072
for await (const resp of sub) {
1073
if (resp.headers?.error) {
1074
yield new ConatError(`${resp.headers.error}`, {
1075
code: resp.headers.code,
1076
});
1077
} else {
1078
yield resp.data;
1079
}
1080
}
1081
}
1082
1083
return new Proxy(
1084
{},
1085
{
1086
get: (_, name) => {
1087
if (typeof name !== "string") {
1088
return undefined;
1089
}
1090
return async (...args) => await callMany(name, args);
1091
},
1092
},
1093
) as T;
1094
}
1095
1096
publishSync = (
1097
subject: string,
1098
mesg,
1099
opts?: PublishOptions,
1100
): { bytes: number } => {
1101
if (this.isClosed()) {
1102
// already closed
1103
return { bytes: 0 };
1104
}
1105
return this._publish(subject, mesg, opts);
1106
};
1107
1108
publish = async (
1109
subject: string,
1110
mesg,
1111
opts?: PublishOptions,
1112
): Promise<{
1113
// bytes encoded (doesn't count some extra wrapping)
1114
bytes: number;
1115
// count is the number of matching subscriptions
1116
// that the server *sent* this message to since the server knows about them.
1117
// However, there's no guaranteee that the subscribers actually exist
1118
// **right now** or received these messages.
1119
count: number;
1120
}> => {
1121
if (this.isClosed()) {
1122
// already closed
1123
return { bytes: 0, count: 0 };
1124
}
1125
await this.waitUntilSignedIn();
1126
const { bytes, getCount, promise } = this._publish(subject, mesg, {
1127
...opts,
1128
confirm: true,
1129
});
1130
await promise;
1131
return { bytes, count: getCount?.()! };
1132
};
1133
1134
private _publish = (
1135
subject: string,
1136
mesg,
1137
{
1138
headers,
1139
raw,
1140
encoding = DEFAULT_ENCODING,
1141
confirm,
1142
timeout = DEFAULT_PUBLISH_TIMEOUT,
1143
}: PublishOptions & { confirm?: boolean } = {},
1144
) => {
1145
if (this.isClosed()) {
1146
return { bytes: 0 };
1147
}
1148
if (!isValidSubjectWithoutWildcards(subject)) {
1149
throw Error(`invalid publish subject ${subject}`);
1150
}
1151
if (this.permissionError.pub.has(subject)) {
1152
const message = this.permissionError.pub.get(subject)!;
1153
logger.debug(message);
1154
throw new ConatError(message, { code: 403 });
1155
}
1156
raw = raw ?? encode({ encoding, mesg });
1157
this.stats.send.messages += 1;
1158
this.stats.send.bytes += raw.length;
1159
1160
// default to 1MB is safe since it's at least that big.
1161
const chunkSize = Math.max(
1162
1000,
1163
(this.info?.max_payload ?? 1e6) - MAX_HEADER_SIZE,
1164
);
1165
let seq = 0;
1166
let id = randomId();
1167
const promises: any[] = [];
1168
let count = 0;
1169
for (let i = 0; i < raw.length; i += chunkSize) {
1170
// !!FOR TESTING ONLY!!
1171
// if (Math.random() <= 0.01) {
1172
// console.log("simulating a chunk drop", { subject, seq });
1173
// seq += 1;
1174
// continue;
1175
// }
1176
const done = i + chunkSize >= raw.length ? 1 : 0;
1177
const v: any[] = [
1178
subject,
1179
id,
1180
seq,
1181
done,
1182
encoding,
1183
raw.slice(i, i + chunkSize),
1184
];
1185
if (done && headers) {
1186
v.push(headers);
1187
}
1188
if (confirm) {
1189
let done = false;
1190
const f = (cb) => {
1191
const handle = (response) => {
1192
// console.log("_publish", { done, subject, mesg, headers, confirm });
1193
if (response?.error) {
1194
cb(new ConatError(response.error, { code: response.code }));
1195
} else {
1196
cb(response?.error, response);
1197
}
1198
};
1199
if (timeout) {
1200
const timer = setTimeout(() => {
1201
done = true;
1202
cb(new ConatError("timeout", { code: 408 }));
1203
}, timeout);
1204
1205
this.conn.timeout(timeout).emit("publish", v, (err, response) => {
1206
if (done) {
1207
return;
1208
}
1209
clearTimeout(timer);
1210
if (err) {
1211
handle({ error: `${err}`, code: 408 });
1212
} else {
1213
handle(response);
1214
}
1215
});
1216
} else {
1217
this.conn.emit("publish", v, handle);
1218
}
1219
};
1220
const promise = (async () => {
1221
const response = await callback(f);
1222
count = Math.max(count, response.count ?? 0);
1223
})();
1224
promises.push(promise);
1225
} else {
1226
this.conn.emit("publish", v);
1227
}
1228
seq += 1;
1229
}
1230
if (confirm) {
1231
return {
1232
bytes: raw.length,
1233
getCount: () => count,
1234
promise: Promise.all(promises),
1235
};
1236
}
1237
return { bytes: raw.length };
1238
};
1239
1240
pub = this.publish;
1241
1242
request = async (
1243
subject: string,
1244
mesg: any,
1245
{
1246
timeout = DEFAULT_REQUEST_TIMEOUT,
1247
// waitForInterest -- if publish fails due to no receivers and
1248
// waitForInterest is true, will wait until there is a receiver
1249
// and publish again:
1250
waitForInterest = false,
1251
...options
1252
}: PublishOptions & { timeout?: number; waitForInterest?: boolean } = {},
1253
): Promise<Message> => {
1254
if (timeout <= 0) {
1255
throw Error("timeout must be positive");
1256
}
1257
const start = Date.now();
1258
const inbox = await this.getInbox();
1259
const inboxSubject = this.temporaryInboxSubject();
1260
const sub = new EventIterator<Message>(inbox, inboxSubject, {
1261
idle: timeout,
1262
limit: 1,
1263
map: (args) => args[0],
1264
});
1265
1266
const opts = {
1267
...options,
1268
timeout,
1269
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
1270
};
1271
const { count } = await this.publish(subject, mesg, opts);
1272
1273
if (!count) {
1274
const giveUp = () => {
1275
sub.stop();
1276
throw new ConatError(
1277
`request -- no subscribers matching '${subject}'`,
1278
{
1279
code: 503,
1280
},
1281
);
1282
};
1283
if (waitForInterest) {
1284
await this.waitForInterest(subject, { timeout });
1285
if (this.state == "closed") {
1286
throw Error("closed");
1287
}
1288
const remaining = timeout - (Date.now() - start);
1289
if (remaining <= 1000) {
1290
throw new ConatError("timeout", { code: 408 });
1291
}
1292
// no error so there is very likely now interest, so we publish again:
1293
const { count } = await this.publish(subject, mesg, {
1294
...opts,
1295
timeout: remaining,
1296
});
1297
if (!count) {
1298
giveUp();
1299
}
1300
} else {
1301
giveUp();
1302
}
1303
}
1304
1305
for await (const resp of sub) {
1306
sub.stop();
1307
return resp;
1308
}
1309
sub.stop();
1310
throw new ConatError("timeout", { code: 408 });
1311
};
1312
1313
// NOTE: Using requestMany returns a Subscription sub, and
1314
// you can call sub.close(). However, the sender doesn't
1315
// know that this happened and the messages are still going
1316
// to your inbox. Similarly if you set a maxWait, the
1317
// subscription just ends at that point, but the server
1318
// sending messages doesn't know. This is a shortcoming the
1319
// pub/sub model. You must decide entirely based on your
1320
// own application protocol how to terminate.
1321
requestMany = async (
1322
subject: string,
1323
mesg: any,
1324
{ maxMessages, maxWait, ...options }: RequestManyOptions = {},
1325
): Promise<Subscription> => {
1326
if (maxMessages != null && maxMessages <= 0) {
1327
throw Error("maxMessages must be positive");
1328
}
1329
if (maxWait != null && maxWait <= 0) {
1330
throw Error("maxWait must be positive");
1331
}
1332
const inbox = await this.getInbox();
1333
const inboxSubject = this.temporaryInboxSubject();
1334
const sub = new EventIterator<Message>(inbox, inboxSubject, {
1335
idle: maxWait,
1336
limit: maxMessages,
1337
map: (args) => args[0],
1338
});
1339
const { count } = await this.publish(subject, mesg, {
1340
...options,
1341
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
1342
});
1343
if (!count) {
1344
sub.stop();
1345
throw new ConatError(
1346
`requestMany -- no subscribers matching ${subject}`,
1347
{ code: 503 },
1348
);
1349
}
1350
return sub;
1351
};
1352
1353
// watch: this is mainly for debugging and interactive use.
1354
watch = (
1355
subject: string,
1356
cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
1357
opts?: SubscriptionOptions,
1358
) => {
1359
const sub = this.subscribeSync(subject, opts);
1360
const f = async () => {
1361
for await (const x of sub) {
1362
cb(x);
1363
}
1364
};
1365
f();
1366
return sub;
1367
};
1368
1369
sync = {
1370
dkv: async (opts: DKVOptions): Promise<DKV> =>
1371
await dkv({ ...opts, client: this }),
1372
akv: async (opts: DKVOptions): Promise<AKV> =>
1373
await akv({ ...opts, client: this }),
1374
dko: async (opts: DKVOptions): Promise<DKO> =>
1375
await dko({ ...opts, client: this }),
1376
dstream: async (opts: DStreamOptions): Promise<DStream> =>
1377
await dstream({ ...opts, client: this }),
1378
astream: async (opts: DStreamOptions): Promise<AStream> =>
1379
await astream({ ...opts, client: this }),
1380
synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
1381
await createSyncTable({ ...opts, client: this }),
1382
};
1383
1384
socket = {
1385
listen: (
1386
subject: string,
1387
opts?: SocketConfiguration,
1388
): ConatSocketServer => {
1389
if (this.state == "closed") {
1390
throw Error("closed");
1391
}
1392
if (this.sockets.servers[subject] !== undefined) {
1393
throw Error(
1394
`there can be at most one socket server per client listening on a subject (subject='${subject}')`,
1395
);
1396
}
1397
const server = new ConatSocketServer({
1398
subject,
1399
role: "server",
1400
client: this,
1401
id: this.id,
1402
...opts,
1403
});
1404
this.sockets.servers[subject] = server;
1405
server.once("closed", () => {
1406
delete this.sockets.servers[subject];
1407
});
1408
return server;
1409
},
1410
1411
connect: (
1412
subject: string,
1413
opts?: SocketConfiguration,
1414
): ConatSocketClient => {
1415
if (this.state == "closed") {
1416
throw Error("closed");
1417
}
1418
const id = randomId();
1419
const client = new ConatSocketClient({
1420
subject,
1421
role: "client",
1422
client: this,
1423
id,
1424
...opts,
1425
});
1426
if (this.sockets.clients[subject] === undefined) {
1427
this.sockets.clients[subject] = { [id]: client };
1428
} else {
1429
this.sockets.clients[subject][id] = client;
1430
}
1431
client.once("closed", () => {
1432
const v = this.sockets.clients[subject];
1433
if (v != null) {
1434
delete v[id];
1435
if (isEmpty(v)) {
1436
delete this.sockets.clients[subject];
1437
}
1438
}
1439
});
1440
return client;
1441
},
1442
};
1443
1444
private disconnectAllSockets = () => {
1445
for (const subject in this.sockets.servers) {
1446
this.sockets.servers[subject].disconnect();
1447
}
1448
for (const subject in this.sockets.clients) {
1449
for (const id in this.sockets.clients[subject]) {
1450
this.sockets.clients[subject][id].disconnect();
1451
}
1452
}
1453
};
1454
1455
private closeAllSockets = () => {
1456
for (const subject in this.sockets.servers) {
1457
this.sockets.servers[subject].close();
1458
}
1459
for (const subject in this.sockets.clients) {
1460
for (const id in this.sockets.clients[subject]) {
1461
this.sockets.clients[subject][id].close();
1462
}
1463
}
1464
};
1465
1466
message = (mesg, options?) => messageData(mesg, options);
1467
}
1468
1469
interface PublishOptions {
1470
headers?: Headers;
1471
// if encoding is given, it specifies the encoding used to encode the message
1472
encoding?: DataEncoding;
1473
// if raw is given, then it is assumed to be the raw binary
1474
// encoded message (using encoding) and any mesg parameter
1475
// is *IGNORED*.
1476
raw?;
1477
// timeout used when publishing a message and awaiting a response.
1478
timeout?: number;
1479
}
1480
1481
interface RequestManyOptions extends PublishOptions {
1482
maxWait?: number;
1483
maxMessages?: number;
1484
}
1485
1486
export function encode({
1487
encoding,
1488
mesg,
1489
}: {
1490
encoding: DataEncoding;
1491
mesg: any;
1492
}) {
1493
if (encoding == DataEncoding.MsgPack) {
1494
return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
1495
} else if (encoding == DataEncoding.JsonCodec) {
1496
return jsonEncoder(mesg);
1497
} else {
1498
throw Error(`unknown encoding ${encoding}`);
1499
}
1500
}
1501
1502
export function decode({
1503
encoding,
1504
data,
1505
}: {
1506
encoding: DataEncoding;
1507
data;
1508
}): any {
1509
if (encoding == DataEncoding.MsgPack) {
1510
return msgpack.decode(data);
1511
} else if (encoding == DataEncoding.JsonCodec) {
1512
return jsonDecoder(data);
1513
} else {
1514
throw Error(`unknown encoding ${encoding}`);
1515
}
1516
}
1517
1518
let textEncoder: TextEncoder | undefined = undefined;
1519
let textDecoder: TextDecoder | undefined = undefined;
1520
1521
function jsonEncoder(obj: any) {
1522
if (textEncoder === undefined) {
1523
textEncoder = new TextEncoder();
1524
}
1525
return textEncoder.encode(JSON.stringify(obj));
1526
}
1527
1528
function jsonDecoder(data: Buffer): any {
1529
if (textDecoder === undefined) {
1530
textDecoder = new TextDecoder();
1531
}
1532
return JSON.parse(textDecoder.decode(data));
1533
}
1534
1535
interface Chunk {
1536
id: string;
1537
seq: number;
1538
done: number;
1539
buffer: Buffer;
1540
headers?: any;
1541
}
1542
1543
// if an incoming message has chunks at least this old
1544
// we give up on it and discard all of them. This avoids
1545
// memory leaks when a chunk is dropped.
1546
const MAX_CHUNK_TIME = 2 * 60000;
1547
1548
class SubscriptionEmitter extends EventEmitter {
1549
private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
1550
{};
1551
private client: Client;
1552
private closeWhenOffCalled?: boolean;
1553
private subject: string;
1554
public refCount: number = 1;
1555
1556
constructor({ client, subject, closeWhenOffCalled }) {
1557
super();
1558
this.client = client;
1559
this.subject = subject;
1560
this.client.conn.on(subject, this.handle);
1561
this.closeWhenOffCalled = closeWhenOffCalled;
1562
this.dropOldLoop();
1563
}
1564
1565
close = (force?) => {
1566
this.refCount -= 1;
1567
// console.log("SubscriptionEmitter.close - refCount =", this.refCount, this.subject);
1568
if (this.client == null || (!force && this.refCount > 0)) {
1569
return;
1570
}
1571
this.emit("closed");
1572
this.client.conn.removeListener(this.subject, this.handle);
1573
// @ts-ignore
1574
delete this.incoming;
1575
// @ts-ignore
1576
delete this.client;
1577
// @ts-ignore
1578
delete this.subject;
1579
// @ts-ignore
1580
delete this.closeWhenOffCalled;
1581
this.removeAllListeners();
1582
};
1583
1584
off(a, b) {
1585
super.off(a, b);
1586
if (this.closeWhenOffCalled) {
1587
this.close();
1588
}
1589
return this;
1590
}
1591
1592
private handle = ({ subject, data }) => {
1593
if (this.client == null) {
1594
return;
1595
}
1596
const [id, seq, done, encoding, buffer, headers] = data;
1597
// console.log({ id, seq, done, encoding, buffer, headers });
1598
const chunk = { seq, done, encoding, buffer, headers };
1599
const { incoming } = this;
1600
if (incoming[id] == null) {
1601
if (seq != 0) {
1602
// part of a dropped message -- by definition this should just
1603
// silently happen and be handled via application level encodings
1604
// elsewhere
1605
console.log(
1606
`WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
1607
{ seq },
1608
);
1609
return;
1610
}
1611
incoming[id] = [];
1612
} else {
1613
const prev = incoming[id].slice(-1)[0].seq ?? -1;
1614
if (prev + 1 != seq) {
1615
console.log(
1616
`WARNING: drop packet from ${this.subject} -- seq number wrong`,
1617
{ prev, seq },
1618
);
1619
// part of message was dropped -- discard everything
1620
delete incoming[id];
1621
return;
1622
}
1623
}
1624
incoming[id].push({ ...chunk, time: Date.now() });
1625
if (chunk.done) {
1626
// console.log("assembling ", incoming[id].length, "chunks");
1627
const chunks = incoming[id].map((x) => x.buffer!);
1628
// TESTING ONLY!!
1629
// This is not necessary due to the above checks as messages arrive.
1630
// for (let i = 0; i < incoming[id].length; i++) {
1631
// if (incoming[id][i]?.seq != i) {
1632
// console.log(`WARNING: bug -- invalid chunk data! -- ${subject}`);
1633
// throw Error("bug -- invalid chunk data!");
1634
// }
1635
// }
1636
const raw = concatArrayBuffers(chunks);
1637
1638
// TESTING ONLY!!
1639
// try {
1640
// decode({ encoding, data: raw });
1641
// } catch (err) {
1642
// console.log(`ERROR - invalid data ${subject}`, incoming[id], err);
1643
// }
1644
1645
delete incoming[id];
1646
const mesg = new Message({
1647
encoding,
1648
raw,
1649
headers,
1650
client: this.client,
1651
subject,
1652
});
1653
this.emit("message", mesg);
1654
this.client.recvStats(raw.byteLength);
1655
}
1656
};
1657
1658
dropOldLoop = async () => {
1659
while (this.incoming != null) {
1660
const cutoff = Date.now() - MAX_CHUNK_TIME;
1661
for (const id in this.incoming) {
1662
const chunks = this.incoming[id];
1663
if (chunks.length > 0 && chunks[0].time <= cutoff) {
1664
console.log(
1665
`WARNING: drop partial message from ${this.subject} due to timeout`,
1666
);
1667
delete this.incoming[id];
1668
}
1669
}
1670
await delay(MAX_CHUNK_TIME / 2);
1671
}
1672
};
1673
}
1674
1675
function concatArrayBuffers(buffers) {
1676
if (buffers.length == 1) {
1677
return buffers[0];
1678
}
1679
if (Buffer.isBuffer(buffers[0])) {
1680
return Buffer.concat(buffers);
1681
}
1682
// browser fallback
1683
const totalLength = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);
1684
const result = new Uint8Array(totalLength);
1685
let offset = 0;
1686
for (const buf of buffers) {
1687
result.set(new Uint8Array(buf), offset);
1688
offset += buf.byteLength;
1689
}
1690
1691
return result.buffer;
1692
}
1693
1694
export type Headers = { [key: string]: JSONValue };
1695
1696
export class MessageData<T = any> {
1697
public readonly encoding: DataEncoding;
1698
public readonly raw;
1699
public readonly headers?: Headers;
1700
1701
constructor({ encoding, raw, headers }) {
1702
this.encoding = encoding;
1703
this.raw = raw;
1704
this.headers = headers;
1705
}
1706
1707
get data(): T {
1708
return decode({ encoding: this.encoding, data: this.raw });
1709
}
1710
1711
get length(): number {
1712
// raw is binary data so it's the closest thing we have to the
1713
// size of this message. It would also make sense to include
1714
// the headers, but JSON'ing them would be expensive, so we don't.
1715
return this.raw.length;
1716
}
1717
}
1718
1719
export class Message<T = any> extends MessageData<T> {
1720
private client: Client;
1721
public readonly subject;
1722
1723
constructor({ encoding, raw, headers, client, subject }) {
1724
super({ encoding, raw, headers });
1725
this.client = client;
1726
this.subject = subject;
1727
}
1728
1729
isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];
1730
1731
private respondSubject = () => {
1732
const subject = this.headers?.[REPLY_HEADER];
1733
if (!subject) {
1734
console.log(
1735
`WARNING: respond -- message to '${this.subject}' is not a request`,
1736
);
1737
return;
1738
}
1739
return `${subject}`;
1740
};
1741
1742
respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
1743
const subject = this.respondSubject();
1744
if (!subject) return { bytes: 0 };
1745
return this.client.publishSync(subject, mesg, opts);
1746
};
1747
1748
respond = async (
1749
mesg,
1750
opts: PublishOptions = {},
1751
): Promise<{ bytes: number; count: number }> => {
1752
const subject = this.respondSubject();
1753
if (!subject) {
1754
return { bytes: 0, count: 0 };
1755
}
1756
return await this.client.publish(subject, mesg, opts);
1757
};
1758
}
1759
1760
export function messageData(
1761
mesg,
1762
{ headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
1763
) {
1764
return new MessageData({
1765
encoding,
1766
raw: raw ?? encode({ encoding, mesg }),
1767
headers,
1768
});
1769
}
1770
1771
export type Subscription = EventIterator<Message>;
1772
1773
export class ConatError extends Error {
1774
code: string | number;
1775
constructor(mesg: string, { code }) {
1776
super(mesg);
1777
this.code = code;
1778
}
1779
}
1780
1781
function isEmpty(obj: object): boolean {
1782
for (const _x in obj) {
1783
return false;
1784
}
1785
return true;
1786
}
1787
1788