Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/tcp.ts
1452 views
1
/*
This is an implementation of the core idea of TCP, i.e.,
it is a "transmission control protocol", which ensures
in-order, exactly-once message delivery.
*/
6
7
import { SOCKET_HEADER_SEQ, type Role } from "./util";
8
import { EventEmitter } from "events";
9
import {
10
type Message,
11
messageData,
12
type MessageData,
13
ConatError,
14
} from "@cocalc/conat/core/client";
15
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
16
import { once, until } from "@cocalc/util/async-utils";
17
18
// Initial value of Sender.timeout, which is passed as the `timeout` option to
// until() in Sender.resendLastUntilAcked.
// NOTE(review): presumably milliseconds (i.e. 2 minutes) -- confirm against until().
const DEFAULT_TIMEOUT = 2 * 60 * 1000;
19
20
/**
 * The pair of endpoints returned by createTCP: one object for each
 * direction of traffic.
 */
export interface TCP {
  /** Outgoing side: numbers messages and keeps them until acknowledged. */
  send: Sender;
  /** Incoming side: re-orders messages and emits them in sequence. */
  recv: Receiver;
}
24
25
/**
 * Build the sending and receiving halves of a socket.
 *
 * - `send`, `role` and `size` are handed to the Sender constructor.
 * - `request`, `reset` and `role` are handed to the Receiver constructor.
 *
 * @returns an object holding the new Sender as `send` and the new Receiver as `recv`
 */
export function createTCP({ request, send, reset, role, size }): TCP {
  // The Sender is constructed first, then the Receiver.
  const sender = new Sender(send, role, size);
  const receiver = new Receiver(request, reset, role);
  return { send: sender, recv: receiver };
}
31
32
/**
 * Receiving half of the socket.
 *
 * Messages are handed to `process`. Each must carry a positive integer
 * sequence number in the SOCKET_HEADER_SEQ header. Messages are emitted via
 * the "message" event strictly in sequence order, each at most once:
 *
 *  - a message whose seq equals the next expected one is emitted immediately,
 *    followed by any buffered messages that directly follow it;
 *  - a message from the future is buffered and the gap is requested from the
 *    other side via `request({ socket: { missing } })`;
 *  - a message whose seq was already emitted is silently dropped.
 *
 * After emitting, the receiver reports progress to the other side via
 * `request({ socket: { emitted } })`.
 *
 * Constructor parameters:
 *  - `request`: async function used for both of the requests above.
 *  - `reset`: called with no arguments when the other side answers a
 *    "missing" request with an `error` header.
 *  - `role`: only used here to label the warning that is logged for
 *    messages with an invalid sequence number.
 */
export class Receiver extends EventEmitter {
  // Out-of-order messages waiting for the gap before them to be filled,
  // keyed by sequence number. Deleted by close().
  private incoming?: { [id: number]: MessageData } = {};
  private seq?: {
    // next = seq of the next message we should emit
    next: number;
    // emitted = seq of the last message we actually did emit
    emitted: number;
    // reported = seq of last message we reported received to caller
    reported: number;
    // largest = largest seq of any message we have received
    largest: number;
  } = { next: 1, emitted: 0, reported: 0, largest: 0 };

  constructor(
    private request,
    private reset,
    public readonly role: Role,
  ) {
    super();
  }

  /**
   * Remove all listeners and drop all state. Every other method checks for
   * the dropped state and becomes a no-op afterwards.
   */
  close = () => {
    this.removeAllListeners();
    delete this.incoming;
    delete this.seq;
  };

  /**
   * Handle one message that arrived from the other side.
   *
   * Messages without a positive integer sequence number are logged and
   * discarded; messages that were already emitted are ignored.
   */
  process = (mesg: MessageData) => {
    if (this.seq === undefined || this.incoming === undefined) return;
    const seq = mesg.headers?.[SOCKET_HEADER_SEQ];
    // Number.isInteger also rejects NaN, which would otherwise poison
    // this.seq.largest (Math.max with NaN is NaN) and disable gap detection.
    if (typeof seq != "number" || !Number.isInteger(seq) || seq < 1) {
      console.log(
        `WARNING: ${this.role} discarding message -- seq must be a positive integer`,
        { seq, mesg: mesg.data, headers: mesg.headers },
      );
      return;
    }
    this.seq.largest = Math.max(seq, this.seq.largest);
    if (seq == this.seq.next) {
      this.emitMessage(mesg, seq);
      // The message we just emitted may have filled the gap in front of
      // buffered messages. Emit them now; otherwise they stay stuck if the
      // request for missing messages failed, because fetchMissing has nothing
      // left to ask for and never gets to emit them.
      this.drainIncoming();
    } else if (seq > this.seq.next) {
      // in the future -- save until we get this.seq.next:
      this.incoming[seq] = mesg;
      this.fetchMissing();
    }
    // seq < this.seq.next: duplicate of something already emitted -- ignore.
  };

