import { SOCKET_HEADER_SEQ, type Role } from "./util";
import { EventEmitter } from "events";
import {
type Message,
messageData,
type MessageData,
ConatError,
} from "@cocalc/conat/core/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once, until } from "@cocalc/util/async-utils";
// Initial value of `Sender.timeout`, which is passed as the `timeout` option
// to `until` when resending the last message until it is acknowledged.
// Evaluates to 120000 -- presumably milliseconds, i.e. 2 minutes (confirm
// against the `until` helper).
const DEFAULT_TIMEOUT = 2 * 60 * 1000;
/**
 * The pair of endpoints returned by `createTCP`: one object that handles
 * outgoing messages and one that handles incoming messages.
 */
export interface TCP {
// Outgoing side: numbers messages and keeps them until they are acknowledged.
send: Sender;
// Incoming side: emits received messages in sequence order.
recv: Receiver;
}
/**
 * Build the sending and receiving halves of a sequenced channel.
 *
 * @param request - forwarded to the Receiver, which uses it to ask the other
 *   side for missing messages and to report what has been emitted
 * @param send - forwarded to the Sender, which calls it to transmit a message
 * @param reset - forwarded to the Receiver, which calls it when missing
 *   messages cannot be recovered
 * @param role - forwarded to both halves
 * @param size - forwarded to the Sender as its buffer limit
 * @returns the `{ send, recv }` pair
 */
export function createTCP({ request, send, reset, role, size }): TCP {
  // The sender is constructed first, matching the original evaluation order.
  const sender = new Sender(send, role, size);
  const receiver = new Receiver(request, reset, role);
  return { send: sender, recv: receiver };
}
/**
 * Receiving half of the sequenced channel.
 *
 * Each incoming message carries a sequence number in its `SOCKET_HEADER_SEQ`
 * header.  Messages are emitted via the "message" event strictly in sequence
 * order (1, 2, 3, ...).  A message that arrives ahead of its turn is buffered
 * and the gap before it is requested from the other side via `request`.
 * After messages are emitted, the highest emitted sequence number is reported
 * back via `request` so the other side can drop its copies.
 *
 * Constructor arguments:
 *  - request: awaited with `{ socket: { missing } }` to fetch missing messages
 *    (the response's `data` is iterated and `headers.error` is checked) and
 *    with `{ socket: { emitted } }` to acknowledge delivery.
 *  - reset: called with no arguments when the response to a missing-message
 *    request carries an error header.
 *  - role: only used here to label the warning that is logged for messages
 *    with an invalid sequence number.
 */
export class Receiver extends EventEmitter {
  // Messages that arrived ahead of `seq.next`, keyed by sequence number.
  // Removed by close(), which turns every method into a no-op.
  private incoming?: { [id: number]: MessageData } = {};
  private seq?: {
    // sequence number of the next message to emit
    next: number;
    // sequence number of the most recently emitted message
    emitted: number;
    // highest emitted sequence number successfully reported via `request`
    reported: number;
    // largest sequence number seen so far
    largest: number;
  } = { next: 1, emitted: 0, reported: 0, largest: 0 };

  constructor(
    private request,
    private reset,
    public readonly role: Role,
  ) {
    super();
  }

  /** Remove all listeners and drop all state; later calls do nothing. */
  close = () => {
    this.removeAllListeners();
    delete this.incoming;
    delete this.seq;
  };

  /**
   * Handle one incoming message.
   *
   * - An invalid sequence number (not a positive safe integer) is logged and
   *   the message is discarded.
   * - The expected next message is emitted immediately, followed by any
   *   buffered messages that directly follow it.
   * - A message from the future is buffered and the gap is fetched.
   * - A message that was already emitted (seq < next) is ignored.
   */
  process = (mesg: MessageData) => {
    if (this.seq === undefined || this.incoming === undefined) return;
    const seq = mesg.headers?.[SOCKET_HEADER_SEQ];
    // NaN would poison `largest` via Math.max, Infinity would make the gap
    // scan in fetchMissing loop forever, and a fraction can never equal
    // `next`, so anything other than a positive safe integer is rejected.
    if (typeof seq != "number" || !Number.isSafeInteger(seq) || seq < 1) {
      console.log(
        `WARNING: ${this.role} discarding message -- seq must be a positive integer`,
        { seq, mesg: mesg.data, headers: mesg.headers },
      );
      return;
    }
    this.seq.largest = Math.max(seq, this.seq.largest);
    if (seq == this.seq.next) {
      this.emitMessage(mesg, seq);
      // Messages buffered behind this one are now deliverable.  Without this
      // they could stay stuck forever if the request for the gap failed,
      // since nothing would be "missing" on later checks.
      this.deliverBuffered();
    } else if (seq > this.seq.next) {
      this.incoming[seq] = mesg;
      this.fetchMissing();
    }
  };

