Path: blob/master/src/packages/conat/socket/server-socket.ts
1453 views
import { EventEmitter } from "events";
import {
  type Headers,
  DEFAULT_REQUEST_TIMEOUT,
  type Message,
  messageData,
  ConatError,
} from "@cocalc/conat/core/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once } from "@cocalc/util/async-utils";
import { SOCKET_HEADER_CMD, type State, clientSubject } from "./util";
import { type TCP, createTCP } from "./tcp";
import { type ConatSocketServer } from "./server";
import { keepAlive, KeepAlive } from "./keepalive";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("socket:server-socket");

/**
 * One specific socket from the point of view of a server.
 *
 * Events emitted by this class:
 *  - "data" (data, headers): a message was delivered by the TCP receive side.
 *  - "drain": forwarded from the TCP send side.
 *  - the name of the new state (e.g. "ready", "disconnected", "closed")
 *    every time the state changes.
 *
 * "closed" is a terminal state: close() tears down the TCP layer and the
 * keepalive, removes all listeners and drops the reference to the owning
 * server, so nothing may touch `conatSocket` afterwards.
 */
export class ServerSocket extends EventEmitter {
  private conatSocket: ConatSocketServer;
  public readonly id: string;
  // Initialized at construction time; it is not updated anywhere in this file.
  public lastPing = Date.now();

  // Messages accepted by send() while the socket was not "ready"; they are
  // flushed to the client when the state becomes "ready" again.
  private queuedWrites: { data: any; headers?: Headers }[] = [];
  private clientSubject: string;

  public state: State = "ready";
  // the non-pattern subject the client connected to
  public readonly subject: string;

  // this is just for compat with conatSocket api:
  public readonly address = { ip: "" };
  // conn is just for compatibility with primus/socketio (?).
  public readonly conn: { id: string };

  public tcp?: TCP;
  private alive?: KeepAlive;

  /**
   * @param conatSocket the server that owns this socket
   * @param id identifier of this socket; also exposed as `conn.id`
   * @param subject the non-pattern subject the client connected to
   * @throws Error if the TCP layer is missing after initTCP()
   */
  constructor({ conatSocket, id, subject }) {
    super();
    this.subject = subject;
    this.conatSocket = conatSocket;
    this.clientSubject = clientSubject(subject);
    this.id = id;
    this.conn = { id };
    this.initTCP();
    if (this.tcp == null) {
      throw Error("bug");
    }
    this.initKeepAlive();
  }

  /**
   * (Re)creates the keepalive monitor, closing any previous one. Pings are
   * requests carrying the "ping" command header; the monitor is handed
   * close() as its disconnect action.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "server",
      ping: async () => {
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.conatSocket.keepAliveTimeout,
        });
      },
      disconnect: this.close,
      keepAlive: this.conatSocket.keepAlive,
    });
  };

  /**
   * Creates the TCP layer and wires its events to this socket.
   *
   * @throws Error if the TCP layer was already initialized
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // Requests issued by the TCP layer are tagged with the "socket" command
    // header, in addition to whatever headers the caller supplied.
    const request = async (mesg, opts?) =>
      await this.conatSocket.client.request(this.clientSubject, mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });
    this.tcp = createTCP({
      request,
      role: this.conatSocket.role,
      reset: this.close,
      send: this.send,
      size: this.conatSocket.maxQueueSize,
    });
    // This listener is removed again in close().
    this.conatSocket.client.on(
      "disconnected",
      this.tcp.send.resendLastUntilAcked,
    );

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /**
   * Marks the socket as disconnected. It becomes "ready" again immediately
   * if the owning server is ready, otherwise as soon as the server emits
   * "ready". Does nothing on a closed socket.
   */
  disconnect = () => {
    if (this.state == "closed") {
      // "closed" is terminal: do not resurrect the state, and do not touch
      // this.conatSocket, which close() has already deleted.
      return;
    }
    this.setState("disconnected");
    if (this.conatSocket.state == "ready") {
      this.setState("ready");
    } else {
      this.conatSocket.once("ready", this.onServerSocketReady);
    }
  };

  // Listener for the server's "ready" event; registered by disconnect() and
  // removed by close().
  private onServerSocketReady = () => {
    if (this.state != "closed") {
      this.setState("ready");
    }
  };