  /**
   * Emit `mesg` as the next message in sequence and report progress.
   * Throws if `seq` is not exactly the next expected sequence number.
   */
  private emitMessage = (mesg, seq) => {
    if (this.seq === undefined) return;
    if (seq != this.seq.next) {
      throw Error("message sequence is wrong");
    }
    this.seq.next = seq + 1;
    this.seq.emitted = seq;
    // the sequence header is internal to this protocol; strip it before
    // handing the message to listeners
    delete mesg.headers?.[SOCKET_HEADER_SEQ];
    this.emit("message", mesg);
    this.reportReceived();
  };

  /**
   * Ask the other side for every message between the next expected one and
   * the largest one seen that is not already buffered, then process the
   * response. Calls `reset` if the other side reports an error.
   */
  private fetchMissing = reuseInFlight(async () => {
    if (this.seq === undefined || this.incoming === undefined) return;
    // Built in increasing order, so no sort is needed. (The default
    // Array.prototype.sort compares as strings and would put 10 before 9.)
    const missing: number[] = [];
    for (let seq = this.seq.next; seq <= this.seq.largest; seq++) {
      if (this.incoming[seq] === undefined) {
        missing.push(seq);
      }
    }
    if (missing.length == 0) {
      return;
    }
    let resp;
    try {
      resp = await this.request({ socket: { missing } });
    } catch {
      // Deliberately ignored; e.g., 503 happens when the other side is
      // temporarily not available.
      return;
    }
    if (this.seq == null) {
      // closed while waiting for the response
      return;
    }
    if (resp.headers?.error) {
      // missing data doesn't exist -- must reset
      this.reset();
      return;
    }
    for (const x of resp.data) {
      this.process(messageData(null, x));
    }
    this.emitIncoming();
  });

  /**
   * Emit buffered messages for as long as the next expected one is buffered.
   * State is re-read on every iteration because a "message" listener may
   * call close() (or feed in more messages) while we are emitting.
   */
  private drainIncoming = () => {
    while (this.seq !== undefined && this.incoming !== undefined) {
      const seq = this.seq.next;
      const mesg = this.incoming[seq];
      if (mesg == null) {
        return;
      }
      delete this.incoming[seq];
      this.emitMessage(mesg, seq);
    }
  };

  /**
   * Emit any buffered messages that come next, then report progress (which
   * also retries a report that previously failed).
   */
  private emitIncoming = () => {
    if (this.seq === undefined || this.incoming === undefined) return;
    this.drainIncoming();
    this.reportReceived();
  };

  /**
   * Tell the other side the seq of the last message we emitted, unless that
   * was already reported successfully.
   */
  private reportReceived = async () => {
    if (this.seq === undefined) return;
    if (this.seq.reported >= this.seq.emitted) {
      // nothing to report
      return;
    }
    const x = { socket: { emitted: this.seq.emitted } };
    try {
      await this.request(x);
      if (this.seq == null) {
        return;
      }
      // Several reports can be in flight at once and may complete out of
      // order; never move `reported` backwards.
      this.seq.reported = Math.max(this.seq.reported, x.socket.emitted);
    } catch {
      // When things are broken this should throw, and the nature of tcp is that
      // things should sometimes be broken.
    }
  };
}
169
170
/**
 * Sending half of the socket.
 *
 * `process` stamps each outgoing message with the next sequence number (in
 * the SOCKET_HEADER_SEQ header), keeps a copy until the other side
 * acknowledges it, and passes it to the `send` function given to the
 * constructor. `handleRequest` answers the two kinds of request the other
 * side makes: acknowledgements (`emitted`) and re-send requests (`missing`).
 *
 * Events: "drain" is emitted when the last unacknowledged message has been
 * acknowledged.
 *
 * Constructor parameters:
 *  - `send`: function that actually transmits a message.
 *  - `role`: stored as a public readonly property; not otherwise used here.
 *  - `size`: maximum number of unacknowledged messages that may be buffered.
 */
export class Sender extends EventEmitter {
  // Messages sent but not yet acknowledged, keyed by sequence number.
  private outgoing: { [id: number]: Message } = {};
  // Sequence number of the most recently sent message (0 = nothing sent yet).
  private seq = 0;
  // Passed as the `timeout` option to until() in resendLastUntilAcked.
  timeout = DEFAULT_TIMEOUT;
  // Number of entries currently held in `outgoing`.
  private unsent: number = 0;

