Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/server.ts
1453 views
1
import { ConatSocketBase } from "./base";
2
import {
3
PING_PONG_INTERVAL,
4
type Command,
5
SOCKET_HEADER_CMD,
6
clientSubject,
7
} from "./util";
8
import { ServerSocket } from "./server-socket";
9
import { delay } from "awaiting";
10
import { type Headers } from "@cocalc/conat/core/client";
11
import { getLogger } from "@cocalc/conat/client";
12
13
const logger = getLogger("socket:server");
14
15
// DO NOT directly instantiate here -- instead, call the
16
// socket.listen method on ConatClient.
17
18
export class ConatSocketServer extends ConatSocketBase {
19
initTCP() {}
20
21
channel(channel: string) {
22
return this.client.socket.listen(this.subject + "." + channel, {
23
desc: `${this.desc ?? ""}.channel('${channel}')`,
24
}) as ConatSocketServer;
25
}
26
27
forEach = (f: (socket: ServerSocket, id: string) => void) => {
28
for (const id in this.sockets) {
29
f(this.sockets[id], id);
30
}
31
};
32
33
protected async run() {
34
this.deleteDeadSockets();
35
const sub = await this.client.subscribe(`${this.subject}.server.*`, {
36
sticky: true,
37
});
38
if (this.state == "closed") {
39
sub.close();
40
return;
41
}
42
this.sub = sub;
43
this.setState("ready");
44
for await (const mesg of this.sub) {
45
// console.log("got mesg", mesg.data, mesg.headers);
46
if (this.state == ("closed" as any)) {
47
return;
48
}
49
const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
50
const id = mesg.subject.split(".").slice(-1)[0];
51
let socket = this.sockets[id];
52
53
if (socket === undefined) {
54
if (cmd == "close") {
55
// already closed
56
continue;
57
}
58
// not connected yet -- anything except a connect message is ignored.
59
if (cmd != "connect") {
60
logger.debug(
61
"ignoring data from not-connected socket -- telling it to close",
62
{ id, cmd },
63
);
64
this.client.publishSync(clientSubject(mesg.subject), null, {
65
headers: { [SOCKET_HEADER_CMD]: "close" },
66
});
67
continue;
68
}
69
// new connection
70
socket = new ServerSocket({
71
conatSocket: this,
72
id,
73
subject: mesg.subject,
74
});
75
this.sockets[id] = socket;
76
this.emit("connection", socket);
77
}
78
79
if (cmd !== undefined) {
80
// note: test this first since it is also a request
81
// a special internal control command
82
this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });
83
} else if (mesg.isRequest()) {
84
// a request to support the socket.on('request', (mesg) => ...) protocol:
85
socket.emit("request", mesg);
86
} else {
87
socket.receiveDataFromClient(mesg);
88
}
89
}
90
}
91
92
private async deleteDeadSockets() {
93
while (this.state != "closed") {
94
for (const id in this.sockets) {
95
const socket = this.sockets[id];
96
if (Date.now() - socket.lastPing > PING_PONG_INTERVAL * 2.5) {
97
socket.destroy();
98
}
99
}
100
await delay(PING_PONG_INTERVAL);
101
}
102
}
103
104
request = async (data, options?) => {
105
await this.waitUntilReady(options?.timeout);
106
107
// we call all connected sockets in parallel,
108
// then return array of responses.
109
// Unless race is set, then we return first result
110
const v: any[] = [];
111
for (const id in this.sockets) {
112
const f = async () => {
113
if (this.state == "closed") {
114
throw Error("closed");
115
}
116
try {
117
return await this.sockets[id].request(data, options);
118
} catch (err) {
119
return err;
120
}
121
};
122
v.push(f());
123
}
124
if (options?.race) {
125
return await Promise.race(v);
126
} else {
127
return await Promise.all(v);
128
}
129
};
130
131
write = (data, { headers }: { headers?: Headers } = {}): void => {
132
// @ts-ignore
133
if (this.state == "closed") {
134
throw Error("closed");
135
}
136
// write to all the sockets that are connected.
137
for (const id in this.sockets) {
138
this.sockets[id].write(data, headers);
139
}
140
};
141
142
handleCommandFromClient = ({
143
socket,
144
cmd,
145
mesg,
146
}: {
147
socket: ServerSocket;
148
cmd: Command;
149
mesg;
150
}) => {
151
socket.lastPing = Date.now();
152
if (cmd == "socket") {
153
socket.tcp?.send.handleRequest(mesg);
154
} else if (cmd == "ping") {
155
if (socket.state == "ready") {
156
// ONLY respond to ping for a server socket if that socket is
157
// actually ready! ping's are meant to check whether the server
158
// socket views itself as connected right now. If not, connected,
159
// ping should timeout
160
logger.silly("responding to ping from client", socket.id);
161
mesg.respondSync(null);
162
}
163
} else if (cmd == "close") {
164
const id = socket.id;
165
socket.close();
166
delete this.sockets[id];
167
mesg.respondSync("closed");
168
} else if (cmd == "connect") {
169
mesg.respondSync("connected");
170
} else {
171
mesg.respondSync({ error: `unknown command - '${cmd}'` });
172
}
173
};
174
175
async end({ timeout = 3000 }: { timeout?: number } = {}) {
176
if (this.state == "closed") {
177
return;
178
}
179
this.reconnection = false;
180
this.ended = true;
181
// tell all clients to end
182
const end = async (id) => {
183
const socket = this.sockets[id];
184
delete this.sockets[id];
185
try {
186
await socket.end({ timeout });
187
} catch (err) {
188
console.log("WARNING: error ending socket -- ${err}");
189
}
190
};
191
await Promise.all(Object.keys(this.sockets).map(end));
192
this.close();
193
}
194
}
195
196