Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/server-socket.ts
1453 views
1
import { EventEmitter } from "events";
2
import {
3
type Headers,
4
DEFAULT_REQUEST_TIMEOUT,
5
type Message,
6
messageData,
7
ConatError,
8
} from "@cocalc/conat/core/client";
9
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
10
import { once } from "@cocalc/util/async-utils";
11
import { SOCKET_HEADER_CMD, type State, clientSubject } from "./util";
12
import { type TCP, createTCP } from "./tcp";
13
import { type ConatSocketServer } from "./server";
14
import { keepAlive, KeepAlive } from "./keepalive";
15
import { getLogger } from "@cocalc/conat/client";
16
17
const logger = getLogger("socket:server-socket");
18
19
// One specific socket from the point of view of a server.
20
export class ServerSocket extends EventEmitter {
21
private conatSocket: ConatSocketServer;
22
public readonly id: string;
23
public lastPing = Date.now();
24
25
private queuedWrites: { data: any; headers?: Headers }[] = [];
26
private clientSubject: string;
27
28
public state: State = "ready";
29
// the non-pattern subject the client connected to
30
public readonly subject: string;
31
32
// this is just for compat with conatSocket api:
33
public readonly address = { ip: "" };
34
// conn is just for compatibility with primus/socketio (?).
35
public readonly conn: { id: string };
36
37
public tcp?: TCP;
38
private alive?: KeepAlive;
39
40
constructor({ conatSocket, id, subject }) {
41
super();
42
this.subject = subject;
43
this.conatSocket = conatSocket;
44
this.clientSubject = clientSubject(subject);
45
this.id = id;
46
this.conn = { id };
47
this.initTCP();
48
if (this.tcp == null) {
49
throw Error("bug");
50
}
51
this.initKeepAlive();
52
}
53
54
private initKeepAlive = () => {
55
this.alive?.close();
56
this.alive = keepAlive({
57
role: "server",
58
ping: async () => {
59
await this.request(null, {
60
headers: { [SOCKET_HEADER_CMD]: "ping" },
61
timeout: this.conatSocket.keepAliveTimeout,
62
});
63
},
64
disconnect: this.close,
65
keepAlive: this.conatSocket.keepAlive,
66
});
67
};
68
69
initTCP() {
70
if (this.tcp != null) {
71
throw Error("this.tcp already initialized");
72
}
73
const request = async (mesg, opts?) =>
74
await this.conatSocket.client.request(this.clientSubject, mesg, {
75
...opts,
76
headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
77
});
78
this.tcp = createTCP({
79
request,
80
role: this.conatSocket.role,
81
reset: this.close,
82
send: this.send,
83
size: this.conatSocket.maxQueueSize,
84
});
85
this.conatSocket.client.on(
86
"disconnected",
87
this.tcp.send.resendLastUntilAcked,
88
);
89
90
this.tcp.recv.on("message", (mesg) => {
91
// console.log("tcp recv emitted message", mesg.data);
92
this.emit("data", mesg.data, mesg.headers);
93
});
94
this.tcp.send.on("drain", () => {
95
this.emit("drain");
96
});
97
}
98
99
disconnect = () => {
100
this.setState("disconnected");
101
if (this.conatSocket.state == "ready") {
102
this.setState("ready");
103
} else {
104
this.conatSocket.once("ready", this.onServerSocketReady);
105
}
106
};
107
108
private onServerSocketReady = () => {
109
if (this.state != "closed") {
110
this.setState("ready");
111
}
112
};
113
114
private setState = (state: State) => {
115
this.state = state;
116
if (state == "ready") {
117
for (const mesg of this.queuedWrites) {
118
this.sendDataToClient(mesg);
119
}
120
this.queuedWrites = [];
121
}
122
this.emit(state);
123
};
124
125
end = async ({ timeout = 3000 }: { timeout?: number } = {}) => {
126
if (this.state == "closed") {
127
return;
128
}
129
try {
130
await this.conatSocket.client.publish(this.clientSubject, null, {
131
headers: { [SOCKET_HEADER_CMD]: "close" },
132
timeout,
133
});
134
} catch (err) {
135
console.log(`WARNING: error closing socket - ${err}`);
136
}
137
this.close();
138
};
139
140
destroy = () => this.close();
141
142
close = () => {
143
if (this.state == "closed") {
144
return;
145
}
146
this.conatSocket.removeListener("ready", this.onServerSocketReady);
147
this.conatSocket.client.publishSync(this.clientSubject, null, {
148
headers: { [SOCKET_HEADER_CMD]: "close" },
149
});
150
151
if (this.tcp != null) {
152
this.conatSocket.client.removeListener(
153
"disconnected",
154
this.tcp.send.resendLastUntilAcked,
155
);
156
this.tcp.send.close();
157
this.tcp.recv.close();
158
// @ts-ignore
159
delete this.tcp;
160
}
161
162
this.alive?.close();
163
delete this.alive;
164
165
this.queuedWrites = [];
166
this.setState("closed");
167
this.removeAllListeners();
168
delete this.conatSocket.sockets[this.id];
169
// @ts-ignore
170
delete this.conatSocket;
171
};
172
173
receiveDataFromClient = (mesg) => {
174
this.alive?.recv();
175
this.tcp?.recv.process(mesg);
176
};
177
178
private sendDataToClient = (mesg) => {
179
this.conatSocket.client.publishSync(this.clientSubject, null, {
180
raw: mesg.raw,
181
headers: mesg.headers,
182
});
183
};
184
185
private send = (mesg: Message) => {
186
if (this.state != "ready") {
187
this.queuedWrites.push(mesg);
188
while (this.queuedWrites.length > this.conatSocket.maxQueueSize) {
189
this.queuedWrites.shift();
190
}
191
return;
192
}
193
// @ts-ignore
194
if (this.state == "closed") {
195
return;
196
}
197
this.sendDataToClient(mesg);
198
return true;
199
};
200
201
// writes will raise an exception if: (1) the socket is closed, or (2)
202
// you hit maxQueueSize un-ACK'd messages.
203
write = (data, { headers }: { headers?: Headers } = {}) => {
204
if (this.state == "closed") {
205
throw new ConatError("closed", { code: "EPIPE" });
206
}
207
const mesg = messageData(data, { headers });
208
this.tcp?.send.process(mesg);
209
};
210
211
// use request reply where the client responds
212
request = async (data, options?) => {
213
await this.waitUntilReady(options?.timeout);
214
logger.silly("server sending request to ", this.clientSubject);
215
return await this.conatSocket.client.request(
216
this.clientSubject,
217
data,
218
options,
219
);
220
};
221
222
private waitUntilReady = reuseInFlight(async (timeout?: number) => {
223
if (this.state == "ready") {
224
return;
225
}
226
await once(this, "ready", timeout ?? DEFAULT_REQUEST_TIMEOUT);
227
if (this.state == "closed") {
228
throw Error("closed");
229
}
230
});
231
232
waitUntilDrain = async () => {
233
await this.tcp?.send.waitUntilDrain();
234
};
235
}
236
237