Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/client.ts
1452 views
1
import {
2
messageData,
3
type Subscription,
4
type Headers,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { ConatSocketBase } from "./base";
8
import { type TCP, createTCP } from "./tcp";
9
import {
10
SOCKET_HEADER_CMD,
11
DEFAULT_COMMAND_TIMEOUT,
12
type ConatSocketOptions,
13
} from "./util";
14
import { EventIterator } from "@cocalc/util/event-iterator";
15
import { keepAlive, KeepAlive } from "./keepalive";
16
import { getLogger } from "@cocalc/conat/client";
17
import { until } from "@cocalc/util/async-utils";
18
19
const logger = getLogger("socket:client");
20
21
// DO NOT directly instantiate here -- instead, call the
22
// socket.connect method on ConatClient.
23
24
/**
 * Client end of a conat socket.
 *
 * The client subscribes to `${subject}.client.${id}` for traffic coming from
 * the server and publishes/requests to `${subject}.server.${id}` for traffic
 * going to the server.  Data written with `write` goes through the TCP helper
 * created by `createTCP`; incoming data is re-emitted as "data" events.
 *
 * Events emitted by this class: "data" (data, headers), "drain", and
 * "request" (mesg).
 */
export class ConatSocketClient extends ConatSocketBase {
  /**
   * Messages handed to `sendToServer` while the socket was not "ready".
   * They are published, and the queue emptied, on the next "ready" event.
   */
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;

  constructor(opts: ConatSocketOptions) {
    super(opts);
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Drain the queue before sending so that a later "ready" event (e.g.
      // after a reconnect) does not publish the same stale messages again.
      const pending = this.queuedWrites.splice(0);
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Open another client socket on the sub-subject `${subject}.${channel}`,
   * using the same maxQueueSize as this socket.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)create the keepalive helper.  Any previous helper is closed first.
   * Pings are socket requests carrying the "ping" command header.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Create the TCP helper and wire its events to this socket.
   *
   * @throws Error if the TCP helper was already created.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request message to the server side of the
    // socket, either to ack what has been received or to ask for a resend
    // of missing data.
    const request = async (mesg, opts?) =>
      await this.client.request(`${this.subject}.server.${this.id}`, mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    this.tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });

    // This listener is removed again in close().
    this.client.on("disconnected", this.tcp.send.resendLastUntilAcked);

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /** Wait for the TCP sender's waitUntilDrain(); resolves at once if there is no TCP helper. */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Send a control command to the server side of this socket and return the
   * data of the response.
   *
   * @param cmd - the command placed in the SOCKET_HEADER_CMD header
   * @param timeout - passed through as the request timeout
   * @throws Error with the server-provided message if the response data has
   *   a truthy `error` field; request failures propagate unchanged.
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    logger.silly("sendCommandToServer", { cmd, timeout });
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = `${this.subject}.server.${this.id}`;
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
    });
    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value });
    if (value?.error) {
      throw Error(value?.error);
    } else {
      return value;
    }
  };

  /**
   * Connect to the server and process incoming messages until the
   * subscription ends or the server sends "close".
   *
   * Steps: subscribe to `${subject}.client.${id}`, retry the "connect"
   * command (with backoff) until it succeeds or the socket is closed, switch
   * to "ready", start the keepalive, then dispatch each incoming message by
   * its command header.  Any error along the way results in disconnect().
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    try {
      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // The state may have changed while we were awaiting the subscription.
      // @ts-ignore
      if (this.state == "closed") {
        sub.close();
        return;
      }
      this.sub = sub;
      let resp: any = undefined;
      await until(
        async () => {
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server");
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      // resp is still undefined if we gave up because the socket was closed.
      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // Every incoming message is reported to the keepalive helper.
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // respond to a ping from the server
          mesg.respond(null);
        } else if (mesg.isRequest()) {
          this.emit("request", mesg);
        } else {
          // plain data -- hand it to the TCP receiver
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publish an already-encoded message (its `raw` payload and headers) to the server subject. */
  private sendDataToServer = (mesg) => {
    const subject = `${this.subject}.server.${this.id}`;
    this.client.publishSync(subject, null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * Send callback given to the TCP helper.
   *
   * While the socket is not "ready" the message is queued (keeping at most
   * maxQueueSize entries, dropping the oldest first) and published on the
   * next "ready" event.
   *
   * @throws Error("closed") if the socket is closed.
   * @throws Error if this socket has the "server" role.
   */
  private sendToServer = (mesg) => {
    // This check must come before the "not ready" branch: "closed" is also
    // not "ready", so checking it afterwards would never run and a closed
    // socket would silently accumulate writes.
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    } else {
      // we are the client, so write to server
      this.sendDataToServer(mesg);
    }
  };

  /**
   * Wait until the socket is ready, then send a request to the server side
   * of this socket and return the response.
   *
   * @param data - request payload
   * @param options - passed through to the client request; `options.timeout`
   *   is also used for the wait-until-ready step
   * @throws Error("closed") if the socket is closed after waiting.
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.request(subject, data, options);
  };

  /**
   * Like `request`, but uses the client's requestMany and returns the
   * resulting subscription.
   *
   * @throws Error("closed") if the socket is closed after waiting.
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    // Same guard as in request().
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(subject, data, options);
  };

  /**
   * Gracefully end the socket: disable reconnection, tell the server we are
   * done (waiting up to `timeout` for the reply), then close.
   *
   * @param timeout - timeout passed to the "close" command (default 3000)
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch {
      // best effort -- we close locally whether or not the server answered
    }
    this.close();
  }

  /**
   * Close the socket and release everything it holds: the subscription, the
   * "disconnected" listener, queued writes, the TCP helper and the keepalive
   * helper.  Does nothing if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    if (this.tcp != null) {
      this.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait)
    (async () => {
      try {
        await this.sendCommandToServer("close");
      } catch {
        // best effort -- nothing useful to do if the server cannot be reached
      }
    })();
    if (this.tcp != null) {
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  /**
   * Write data to the server through the TCP sender.
   *
   * Writes will raise an exception if: (1) the socket is closed,
   * code='EPIPE', or (2) you hit maxQueueSize un-ACK'd messages,
   * code='ENOBUFS'.
   */
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Return an EventIterator over this socket's "data" events. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}
294
295