  /**
   * Emit `mesg` as the next message in sequence, strip its sequence header
   * and start reporting the new emitted position.
   *
   * @throws Error if `seq` is not the expected next sequence number
   */
  private emitMessage = (mesg: MessageData, seq: number) => {
    if (this.seq === undefined) return;
    if (seq != this.seq.next) {
      throw Error("message sequence is wrong");
    }
    this.seq.next = seq + 1;
    this.seq.emitted = seq;
    delete mesg.headers?.[SOCKET_HEADER_SEQ];
    this.emit("message", mesg);
    this.reportReceived();
  };

  /**
   * Request every sequence number between `next` and `largest` that is not
   * buffered, feed the response through process(), then deliver whatever
   * became contiguous.  Wrapped in reuseInFlight.
   */
  private fetchMissing = reuseInFlight(async () => {
    if (this.seq === undefined || this.incoming === undefined) return;
    // Built by an ascending scan, so this list is already in numeric order.
    // (It must not be passed through the default Array.prototype.sort, which
    // compares elements as strings and would put 10 before 9.)
    const missing: number[] = [];
    for (let seq = this.seq.next; seq <= this.seq.largest; seq++) {
      if (this.incoming[seq] === undefined) {
        missing.push(seq);
      }
    }
    if (missing.length == 0) {
      // Nothing to fetch; make sure nothing deliverable is left waiting.
      this.deliverBuffered();
      return;
    }
    let resp;
    try {
      resp = await this.request({ socket: { missing } });
    } catch {
      // Best effort: a failed request is simply dropped.  Another attempt is
      // made the next time a message arrives ahead of its turn.
      return;
    }
    if (this.seq == null) {
      // closed while the request was in flight
      return;
    }
    if (resp.headers?.error) {
      // the other side could not supply the missing messages
      this.reset();
      return;
    }
    for (const x of resp.data) {
      this.process(messageData(null, x));
    }
    this.emitIncoming();
  });

  /**
   * Emit buffered messages for as long as the one with sequence number
   * `seq.next` is available.  `seq.next` is re-read on every iteration
   * because emitMessage advances it.
   */
  private deliverBuffered = () => {
    while (this.seq !== undefined && this.incoming !== undefined) {
      const seq = this.seq.next;
      const mesg = this.incoming[seq];
      if (mesg == null) {
        break;
      }
      delete this.incoming[seq];
      this.emitMessage(mesg, seq);
    }
  };

  /** Deliver all contiguous buffered messages, then report the position. */
  private emitIncoming = () => {
    if (this.seq === undefined || this.incoming === undefined) return;
    this.deliverBuffered();
    this.reportReceived();
  };

  /**
   * Tell the other side the highest sequence number emitted so far, unless
   * that position has already been reported.  Failures are ignored; the
   * report is attempted again after the next emitted message.
   */
  private reportReceived = async () => {
    if (this.seq === undefined) return;
    if (this.seq.reported >= this.seq.emitted) {
      return;
    }
    const x = { socket: { emitted: this.seq.emitted } };
    try {
      await this.request(x);
      if (this.seq == null) {
        return;
      }
      // Several reports can be in flight at once; never move backwards if an
      // older one completes after a newer one.
      this.seq.reported = Math.max(this.seq.reported, x.socket.emitted);
    } catch {
      // deliberately ignored -- see above
    }
  };
}
/**
 * Sending half of the sequenced channel.
 *
 * Every message passed to process() gets the next sequence number written
 * into its `SOCKET_HEADER_SEQ` header, is remembered in `outgoing`, and is
 * handed to `send`.  Messages stay in `outgoing` until the other side reports
 * them as emitted (see handleRequest), so they can be supplied again on
 * request.
 *
 * Events:
 *  - "drain": emitted when an acknowledgement brings the number of
 *    unacknowledged messages down to zero.
 *
 * Constructor arguments:
 *  - send: called with each message to transmit
 *  - role: stored as a public readonly property
 *  - size: maximum number of unacknowledged messages; process() throws once
 *    this many are outstanding
 */
export class Sender extends EventEmitter {
  // Unacknowledged messages keyed by sequence number.  Removed by close().
  private outgoing?: { [id: number]: Message } = {};
  // Sequence number of the most recently sent message (0 = nothing sent yet).
  // Removed by close().
  private seq?: number = 0;
  // Passed as the `timeout` option to `until` in resendLastUntilAcked.
  timeout = DEFAULT_TIMEOUT;
  // Number of messages sent but not yet acknowledged.
  private unsent: number = 0;

  constructor(
    private send: (mesg: Message) => void,
    public readonly role: Role,
    private size: number,
  ) {
    super();
  }

