import {
connect as connectToSocketIO,
type SocketOptions,
type ManagerOptions,
} from "socket.io-client";
import { EventIterator } from "@cocalc/util/event-iterator";
import type { ConnectionStats, ServerInfo } from "./types";
import * as msgpack from "@msgpack/msgpack";
import { randomId } from "@cocalc/conat/names";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import { callback } from "awaiting";
import {
isValidSubject,
isValidSubjectWithoutWildcards,
} from "@cocalc/conat/util";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once, until } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";
import { refCacheSync } from "@cocalc/util/refcache";
import { join } from "path";
import { dko, type DKO } from "@cocalc/conat/sync/dko";
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
import {
dstream,
type DStreamOptions,
type DStream,
} from "@cocalc/conat/sync/dstream";
import { akv, type AKV } from "@cocalc/conat/sync/akv";
import { astream, type AStream } from "@cocalc/conat/sync/astream";
import TTL from "@isaacs/ttlcache";
import {
ConatSocketServer,
ConatSocketClient,
ServerSocket,
type SocketConfiguration,
} from "@cocalc/conat/socket";
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
import {
type SyncTableOptions,
type ConatSyncTable,
createSyncTable,
} from "@cocalc/conat/sync/synctable";
export const MAX_INTEREST_TIMEOUT = 90000;
const MSGPACK_ENCODER_OPTIONS = {
ignoreUndefined: true,
};
export const STICKY_QUEUE_GROUP = "sticky";
export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
transports: ["websocket"],
rejectUnauthorized: false,
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 15000,
reconnectionAttempts: 9999999999,
};
type State = "disconnected" | "connected" | "closed";
const logger = getLogger("core/client");
interface Options {
address?: string;
inboxPrefix?: string;
}
export type ClientOptions = Options & {
noCache?: boolean;
} & Partial<SocketOptions> &
Partial<ManagerOptions>;
const INBOX_PREFIX = "_INBOX";
const REPLY_HEADER = "CN-Reply";
const MAX_HEADER_SIZE = 100000;
const STATS_LOOP = 5000;
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60000;
export let DEFAULT_REQUEST_TIMEOUT = 7500;
export let DEFAULT_PUBLISH_TIMEOUT = 7500;
export function setDefaultTimeouts({
request = DEFAULT_REQUEST_TIMEOUT,
publish = DEFAULT_PUBLISH_TIMEOUT,
}: {
request?: number;
publish?: number;
}) {
DEFAULT_REQUEST_TIMEOUT = request;
DEFAULT_PUBLISH_TIMEOUT = publish;
}
export enum DataEncoding {
MsgPack = 0,
JsonCodec = 1,
}
interface SubscriptionOptions {
maxWait?: number;
mesgLimit?: number;
queue?: string;
sticky?: boolean;
respond?: Function;
timeout?: number;
}
const DEFAULT_ENCODING = DataEncoding.MsgPack;
function cocalcServerToSocketioAddress(url?: string): {
address: string;
path: string;
} {
url = url ?? process.env.CONAT_SERVER;
if (!url) {
throw Error(
"Must give Conat server address or set CONAT_SERVER environment variable",
);
}
const u = new URL(url, "http://dummy.org");
const address = u.origin;
const path = join(u.pathname, "conat");
return { address, path };
}
const cache = refCacheSync<ClientOptions, Client>({
name: "conat-client",
createObject: (opts: ClientOptions) => {
return new Client(opts);
},
});
export function connect(opts: ClientOptions = {}) {
if (!opts.address) {
const x = cache.one();
if (x != null) {
return x;
}
}
return cache(opts);
}
export function getClient() {
return cache.one() ?? connect();
}
export class Client extends EventEmitter {
public conn: ReturnType<typeof connectToSocketIO>;
private queueGroups: { [subject: string]: string } = {};
private subs: { [subject: string]: SubscriptionEmitter } = {};
private sockets: {
servers: { [subject: string]: ConatSocketServer };
clients: { [subject: string]: { [id: string]: ConatSocketClient } };
} = { servers: {}, clients: {} };
public readonly options: ClientOptions;
private inboxSubject: string;
private inbox?: EventEmitter;
private permissionError = {
pub: new TTL<string, string>({ ttl: 1000 * 60 }),
sub: new TTL<string, string>({ ttl: 1000 * 60 }),
};
public info: ServerInfo | undefined = undefined;
public readonly stats: ConnectionStats & {
recv0: { messages: number; bytes: number };
} = {
send: { messages: 0, bytes: 0 },
recv: { messages: 0, bytes: 0 },
recv0: { messages: 0, bytes: 0 },
subs: 0,
};
public readonly id: string = randomId();
public state: State = "disconnected";
constructor(options: ClientOptions) {
super();
this.setMaxListeners(1000);
this.options = options;
const { address, path } = cocalcServerToSocketioAddress(
this.options.address,
);
logger.debug(`Conat: Connecting to ${this.options.address}...`);
this.conn = connectToSocketIO(address, {
...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
...options,
path,
});
this.conn.on("info", (info) => {
const firstTime = this.info == null;
this.info = info;
this.emit("info", info);
setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
});
this.conn.on("permission", ({ message, type, subject }) => {
logger.debug(message);
this.permissionError[type]?.set(subject, message);
});
this.conn.on("connect", async () => {
logger.debug(`Conat: Connected to ${this.options.address}`);
if (this.conn.connected) {
this.setState("connected");
}
});
this.conn.io.on("error", (...args) => {
logger.debug(
`Conat: Error connecting to ${this.options.address} -- `,
...args,
);
});
this.conn.on("disconnect", () => {
this.stats.recv0 = { messages: 0, bytes: 0 };
this.setState("disconnected");
this.disconnectAllSockets();
});
this.initInbox();
this.statsLoop();
}
disconnect = () => {
this.disconnectAllSockets();
setTimeout(() => this.conn.io.disconnect(), 1);
};
waitUntilSignedIn = reuseInFlight(async () => {
if (
this.info == null ||
this.state != "connected" ||
this.info?.user?.error
) {
await once(this, "info");
}
});
private statsLoop = async () => {
await until(
async () => {
if (this.isClosed()) {
return true;
}
try {
await this.waitUntilConnected();
if (this.isClosed()) {
return true;
}
this.conn.emit("stats", { recv0: this.stats.recv0 });
} catch {}
return false;
},
{ start: STATS_LOOP, max: STATS_LOOP },
);
};
interest = async (subject: string): Promise<boolean> => {
return await this.waitForInterest(subject, { timeout: 0 });
};
waitForInterest = async (
subject: string,
{
timeout = MAX_INTEREST_TIMEOUT,
}: {
timeout?: number;
} = {},
) => {
if (!isValidSubjectWithoutWildcards(subject)) {
throw Error(
`subject ${subject} must be a valid subject without wildcards`,
);
}
timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
const f = (cb) => {
this.conn
.timeout(timeout ? timeout : 10000)
.emit("wait-for-interest", { subject, timeout }, (err, response) => {
if (err) {
cb(err);
} else if (response.error) {
cb(new ConatError(response.error, { code: response.code }));
} else {
cb(undefined, response);
}
});
};
return await callback(f);
};
recvStats = (bytes: number) => {
this.stats.recv.messages += 1;
this.stats.recv.bytes += bytes;
this.stats.recv0.messages += 1;
this.stats.recv0.bytes += bytes;
};
waitUntilConnected = reuseInFlight(async () => {
if (this.conn.connected) {
return;
}
await once(this.conn, "connect");
});
waitUntilReady = reuseInFlight(async () => {
await this.waitUntilSignedIn();
await this.waitUntilConnected();
});
private setState = (state: State) => {
if (this.isClosed() || this.state == state) {
return;
}
this.state = state;
this.emit(state);
};
private temporaryInboxSubject = () => {
if (!this.inboxSubject) {
throw Error("inbox not setup properly");
}
return `${this.inboxSubject}.${randomId()}`;
};
private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
if (this.inbox == null) {
if (this.isClosed()) {
throw Error("closed");
}
await once(this, "inbox");
}
if (this.inbox == null) {
throw Error("bug");
}
return this.inbox;
});
private initInbox = async () => {
const inboxPrefix = this.options.inboxPrefix ?? INBOX_PREFIX;
if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
}
this.inboxSubject = `${inboxPrefix}.${randomId()}`;
let sub;
await until(
async () => {
try {
await this.waitUntilSignedIn();
sub = await this.subscribe(this.inboxSubject + ".*");
return true;
} catch (err) {
if (this.isClosed()) {
return true;
}
if (!process.env.COCALC_TEST_MODE) {
console.log(`WARNING: inbox not available -- ${err}`);
}
}
return false;
},
{ start: 1000, max: 15000 },
);
if (this.isClosed()) {
return;
}
this.inbox = new EventEmitter();
(async () => {
for await (const mesg of sub) {
if (this.inbox == null) {
return;
}
this.inbox.emit(mesg.subject, mesg);
}
})();
this.emit("inbox", this.inboxSubject);
};
private isClosed = () => {
return this.state == "closed";
};
close = () => {
if (this.isClosed()) {
return;
}
this.setState("closed");
this.removeAllListeners();
this.closeAllSockets();
delete this.sockets;
for (const subject in this.queueGroups) {
this.conn.emit("unsubscribe", { subject });
delete this.queueGroups[subject];
}
for (const sub of Object.values(this.subs)) {
sub.refCount = 0;
sub.close();
delete this.subs;
}
delete this.queueGroups;
delete this.inboxSubject;
delete this.inbox;
delete this.options;
delete this.info;
delete this.permissionError;
try {
this.conn.close();
} catch {}
};
private syncSubscriptions = reuseInFlight(async () => {
let fails = 0;
await until(
async () => {
if (this.isClosed()) return true;
try {
if (this.info == null) {
await once(this, "info");
}
if (this.isClosed()) return true;
await this.waitUntilConnected();
if (this.isClosed()) return true;
const stable = await this.syncSubscriptions0(10000);
if (stable) {
return true;
}
} catch (err) {
fails++;
if (fails >= 3) {
console.log(
`WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
);
}
}
return false;
},
{ start: 1000, max: 15000 },
);
});
private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
if (this.isClosed()) return true;
if (this.info == null) {
throw Error("not signed in");
}
const subs = await this.getSubscriptions(timeout);
const missing: { subject: string; queue: string }[] = [];
for (const subject in this.queueGroups) {
if (!subs.has(subject)) {
missing.push({
subject,
queue: this.queueGroups[subject],
});
}
}
let stable = true;
if (missing.length > 0) {
stable = false;
const resp = await callback(
this.conn.timeout(timeout).emit.bind(this.conn),
"subscribe",
missing,
);
for (let i = 0; i < missing.length; i++) {
if (resp[i].error) {
const sub = this.subs[missing[i].subject];
if (sub != null) {
sub.close(true);
}
}
}
}
const extra: { subject: string }[] = [];
for (const subject in subs) {
if (this.queueGroups[subject] != null) {
extra.push({ subject });
}
}
if (extra.length > 0) {
await callback(
this.conn.timeout(timeout).emit.bind(this.conn),
"unsubscribe",
extra,
);
stable = false;
}
return stable;
};
numSubscriptions = () => Object.keys(this.queueGroups).length;
private getSubscriptions = async (
timeout = DEFAULT_REQUEST_TIMEOUT,
): Promise<Set<string>> => {
const subs = await callback(
this.conn.timeout(timeout).emit.bind(this.conn),
"subscriptions",
null,
);
return new Set(subs);
};
private subscriptionEmitter = (
subject: string,
{
closeWhenOffCalled,
queue,
sticky,
confirm,
timeout,
}: {
closeWhenOffCalled?: boolean;
queue?: string;
sticky?: boolean;
confirm?: boolean;
timeout?: number;
} = {},
): { sub: SubscriptionEmitter; promise? } => {
if (!timeout) {
timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
}
if (this.isClosed()) {
throw Error("closed");
}
if (!isValidSubject(subject)) {
throw Error(`invalid subscribe subject '${subject}'`);
}
if (this.permissionError.sub.has(subject)) {
const message = this.permissionError.sub.get(subject)!;
logger.debug(message);
throw new ConatError(message, { code: 403 });
}
if (sticky) {
if (queue) {
throw Error("must not specify queue group if sticky is true");
}
queue = STICKY_QUEUE_GROUP;
}
let sub = this.subs[subject];
if (sub != null) {
if (queue && this.queueGroups[subject] != queue) {
throw Error(
`client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
);
}
if (queue == STICKY_QUEUE_GROUP) {
throw Error(
`can only have one sticky subscription per client -- subject='${subject}'`,
);
}
sub.refCount += 1;
return { sub, promise: undefined };
}
if (this.queueGroups[subject] != null) {
throw Error(`already subscribed to '${subject}'`);
}
if (!queue) {
queue = randomId();
}
this.queueGroups[subject] = queue;
sub = new SubscriptionEmitter({
client: this,
subject,
closeWhenOffCalled,
});
this.subs[subject] = sub;
this.stats.subs++;
let promise;
if (confirm) {
const f = (cb) => {
const handle = (response) => {
if (response?.error) {
cb(new ConatError(response.error, { code: response.code }));
} else {
cb(response?.error, response);
}
};
if (timeout) {
this.conn
.timeout(timeout)
.emit("subscribe", { subject, queue }, (err, response) => {
if (err) {
handle({ error: `${err}`, code: 408 });
} else {
handle(response);
}
});
} else {
this.conn.emit("subscribe", { subject, queue }, handle);
}
};
promise = callback(f);
} else {
this.conn.emit("subscribe", { subject, queue });
promise = undefined;
}
sub.once("closed", () => {
if (this.isClosed()) {
return;
}
this.conn.emit("unsubscribe", { subject });
delete this.queueGroups[subject];
if (this.subs[subject] != null) {
this.stats.subs--;
delete this.subs[subject];
}
});
return { sub, promise };
};
private subscriptionIterator = (
sub,
opts?: SubscriptionOptions,
): Subscription => {
const iter = new EventIterator<Message>(sub, "message", {
idle: opts?.maxWait,
limit: opts?.mesgLimit,
map: (args) => args[0],
});
return iter;
};
subscribeSync = (
subject: string,
opts?: SubscriptionOptions,
): Subscription => {
const { sub } = this.subscriptionEmitter(subject, {
confirm: false,
closeWhenOffCalled: true,
sticky: opts?.sticky,
queue: opts?.queue,
});
return this.subscriptionIterator(sub, opts);
};
subscribe = async (
subject: string,
opts?: SubscriptionOptions,
): Promise<Subscription> => {
await this.waitUntilSignedIn();
const { sub, promise } = this.subscriptionEmitter(subject, {
confirm: true,
closeWhenOffCalled: true,
queue: opts?.queue,
sticky: opts?.sticky,
timeout: opts?.timeout,
});
try {
await promise;
} catch (err) {
sub.close();
throw err;
}
return this.subscriptionIterator(sub, opts);
};
sub = this.subscribe;
service: <T = any>(
subject: string,
impl: T,
opts?: SubscriptionOptions,
) => Promise<Subscription> = async (subject, impl, opts) => {
const sub = await this.subscribe(subject, {
...opts,
queue: opts?.queue ?? "0",
});
const respond = async (mesg: Message) => {
try {
const [name, args] = mesg.data;
let f = impl[name];
if (f == null) {
throw Error(`${name} not defined`);
}
const result = await f.apply(mesg, args);
mesg.respondSync(result);
} catch (err) {
mesg.respondSync(null, { headers: { error: `${err}` } });
}
};
const loop = async () => {
for await (const mesg of sub) {
respond(mesg);
}
};
loop();
return sub;
};
call<T = any>(subject: string, opts?: PublishOptions): T {
const call = async (name: string, args: any[]) => {
const resp = await this.request(subject, [name, args], opts);
if (resp.headers?.error) {
throw Error(`${resp.headers.error}`);
} else {
return resp.data;
}
};
return new Proxy(
{},
{
get: (_, name) => {
if (typeof name !== "string") {
return undefined;
}
return async (...args) => await call(name, args);
},
},
) as T;
}
callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
const self = this;
async function* callMany(name: string, args: any[]) {
const sub = await self.requestMany(subject, [name, args], {
...opts,
maxWait,
});
for await (const resp of sub) {
if (resp.headers?.error) {
yield new ConatError(`${resp.headers.error}`, {
code: resp.headers.code,
});
} else {
yield resp.data;
}
}
}
return new Proxy(
{},
{
get: (_, name) => {
if (typeof name !== "string") {
return undefined;
}
return async (...args) => await callMany(name, args);
},
},
) as T;
}
publishSync = (
subject: string,
mesg,
opts?: PublishOptions,
): { bytes: number } => {
if (this.isClosed()) {
return { bytes: 0 };
}
return this._publish(subject, mesg, opts);
};
publish = async (
subject: string,
mesg,
opts?: PublishOptions,
): Promise<{
bytes: number;
count: number;
}> => {
if (this.isClosed()) {
return { bytes: 0, count: 0 };
}
await this.waitUntilSignedIn();
const { bytes, getCount, promise } = this._publish(subject, mesg, {
...opts,
confirm: true,
});
await promise;
return { bytes, count: getCount?.()! };
};
private _publish = (
subject: string,
mesg,
{
headers,
raw,
encoding = DEFAULT_ENCODING,
confirm,
timeout = DEFAULT_PUBLISH_TIMEOUT,
}: PublishOptions & { confirm?: boolean } = {},
) => {
if (this.isClosed()) {
return { bytes: 0 };
}
if (!isValidSubjectWithoutWildcards(subject)) {
throw Error(`invalid publish subject ${subject}`);
}
if (this.permissionError.pub.has(subject)) {
const message = this.permissionError.pub.get(subject)!;
logger.debug(message);
throw new ConatError(message, { code: 403 });
}
raw = raw ?? encode({ encoding, mesg });
this.stats.send.messages += 1;
this.stats.send.bytes += raw.length;
const chunkSize = Math.max(
1000,
(this.info?.max_payload ?? 1e6) - MAX_HEADER_SIZE,
);
let seq = 0;
let id = randomId();
const promises: any[] = [];
let count = 0;
for (let i = 0; i < raw.length; i += chunkSize) {
const done = i + chunkSize >= raw.length ? 1 : 0;
const v: any[] = [
subject,
id,
seq,
done,
encoding,
raw.slice(i, i + chunkSize),
];
if (done && headers) {
v.push(headers);
}
if (confirm) {
let done = false;
const f = (cb) => {
const handle = (response) => {
if (response?.error) {
cb(new ConatError(response.error, { code: response.code }));
} else {
cb(response?.error, response);
}
};
if (timeout) {
const timer = setTimeout(() => {
done = true;
cb(new ConatError("timeout", { code: 408 }));
}, timeout);
this.conn.timeout(timeout).emit("publish", v, (err, response) => {
if (done) {
return;
}
clearTimeout(timer);
if (err) {
handle({ error: `${err}`, code: 408 });
} else {
handle(response);
}
});
} else {
this.conn.emit("publish", v, handle);
}
};
const promise = (async () => {
const response = await callback(f);
count = Math.max(count, response.count ?? 0);
})();
promises.push(promise);
} else {
this.conn.emit("publish", v);
}
seq += 1;
}
if (confirm) {
return {
bytes: raw.length,
getCount: () => count,
promise: Promise.all(promises),
};
}
return { bytes: raw.length };
};
pub = this.publish;
request = async (
subject: string,
mesg: any,
{
timeout = DEFAULT_REQUEST_TIMEOUT,
waitForInterest = false,
...options
}: PublishOptions & { timeout?: number; waitForInterest?: boolean } = {},
): Promise<Message> => {
if (timeout <= 0) {
throw Error("timeout must be positive");
}
const start = Date.now();
const inbox = await this.getInbox();
const inboxSubject = this.temporaryInboxSubject();
const sub = new EventIterator<Message>(inbox, inboxSubject, {
idle: timeout,
limit: 1,
map: (args) => args[0],
});
const opts = {
...options,
timeout,
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
};
const { count } = await this.publish(subject, mesg, opts);
if (!count) {
const giveUp = () => {
sub.stop();
throw new ConatError(
`request -- no subscribers matching '${subject}'`,
{
code: 503,
},
);
};
if (waitForInterest) {
await this.waitForInterest(subject, { timeout });
if (this.state == "closed") {
throw Error("closed");
}
const remaining = timeout - (Date.now() - start);
if (remaining <= 1000) {
throw new ConatError("timeout", { code: 408 });
}
const { count } = await this.publish(subject, mesg, {
...opts,
timeout: remaining,
});
if (!count) {
giveUp();
}
} else {
giveUp();
}
}
for await (const resp of sub) {
sub.stop();
return resp;
}
sub.stop();
throw new ConatError("timeout", { code: 408 });
};
requestMany = async (
subject: string,
mesg: any,
{ maxMessages, maxWait, ...options }: RequestManyOptions = {},
): Promise<Subscription> => {
if (maxMessages != null && maxMessages <= 0) {
throw Error("maxMessages must be positive");
}
if (maxWait != null && maxWait <= 0) {
throw Error("maxWait must be positive");
}
const inbox = await this.getInbox();
const inboxSubject = this.temporaryInboxSubject();
const sub = new EventIterator<Message>(inbox, inboxSubject, {
idle: maxWait,
limit: maxMessages,
map: (args) => args[0],
});
const { count } = await this.publish(subject, mesg, {
...options,
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
});
if (!count) {
sub.stop();
throw new ConatError(
`requestMany -- no subscribers matching ${subject}`,
{ code: 503 },
);
}
return sub;
};
watch = (
subject: string,
cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
opts?: SubscriptionOptions,
) => {
const sub = this.subscribeSync(subject, opts);
const f = async () => {
for await (const x of sub) {
cb(x);
}
};
f();
return sub;
};
sync = {
dkv: async (opts: DKVOptions): Promise<DKV> =>
await dkv({ ...opts, client: this }),
akv: async (opts: DKVOptions): Promise<AKV> =>
await akv({ ...opts, client: this }),
dko: async (opts: DKVOptions): Promise<DKO> =>
await dko({ ...opts, client: this }),
dstream: async (opts: DStreamOptions): Promise<DStream> =>
await dstream({ ...opts, client: this }),
astream: async (opts: DStreamOptions): Promise<AStream> =>
await astream({ ...opts, client: this }),
synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
await createSyncTable({ ...opts, client: this }),
};
socket = {
listen: (
subject: string,
opts?: SocketConfiguration,
): ConatSocketServer => {
if (this.state == "closed") {
throw Error("closed");
}
if (this.sockets.servers[subject] !== undefined) {
throw Error(
`there can be at most one socket server per client listening on a subject (subject='${subject}')`,
);
}
const server = new ConatSocketServer({
subject,
role: "server",
client: this,
id: this.id,
...opts,
});
this.sockets.servers[subject] = server;
server.once("closed", () => {
delete this.sockets.servers[subject];
});
return server;
},
connect: (
subject: string,
opts?: SocketConfiguration,
): ConatSocketClient => {
if (this.state == "closed") {
throw Error("closed");
}
const id = randomId();
const client = new ConatSocketClient({
subject,
role: "client",
client: this,
id,
...opts,
});
if (this.sockets.clients[subject] === undefined) {
this.sockets.clients[subject] = { [id]: client };
} else {
this.sockets.clients[subject][id] = client;
}
client.once("closed", () => {
const v = this.sockets.clients[subject];
if (v != null) {
delete v[id];
if (isEmpty(v)) {
delete this.sockets.clients[subject];
}
}
});
return client;
},
};
private disconnectAllSockets = () => {
for (const subject in this.sockets.servers) {
this.sockets.servers[subject].disconnect();
}
for (const subject in this.sockets.clients) {
for (const id in this.sockets.clients[subject]) {
this.sockets.clients[subject][id].disconnect();
}
}
};
private closeAllSockets = () => {
for (const subject in this.sockets.servers) {
this.sockets.servers[subject].close();
}
for (const subject in this.sockets.clients) {
for (const id in this.sockets.clients[subject]) {
this.sockets.clients[subject][id].close();
}
}
};
message = (mesg, options?) => messageData(mesg, options);
}
interface PublishOptions {
headers?: Headers;
encoding?: DataEncoding;
raw?;
timeout?: number;
}
interface RequestManyOptions extends PublishOptions {
maxWait?: number;
maxMessages?: number;
}
export function encode({
encoding,
mesg,
}: {
encoding: DataEncoding;
mesg: any;
}) {
if (encoding == DataEncoding.MsgPack) {
return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
} else if (encoding == DataEncoding.JsonCodec) {
return jsonEncoder(mesg);
} else {
throw Error(`unknown encoding ${encoding}`);
}
}
export function decode({
encoding,
data,
}: {
encoding: DataEncoding;
data;
}): any {
if (encoding == DataEncoding.MsgPack) {
return msgpack.decode(data);
} else if (encoding == DataEncoding.JsonCodec) {
return jsonDecoder(data);
} else {
throw Error(`unknown encoding ${encoding}`);
}
}
let textEncoder: TextEncoder | undefined = undefined;
let textDecoder: TextDecoder | undefined = undefined;
function jsonEncoder(obj: any) {
if (textEncoder === undefined) {
textEncoder = new TextEncoder();
}
return textEncoder.encode(JSON.stringify(obj));
}
function jsonDecoder(data: Buffer): any {
if (textDecoder === undefined) {
textDecoder = new TextDecoder();
}
return JSON.parse(textDecoder.decode(data));
}
interface Chunk {
id: string;
seq: number;
done: number;
buffer: Buffer;
headers?: any;
}
const MAX_CHUNK_TIME = 2 * 60000;
class SubscriptionEmitter extends EventEmitter {
private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
{};
private client: Client;
private closeWhenOffCalled?: boolean;
private subject: string;
public refCount: number = 1;
constructor({ client, subject, closeWhenOffCalled }) {
super();
this.client = client;
this.subject = subject;
this.client.conn.on(subject, this.handle);
this.closeWhenOffCalled = closeWhenOffCalled;
this.dropOldLoop();
}
close = (force?) => {
this.refCount -= 1;
if (this.client == null || (!force && this.refCount > 0)) {
return;
}
this.emit("closed");
this.client.conn.removeListener(this.subject, this.handle);
delete this.incoming;
delete this.client;
delete this.subject;
delete this.closeWhenOffCalled;
this.removeAllListeners();
};
off(a, b) {
super.off(a, b);
if (this.closeWhenOffCalled) {
this.close();
}
return this;
}
private handle = ({ subject, data }) => {
if (this.client == null) {
return;
}
const [id, seq, done, encoding, buffer, headers] = data;
const chunk = { seq, done, encoding, buffer, headers };
const { incoming } = this;
if (incoming[id] == null) {
if (seq != 0) {
console.log(
`WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
{ seq },
);
return;
}
incoming[id] = [];
} else {
const prev = incoming[id].slice(-1)[0].seq ?? -1;
if (prev + 1 != seq) {
console.log(
`WARNING: drop packet from ${this.subject} -- seq number wrong`,
{ prev, seq },
);
delete incoming[id];
return;
}
}
incoming[id].push({ ...chunk, time: Date.now() });
if (chunk.done) {
const chunks = incoming[id].map((x) => x.buffer!);
const raw = concatArrayBuffers(chunks);
delete incoming[id];
const mesg = new Message({
encoding,
raw,
headers,
client: this.client,
subject,
});
this.emit("message", mesg);
this.client.recvStats(raw.byteLength);
}
};
dropOldLoop = async () => {
while (this.incoming != null) {
const cutoff = Date.now() - MAX_CHUNK_TIME;
for (const id in this.incoming) {
const chunks = this.incoming[id];
if (chunks.length > 0 && chunks[0].time <= cutoff) {
console.log(
`WARNING: drop partial message from ${this.subject} due to timeout`,
);
delete this.incoming[id];
}
}
await delay(MAX_CHUNK_TIME / 2);
}
};
}
function concatArrayBuffers(buffers) {
if (buffers.length == 1) {
return buffers[0];
}
if (Buffer.isBuffer(buffers[0])) {
return Buffer.concat(buffers);
}
const totalLength = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);
const result = new Uint8Array(totalLength);
let offset = 0;
for (const buf of buffers) {
result.set(new Uint8Array(buf), offset);
offset += buf.byteLength;
}
return result.buffer;
}
export type Headers = { [key: string]: JSONValue };
export class MessageData<T = any> {
public readonly encoding: DataEncoding;
public readonly raw;
public readonly headers?: Headers;
constructor({ encoding, raw, headers }) {
this.encoding = encoding;
this.raw = raw;
this.headers = headers;
}
get data(): T {
return decode({ encoding: this.encoding, data: this.raw });
}
get length(): number {
return this.raw.length;
}
}
export class Message<T = any> extends MessageData<T> {
private client: Client;
public readonly subject;
constructor({ encoding, raw, headers, client, subject }) {
super({ encoding, raw, headers });
this.client = client;
this.subject = subject;
}
isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];
private respondSubject = () => {
const subject = this.headers?.[REPLY_HEADER];
if (!subject) {
console.log(
`WARNING: respond -- message to '${this.subject}' is not a request`,
);
return;
}
return `${subject}`;
};
respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
const subject = this.respondSubject();
if (!subject) return { bytes: 0 };
return this.client.publishSync(subject, mesg, opts);
};
respond = async (
mesg,
opts: PublishOptions = {},
): Promise<{ bytes: number; count: number }> => {
const subject = this.respondSubject();
if (!subject) {
return { bytes: 0, count: 0 };
}
return await this.client.publish(subject, mesg, opts);
};
}
export function messageData(
mesg,
{ headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
) {
return new MessageData({
encoding,
raw: raw ?? encode({ encoding, mesg }),
headers,
});
}
export type Subscription = EventIterator<Message>;
export class ConatError extends Error {
code: string | number;
constructor(mesg: string, { code }) {
super(mesg);
this.code = code;
}
}
function isEmpty(obj: object): boolean {
for (const _x in obj) {
return false;
}
return true;
}