  /**
   * Records the new state, flushes queued writes when becoming "ready", and
   * then emits an event named after the state.
   */
  private setState = (state: State) => {
    this.state = state;
    if (state == "ready") {
      for (const mesg of this.queuedWrites) {
        this.sendDataToClient(mesg);
      }
      this.queuedWrites = [];
    }
    this.emit(state);
  };

  /**
   * Publishes a "close" command to the client, awaiting the publish, and
   * then closes this socket. A failure to publish is logged and does not
   * prevent the local close.
   *
   * @param timeout passed through to the publish call (default 3000)
   */
  end = async ({ timeout = 3000 }: { timeout?: number } = {}) => {
    if (this.state == "closed") {
      return;
    }
    try {
      await this.conatSocket.client.publish(this.clientSubject, null, {
        headers: { [SOCKET_HEADER_CMD]: "close" },
        timeout,
      });
    } catch (err) {
      console.log(`WARNING: error closing socket - ${err}`);
    }
    this.close();
  };

  /** Alias for close(). */
  destroy = () => this.close();

  /**
   * Closes the socket: publishes a "close" command to the client, tears down
   * the TCP layer and keepalive, drops queued writes, emits "closed", removes
   * all listeners and unregisters from the owning server. Idempotent.
   */
  close = () => {
    if (this.state == "closed") {
      return;
    }
    this.conatSocket.removeListener("ready", this.onServerSocketReady);
    // NOTE(review): if publishSync can throw (e.g. client already closed),
    // the cleanup below would be skipped -- confirm against the client API.
    this.conatSocket.client.publishSync(this.clientSubject, null, {
      headers: { [SOCKET_HEADER_CMD]: "close" },
    });

    if (this.tcp != null) {
      this.conatSocket.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }

    this.alive?.close();
    delete this.alive;

    this.queuedWrites = [];
    // Emit "closed" before removing listeners so subscribers are notified.
    this.setState("closed");
    this.removeAllListeners();
    delete this.conatSocket.sockets[this.id];
    // @ts-ignore
    delete this.conatSocket;
  };

  /**
   * Entry point for a message arriving from the client: notifies the
   * keepalive monitor and hands the message to the TCP receive side.
   */
  receiveDataFromClient = (mesg) => {
    this.alive?.recv();
    this.tcp?.recv.process(mesg);
  };

  // Publishes an already-encoded message (raw payload plus headers) to the
  // client's subject.
  private sendDataToClient = (mesg) => {
    this.conatSocket.client.publishSync(this.clientSubject, null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * Transport callback handed to the TCP layer.
   *
   * - closed: the message is dropped.
   * - not ready: the message is queued, keeping at most `maxQueueSize`
   *   entries by discarding the oldest ones.
   * - ready: the message is published immediately.
   *
   * @returns true if the message was published right away, otherwise
   *   undefined
   */
  private send = (mesg: Message) => {
    // This check must come first: after close() this.conatSocket no longer
    // exists, so the queueing branch below would throw.
    if (this.state == "closed") {
      return;
    }
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.conatSocket.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    this.sendDataToClient(mesg);
    return true;
  };

  // writes will raise an exception if: (1) the socket is closed, or (2)
  // you hit maxQueueSize un-ACK'd messages.
  write = (data, { headers }: { headers?: Headers } = {}) => {
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /**
   * Request/reply where the client responds. Waits until the socket is
   * ready (bounded by `options.timeout`, or DEFAULT_REQUEST_TIMEOUT when not
   * given) and then sends the request to the client's subject.
   *
   * @throws ConatError with code "EPIPE" if the socket is already closed
   */
  request = async (data, options?) => {
    if (this.state == "closed") {
      // Fail fast: a closed socket never emits "ready" again, so waiting
      // would only end in a timeout.
      throw new ConatError("closed", { code: "EPIPE" });
    }
    await this.waitUntilReady(options?.timeout);
    logger.silly("server sending request to ", this.clientSubject);
    return await this.conatSocket.client.request(
      this.clientSubject,
      data,
      options,
    );
  };

  // Resolves once the socket is "ready", waiting for the "ready" event with
  // the given timeout (DEFAULT_REQUEST_TIMEOUT when omitted).
  private waitUntilReady = reuseInFlight(async (timeout?: number) => {
    if (this.state == "ready") {
      return;
    }
    await once(this, "ready", timeout ?? DEFAULT_REQUEST_TIMEOUT);
    if (this.state == "closed") {
      throw Error("closed");
    }
  });

  /** Waits for the TCP send side to drain; resolves at once if there is no TCP layer. */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };
}