Path: blob/master/src/packages/conat/socket/server.ts
1453 views
import { ConatSocketBase } from "./base";
import {
  PING_PONG_INTERVAL,
  type Command,
  SOCKET_HEADER_CMD,
  clientSubject,
} from "./util";
import { ServerSocket } from "./server-socket";
import { delay } from "awaiting";
import { type Headers } from "@cocalc/conat/core/client";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("socket:server");

// DO NOT directly instantiate here -- instead, call the
// socket.listen method on ConatClient.

/**
 * Server side of a conat socket: subscribes to `${subject}.server.*`,
 * creates one ServerSocket per connecting client id, and dispatches
 * incoming messages (control commands, requests, plain data) to it.
 */
export class ConatSocketServer extends ConatSocketBase {
  // Intentionally empty here; per-connection tcp state is reached via
  // socket.tcp (see handleCommandFromClient).
  initTCP() {}

  /**
   * Listen on the sub-subject `${this.subject}.${channel}`.
   *
   * @param channel - name appended to this server's subject
   * @returns whatever client.socket.listen returns, typed as a ConatSocketServer
   */
  channel(channel: string) {
    return this.client.socket.listen(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
    }) as ConatSocketServer;
  }

  /**
   * Call f(socket, id) for every socket currently in this.sockets.
   */
  forEach = (f: (socket: ServerSocket, id: string) => void) => {
    for (const id in this.sockets) {
      f(this.sockets[id], id);
    }
  };

  /**
   * Main loop: subscribe, mark this server "ready", then handle every
   * incoming message until the server is closed.
   */
  protected async run() {
    // Started here but deliberately not awaited: it loops until the
    // state becomes "closed".
    this.deleteDeadSockets();
    const sub = await this.client.subscribe(`${this.subject}.server.*`, {
      sticky: true,
    });
    if (this.state == "closed") {
      // closed while we were waiting for the subscription
      sub.close();
      return;
    }
    this.sub = sub;
    this.setState("ready");
    for await (const mesg of this.sub) {
      // console.log("got mesg", mesg.data, mesg.headers);
      if (this.state == ("closed" as any)) {
        return;
      }
      const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
      // the socket id is the last segment of the message subject
      const id = mesg.subject.split(".").slice(-1)[0];
      let socket = this.sockets[id];

      if (socket === undefined) {
        if (cmd == "close") {
          // already closed
          continue;
        }
        // not connected yet -- anything except a connect message is ignored.
        if (cmd != "connect") {
          logger.debug(
            "ignoring data from not-connected socket -- telling it to close",
            { id, cmd },
          );
          this.client.publishSync(clientSubject(mesg.subject), null, {
            headers: { [SOCKET_HEADER_CMD]: "close" },
          });
          continue;
        }
        // new connection
        socket = new ServerSocket({
          conatSocket: this,
          id,
          subject: mesg.subject,
        });
        this.sockets[id] = socket;
        this.emit("connection", socket);
      }

      if (cmd !== undefined) {
        // note: test this first since it is also a request
        // a special internal control command
        this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });
      } else if (mesg.isRequest()) {
        // a request to support the socket.on('request', (mesg) => ...) protocol:
        socket.emit("request", mesg);
      } else {
        socket.receiveDataFromClient(mesg);
      }
    }
  }

  /**
   * Until this server is closed, wake up once per PING_PONG_INTERVAL and
   * destroy every socket whose lastPing is older than
   * 2.5 * PING_PONG_INTERVAL.
   */
  private async deleteDeadSockets() {
    while (this.state != "closed") {
      for (const id in this.sockets) {
        const socket = this.sockets[id];
        if (Date.now() - socket.lastPing > PING_PONG_INTERVAL * 2.5) {
          socket.destroy();
        }
      }
      await delay(PING_PONG_INTERVAL);
    }
  }

  /**
   * Send a request to every connected socket in parallel.
   *
   * A failure of an individual socket's request is caught and returned in
   * place of its response, so one failing socket does not reject the whole
   * call. If the server is already closed, Error("closed") is thrown
   * instead.
   *
   * @param data - payload passed to each socket's request()
   * @param options - passed through to each socket's request();
   *   options.timeout is also used while waiting for the server to be
   *   ready, and options.race selects Promise.race over Promise.all
   * @returns the array of all responses, or the first settled one if
   *   options.race is set
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);

    // we call all connected sockets in parallel,
    // then return array of responses.
    // Unless race is set, then we return first result
    const v: any[] = [];
    for (const id in this.sockets) {
      const f = async () => {
        if (this.state == "closed") {
          throw Error("closed");
        }
        try {
          return await this.sockets[id].request(data, options);
        } catch (err) {
          return err;
        }
      };
      v.push(f());
    }
    if (options?.race) {
      return await Promise.race(v);
    } else {
      return await Promise.all(v);
    }
  };

  /**
   * Write data (with optional headers) to every connected socket.
   *
   * @throws Error("closed") if this server is closed
   */
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state == "closed") {
      throw Error("closed");
    }
    // write to all the sockets that are connected.
    for (const id in this.sockets) {
      this.sockets[id].write(data, headers);
    }
  };

  /**
   * Handle an internal control command sent by a client. Any command
   * refreshes the socket's lastPing timestamp.
   */
  handleCommandFromClient = ({
    socket,
    cmd,
    mesg,
  }: {
    socket: ServerSocket;
    cmd: Command;
    mesg;
  }) => {
    socket.lastPing = Date.now();
    if (cmd == "socket") {
      socket.tcp?.send.handleRequest(mesg);
    } else if (cmd == "ping") {
      if (socket.state == "ready") {
        // ONLY respond to ping for a server socket if that socket is
        // actually ready! ping's are meant to check whether the server
        // socket views itself as connected right now. If not connected,
        // ping should timeout
        logger.silly("responding to ping from client", socket.id);
        mesg.respondSync(null);
      }
    } else if (cmd == "close") {
      const id = socket.id;
      socket.close();
      delete this.sockets[id];
      mesg.respondSync("closed");
    } else if (cmd == "connect") {
      mesg.respondSync("connected");
    } else {
      mesg.respondSync({ error: `unknown command - '${cmd}'` });
    }
  };

  /**
   * Shut down: disable reconnection, end every connected socket in
   * parallel, then close this server. Does nothing if already closed.
   *
   * @param timeout - passed to each socket's end(); defaults to 3000
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell all clients to end
    const end = async (id) => {
      const socket = this.sockets[id];
      delete this.sockets[id];
      try {
        await socket.end({ timeout });
      } catch (err) {
        // template literal so that the actual error is included in the output
        console.log(`WARNING: error ending socket -- ${err}`);
      }
    };
    await Promise.all(Object.keys(this.sockets).map(end));
    this.close();
  }
}