Path: blob/master/src/packages/conat/socket/client.ts
1452 views
import {
  messageData,
  type Subscription,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import { ConatSocketBase } from "./base";
import { type TCP, createTCP } from "./tcp";
import {
  SOCKET_HEADER_CMD,
  DEFAULT_COMMAND_TIMEOUT,
  type ConatSocketOptions,
} from "./util";
import { EventIterator } from "@cocalc/util/event-iterator";
import { keepAlive, KeepAlive } from "./keepalive";
import { getLogger } from "@cocalc/conat/client";
import { until } from "@cocalc/util/async-utils";

const logger = getLogger("socket:client");

// DO NOT directly instantiate here -- instead, call the
// socket.connect method on ConatClient.

/**
 * Client side of a conat socket.
 *
 * Subscribes to `${subject}.client.${id}` for messages from the server and
 * publishes/requests on `${subject}.server.${id}`. Outgoing data passes through
 * the TCP helper created in initTCP; incoming data is re-emitted as "data"
 * events.
 */
export class ConatSocketClient extends ConatSocketBase {
  // Messages handed to sendToServer while the socket was not "ready";
  // flushed by the "ready" handler installed in the constructor.
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;

  /**
   * Sets up the TCP helper and arranges for writes queued while not ready
   * to be sent as soon as the socket becomes ready.
   */
  constructor(opts: ConatSocketOptions) {
    super(opts);
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Detach the queue before flushing so that each queued message is sent
      // exactly once. Without this, if "ready" fires again (e.g. after a
      // reconnect) the same stale messages would be published again.
      const pending = this.queuedWrites;
      this.queuedWrites = [];
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Open another client socket on the sub-subject `${subject}.${channel}`,
   * using the same maxQueueSize as this socket.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)create the keepalive monitor, closing any previous one first.
   * Pings are "ping" command requests sent to the server side.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Create the TCP helper and wire its events to this socket.
   *
   * @throws Error if the TCP helper was already initialized.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request mesg to the server side of the socket
    // either ack what's received or asking for a resend of missing data.
    const request = async (mesg, opts?) =>
      await this.client.request(`${this.subject}.server.${this.id}`, mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    this.tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });

    // This listener is removed again in close().
    this.client.on("disconnected", this.tcp.send.resendLastUntilAcked);

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /** Resolves when the TCP sender reports drain (no-op if there is no TCP helper). */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Send a control command to the server side and return the response data.
   *
   * @param cmd - the command to send in the SOCKET_HEADER_CMD header
   * @param timeout - request timeout, defaults to DEFAULT_COMMAND_TIMEOUT
   * @returns the `data` field of the server's response
   * @throws Error if the response data has a truthy `error` field; errors
   *   from the underlying request propagate as well.
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    logger.silly("sendCommandToServer", { cmd, timeout });
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = `${this.subject}.server.${this.id}`;
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
    });
    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value });
    if (value?.error) {
      throw Error(value?.error);
    } else {
      return value;
    }
  };

  /**
   * Main loop: subscribe to the client subject, retry the "connect" command
   * until it succeeds (or the socket is closed), then dispatch every incoming
   * message. Any failure along the way results in a call to disconnect().
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    // console.log(
    //   "client socket -- subscribing to ",
    //   `${this.subject}.client.${this.id}`,
    // );
    try {
      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // The state may have changed while awaiting the subscription.
      // @ts-ignore
      if (this.state == "closed") {
        sub.close();
        return;
      }
      this.sub = sub;
      let resp: any = undefined;
      // Keep retrying the connect command with the backoff given below;
      // returning true from the callback ends the retry loop.
      await until(
        async () => {
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server");
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      // resp is still undefined if we gave up because the socket was closed.
      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // Any traffic from the server is reported to the keepalive monitor.
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // logger.silly("responding to ping from server", this.id);
          mesg.respond(null);
        } else if (mesg.isRequest()) {
          // logger.silly("client got request");
          this.emit("request", mesg);
        } else {
          // logger.silly("client got data"); //, { data: mesg.data });
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publish one message (raw payload + headers) to the server subject. */
  private sendDataToServer = (mesg) => {
    const subject = `${this.subject}.server.${this.id}`;
    this.client.publishSync(subject, null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * Send callback handed to the TCP helper. While the socket is not "ready"
   * the message is queued instead, dropping the oldest entries once the
   * queue exceeds maxQueueSize.
   */
  private sendToServer = (mesg) => {
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    // NOTE(review): this check cannot trigger, since any state other than
    // "ready" (including "closed") already returned above. Left in place so
    // behavior is unchanged -- confirm whether closed sockets should throw.
    // @ts-ignore
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    } else {
      // we are the client, so write to server
      this.sendDataToServer(mesg);
    }
  };

  /**
   * Send a request to the server side of the socket once the socket is ready.
   *
   * @param data - request payload
   * @param options - passed through to the client's request; `timeout` is
   *   also used as the limit when waiting for readiness
   * @throws Error("closed") if the socket is closed after waiting
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    if (this.state == "closed") {
      throw Error("closed");
    }
    // console.log("sending request from client ", { subject, data, options });
    return await this.client.request(subject, data, options);
  };

  /**
   * Like request, but uses the client's requestMany and returns its
   * Subscription.
   *
   * @throws Error("closed") if the socket is closed after waiting
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    // Same guard as in request, so both request paths treat a closed socket
    // the same way.
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(subject, data, options);
  };

  /**
   * End the socket: disable reconnection, mark it ended, tell the server we
   * are closing (waiting up to `timeout` ms, ignoring failure), then close.
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch {}
    this.close();
  }

  /**
   * Close the socket and release its resources: the subscription, the
   * "disconnected" listener, queued writes, the TCP helper and the keepalive
   * monitor. Does nothing if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    if (this.tcp != null) {
      this.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait)
    (async () => {
      try {
        await this.sendCommandToServer("close");
      } catch {}
    })();
    if (this.tcp != null) {
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  // writes will raise an exception if: (1) the socket is closed code='EPIPE', or (2)
  // you hit maxQueueSize un-ACK'd messages, code='ENOBUFS'
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Iterate over the "data" events of this socket as [data, headers] pairs. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}