Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/base.ts
1453 views
1
import { EventEmitter } from "events";
2
import {
3
type Client,
4
type Subscription,
5
DEFAULT_REQUEST_TIMEOUT,
6
} from "@cocalc/conat/core/client";
7
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
8
import { once } from "@cocalc/util/async-utils";
9
import {
10
type Role,
11
type State,
12
DEFAULT_MAX_QUEUE_SIZE,
13
type ConatSocketOptions,
14
RECONNECT_DELAY,
15
DEFAULT_KEEP_ALIVE,
16
DEFAULT_KEEP_ALIVE_TIMEOUT,
17
} from "./util";
18
import { type ServerSocket } from "./server-socket";
19
20
export abstract class ConatSocketBase extends EventEmitter {
21
public readonly desc?: string;
22
subject: string;
23
client: Client;
24
role: Role;
25
id: string;
26
subscribe: string;
27
sockets: { [id: string]: ServerSocket } = {};
28
subjects: {
29
server: string;
30
client: string;
31
};
32
33
sub?: Subscription;
34
state: State = "disconnected";
35
reconnection: boolean;
36
ended: boolean = false;
37
maxQueueSize: number;
38
keepAlive: number;
39
keepAliveTimeout: number;
40
41
// the following is all for compat with primus's api and has no meaning here.
42
address = { ip: "" };
43
conn: { id: string };
44
OPEN = 1;
45
CLOSE = 0;
46
readyState: 0;
47
// end compat
48
49
constructor({
50
subject,
51
client,
52
role,
53
id,
54
reconnection = true,
55
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
56
keepAlive = DEFAULT_KEEP_ALIVE,
57
keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT,
58
desc,
59
}: ConatSocketOptions) {
60
super();
61
this.maxQueueSize = maxQueueSize;
62
this.reconnection = reconnection;
63
this.subject = subject;
64
this.client = client;
65
this.role = role;
66
this.id = id;
67
this.keepAlive = keepAlive;
68
this.keepAliveTimeout = keepAliveTimeout;
69
this.desc = desc;
70
this.conn = { id };
71
this.connect();
72
this.setMaxListeners(100);
73
}
74
75
abstract channel(channel: string);
76
77
protected abstract run();
78
79
abstract end(opts: { timeout?: number });
80
81
protected abstract initTCP();
82
83
protected setState = (state: State) => {
84
this.state = state;
85
this.emit(state);
86
};
87
88
destroy = () => this.close();
89
90
close() {
91
if (this.state == "closed") {
92
return;
93
}
94
this.setState("closed");
95
this.removeAllListeners();
96
97
this.sub?.close();
98
delete this.sub;
99
for (const id in this.sockets) {
100
this.sockets[id].destroy();
101
}
102
this.sockets = {};
103
// @ts-ignore
104
delete this.client;
105
}
106
107
disconnect = () => {
108
if (this.state == "closed") {
109
return;
110
}
111
this.setState("disconnected");
112
this.sub?.close();
113
delete this.sub;
114
for (const id in this.sockets) {
115
this.sockets[id].disconnect();
116
}
117
if (this.reconnection) {
118
setTimeout(() => {
119
this.connect();
120
}, RECONNECT_DELAY);
121
}
122
};
123
124
connect = async () => {
125
if (this.state != "disconnected") {
126
// already connected
127
return;
128
}
129
this.setState("connecting");
130
await this.client.waitUntilConnected();
131
try {
132
await this.run();
133
} catch (err) {
134
// console.log(`WARNING: ${this.role} socket connect error -- ${err}`);
135
this.disconnect();
136
}
137
};
138
139
// usually all the timeouts are the same, so this reuseInFlight is very helpful
140
waitUntilReady = reuseInFlight(async (timeout?: number) => {
141
if (this.state == "ready") {
142
return;
143
}
144
await once(this, "ready", timeout ?? DEFAULT_REQUEST_TIMEOUT);
145
if (this.state == "closed") {
146
throw Error("closed");
147
}
148
});
149
}
150
151