  /** Remove all listeners and drop all buffered state. */
  close = () => {
    this.removeAllListeners();
    delete this.outgoing;
    delete this.seq;
  };

  /**
   * Assign the next sequence number to `mesg`, remember it and send it.
   * The message's `headers` property is replaced with a copy that includes
   * the sequence number.
   *
   * @throws ConatError if the sender has been closed (previously this
   *   surfaced as an accidental TypeError)
   * @throws ConatError with code "ENOBUFS" if `size` messages are already
   *   waiting for acknowledgement
   */
  process = (mesg) => {
    if (this.outgoing === undefined || this.seq === undefined) {
      throw new ConatError("WRITE FAILED: socket is closed", {
        code: "EPIPE",
      });
    }
    if (this.unsent >= this.size) {
      throw new ConatError(
        `WRITE FAILED: socket buffer size ${this.size} exceeded`,
        { code: "ENOBUFS" },
      );
    }
    this.seq += 1;
    this.outgoing[this.seq] = mesg;
    this.unsent++;
    mesg.headers = { ...mesg.headers, [SOCKET_HEADER_SEQ]: this.seq };
    this.send(mesg);
  };

  /**
   * True when there is nothing to resend: the sender is closed, nothing has
   * been sent yet, or the most recent message is no longer in `outgoing`.
   */
  private lastAcked = (): boolean => {
    if (this.outgoing === undefined || this.seq === undefined) {
      return true;
    }
    return this.seq == 0 || this.outgoing[this.seq] === undefined;
  };

  /** Send the most recent message again, if it is still unacknowledged. */
  private resendLast = () => {
    if (this.outgoing === undefined || this.seq === undefined) {
      return;
    }
    const last = this.outgoing[this.seq];
    if (this.seq == 0 || last === undefined) {
      // Nothing pending -- without this guard `send` would be called with
      // undefined.
      return;
    }
    this.send(last);
  };

  /**
   * Keep resending the most recent message until it has been acknowledged or
   * the sender is closed, using the retry schedule given to `until` below.
   * An error from `until` is ignored.  Wrapped in reuseInFlight.
   */
  resendLastUntilAcked = reuseInFlight(async () => {
    try {
      await until(
        () => {
          if (this.outgoing === undefined || this.lastAcked()) {
            return true;
          }
          this.resendLast();
          return false;
        },
        { start: 500, max: 15000, decay: 1.3, timeout: this.timeout },
      );
    } catch (_err) {
      // deliberately ignored: resending is best effort
    }
  });

  /**
   * Handle a control request from the receiving side.  `mesg.data.socket`
   * carries one of:
   *
   *  - `emitted`: every message with a sequence number <= emitted is dropped
   *    from `outgoing`; "drain" is emitted if that leaves nothing
   *    unacknowledged; the reply is `{ emitted }`.
   *  - `missing`: a list of sequence numbers; the reply is the list of the
   *    corresponding messages, or an error reply (`headers.error = "nodata"`)
   *    if any of them is no longer available.
   *
   * Requests without `data.socket`, or arriving after close(), are ignored
   * without a reply.
   */
  handleRequest = (mesg) => {
    if (
      mesg.data?.socket == null ||
      this.seq == null ||
      this.outgoing == null
    ) {
      return;
    }
    const outgoing = this.outgoing;
    const { emitted, missing } = mesg.data.socket;
    if (emitted != null) {
      let acked = 0;
      // Iterate over a snapshot of the keys so that deleting entries does not
      // interfere with the iteration.
      for (const key of Object.keys(outgoing)) {
        const id = parseInt(key);
        if (id <= emitted) {
          delete outgoing[id];
          this.unsent--;
          acked++;
        }
      }
      // Zero can only be reached by the last deletion, so emitting once after
      // the loop is equivalent to checking after every deletion.
      if (acked > 0 && this.unsent == 0) {
        this.emit("drain");
      }
      mesg.respondSync({ emitted });
    } else if (missing != null) {
      const v: Message[] = [];
      for (const id of missing) {
        const x = outgoing[id];
        if (x == null) {
          mesg.respondSync(null, { headers: { error: "nodata" } });
          return;
        }
        v.push(x);
      }
      mesg.respondSync(v);
    }
  };

  /**
   * Resolve once no messages are waiting for acknowledgement.  If waiting
   * for "drain" fails and the sender has been closed, the failure is
   * ignored; otherwise it is rethrown.  Wrapped in reuseInFlight.
   */
  waitUntilDrain = reuseInFlight(async () => {
    if (this.unsent == 0) {
      return;
    }
    try {
      await once(this, "drain");
    } catch (err) {
      if (this.outgoing == null) {
        return;
      }
      throw err;
    }
  });
}