  constructor(
    private send: (mesg: Message) => void,
    public readonly role: Role,
    private size: number,
  ) {
    super();
  }

  /**
   * Remove all listeners and drop the buffer and sequence counter.
   * handleRequest and resendLastUntilAcked check for this and do nothing
   * afterwards.
   */
  close = () => {
    this.removeAllListeners();
    // @ts-ignore
    delete this.outgoing;
    // @ts-ignore
    delete this.seq;
  };

  /**
   * Assign the next sequence number to `mesg`, buffer it and send it.
   *
   * @throws ConatError with code "ENOBUFS" if `size` unacknowledged messages
   *   are already buffered.
   */
  process = (mesg) => {
    if (this.unsent >= this.size) {
      throw new ConatError(
        `WRITE FAILED: socket buffer size ${this.size} exceeded`,
        { code: "ENOBUFS" },
      );
    }
    this.seq += 1;
    this.outgoing[this.seq] = mesg;
    this.unsent++;
    mesg.headers = { ...mesg.headers, [SOCKET_HEADER_SEQ]: this.seq };
    this.send(mesg);
  };

  /** True if nothing was sent yet or the most recent message was acknowledged. */
  private lastAcked = (): boolean => {
    return this.seq == 0 || this.outgoing[this.seq] === undefined;
  };

  // if socket is suspicious that the most recently sent message may
  // have been dropped, they call this. If indeed it was not acknowledged,
  // the last message will get sent again, which also will trigger the
  // other side of the socket to fetch anything else that it did not receive.
  private resendLast = () => {
    if (this.lastAcked()) {
      // Nothing to resend. Returning here matters: the buffered copy is gone
      // once acknowledged, so falling through would send `undefined`.
      return;
    }
    this.send(this.outgoing[this.seq]);
  };

  // this gets tested in backend/conat/test/socket/restarts.test.ts
  /**
   * Keep re-sending the most recent message until it is acknowledged, the
   * sender is closed, or until() gives up.
   * NOTE(review): the retry schedule is whatever until() makes of
   * { start, max, decay, timeout } -- presumably a growing delay in ms.
   */
  resendLastUntilAcked = reuseInFlight(async () => {
    try {
      await until(
        () => {
          if (this.outgoing === undefined || this.lastAcked()) {
            // done -- condition satisfied
            return true;
          }
          this.resendLast();
          return false;
        },
        { start: 500, max: 15000, decay: 1.3, timeout: this.timeout },
      );
    } catch (_err) {
      // it will throw if it hits the timeout -- silently ignore, since
      // there's no guarantee resendLastUntilAcked actually succeeds
    }
  });

  /**
   * Answer a request from the other side. `mesg.data.socket` holds either
   *
   *  - `emitted`: every buffered message with seq <= emitted is dropped from
   *    the buffer, "drain" is emitted if the buffer becomes empty, and the
   *    response is `{ emitted }`; or
   *  - `missing`: a list of sequence numbers; the response is the list of
   *    the corresponding buffered messages, or `null` with an
   *    `error: "nodata"` header if any of them is no longer buffered.
   *
   * Requests without `data.socket`, and requests after close(), are ignored.
   */
  handleRequest = (mesg) => {
    if (mesg.data?.socket == null || this.seq == null) {
      return;
    }
    const { emitted, missing } = mesg.data.socket;
    if (emitted != null) {
      let removed = 0;
      // iterate over a snapshot of the keys, since we delete while looping
      for (const id of Object.keys(this.outgoing)) {
        if (parseInt(id) <= emitted) {
          delete this.outgoing[id];
          this.unsent--;
          removed++;
        }
      }
      if (removed > 0 && this.unsent == 0) {
        this.emit("drain");
      }
      mesg.respondSync({ emitted });
    } else if (missing != null) {
      const v: Message[] = [];
      for (const id of missing) {
        const x = this.outgoing[id];
        if (x == null) {
          // the data does not exist on this client. This should only happen, e.g.,
          // on automatic failover with the sticky load balancer... ?
          mesg.respondSync(null, { headers: { error: "nodata" } });
          return;
        }
        v.push(x);
      }
      mesg.respondSync(v);
    }
  };

  /**
   * Resolve once there are no unacknowledged messages. If waiting for the
   * "drain" event fails, the error is swallowed when the sender has been
   * closed and rethrown otherwise.
   */
  waitUntilDrain = reuseInFlight(async () => {
    if (this.unsent == 0) {
      return;
    }
    try {
      await once(this, "drain");
    } catch (err) {
      if (this.outgoing == null) {
        return;
      }
      throw err;
    }
  });
}
292
293