Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/service/service.ts
1452 views
1
/*
2
Simple to use UI to connect anything in cocalc via request/reply services.
3
4
- callConatService
5
- createConatService
6
7
The input is basically where the service is (account, project, public),
8
and either what message to send or how to handle messages.
9
Also if the handler throws an error, the caller will throw
10
an error too.
11
*/
12
13
import { type Location } from "@cocalc/conat/types";
14
import { conat, getLogger } from "@cocalc/conat/client";
15
import { randomId } from "@cocalc/conat/names";
16
import { EventEmitter } from "events";
17
import { encodeBase64 } from "@cocalc/conat/util";
18
import { type Client } from "@cocalc/conat/core/client";
19
import { until } from "@cocalc/util/async-utils";
20
21
const DEFAULT_TIMEOUT = 10 * 1000;
22
23
const logger = getLogger("conat:service");
24
25
export interface ServiceDescription extends Location {
26
service: string;
27
28
description?: string;
29
30
// if true and multiple servers are setup in same "location", then they ALL get to respond (sender gets first response).
31
all?: boolean;
32
33
// DEFAULT: ENABLE_SERVICE_FRAMEWORK
34
enableServiceFramework?: boolean;
35
36
subject?: string;
37
}
38
39
export interface ServiceCall extends ServiceDescription {
40
mesg: any;
41
timeout?: number;
42
43
// if it fails with error.code 503, we wait for service to be ready and try again,
44
// unless this is set -- e.g., when waiting for the service in the first
45
// place we set this to avoid an infinite loop.
46
// This now just uses the waitForInterest option to request.
47
noRetry?: boolean;
48
49
client?: Client;
50
}
51
52
export async function callConatService(opts: ServiceCall): Promise<any> {
53
// console.log("callConatService", opts);
54
const cn = opts.client ?? (await conat());
55
const subject = serviceSubject(opts);
56
let resp;
57
const timeout = opts.timeout ?? DEFAULT_TIMEOUT;
58
// ensure not undefined, since undefined can't be published.
59
const data = opts.mesg ?? null;
60
61
const doRequest = async () => {
62
resp = await cn.request(subject, data, {
63
timeout,
64
waitForInterest: !opts.noRetry,
65
});
66
const result = resp.data;
67
if (result?.error) {
68
throw Error(result.error);
69
}
70
return result;
71
};
72
return await doRequest();
73
}
74
75
export type CallConatServiceFunction = typeof callConatService;
76
77
export interface Options extends ServiceDescription {
78
description?: string;
79
version?: string;
80
handler: (mesg) => Promise<any>;
81
client?: Client;
82
}
83
84
export function createConatService(options: Options) {
85
return new ConatService(options);
86
}
87
88
export type CreateConatServiceFunction = typeof createConatService;
89
90
export function serviceSubject({
91
service,
92
93
account_id,
94
browser_id,
95
96
project_id,
97
compute_server_id,
98
99
path,
100
101
subject,
102
}: ServiceDescription): string {
103
if (subject) {
104
return subject;
105
}
106
let segments;
107
path = path ? encodeBase64(path) : "_";
108
if (!project_id && !account_id) {
109
segments = ["public", service];
110
} else if (account_id) {
111
segments = [
112
"services",
113
`account-${account_id}`,
114
browser_id ?? "_",
115
project_id ?? "_",
116
path ?? "_",
117
service,
118
];
119
} else if (project_id) {
120
segments = [
121
"services",
122
`project-${project_id}`,
123
compute_server_id ?? "_",
124
service,
125
path,
126
];
127
}
128
return segments.join(".");
129
}
130
131
export function serviceName({
132
service,
133
134
account_id,
135
browser_id,
136
137
project_id,
138
compute_server_id,
139
}: ServiceDescription): string {
140
let segments;
141
if (!project_id && !account_id) {
142
segments = [service];
143
} else if (account_id) {
144
segments = [`account-${account_id}`, browser_id ?? "-", service];
145
} else if (project_id) {
146
segments = [`project-${project_id}`, compute_server_id ?? "-", service];
147
}
148
return segments.join("-");
149
}
150
151
export function serviceDescription({
152
description,
153
path,
154
}: ServiceDescription): string {
155
return [description, path ? `\nPath: ${path}` : ""].join("");
156
}
157
158
export class ConatService extends EventEmitter {
159
private options: Options;
160
public readonly subject: string;
161
public readonly name: string;
162
private sub?;
163
164
constructor(options: Options) {
165
super();
166
this.options = options;
167
this.name = serviceName(this.options);
168
this.subject = serviceSubject(options);
169
this.runService();
170
}
171
172
private log = (...args) => {
173
logger.debug(`service:subject='${this.subject}' -- `, ...args);
174
};
175
176
// create and run the service until something goes wrong, when this
177
// willl return. It does not throw an error.
178
private runService = async () => {
179
this.emit("starting");
180
this.log("starting service", {
181
name: this.name,
182
description: this.options.description,
183
version: this.options.version,
184
});
185
const cn = this.options.client ?? (await conat());
186
const queue = this.options.all ? randomId() : "0";
187
// service=true so upon disconnect the socketio backend server
188
// immediately stops routing traffic to this.
189
this.sub = await cn.subscribe(this.subject, { queue });
190
this.emit("running");
191
await this.listen();
192
};
193
194
private listen = async () => {
195
for await (const mesg of this.sub) {
196
const request = mesg.data ?? {};
197
198
// console.logger.debug("handle conat service call", request);
199
let resp;
200
if (request == "ping") {
201
resp = "pong";
202
} else {
203
try {
204
resp = await this.options.handler(request);
205
} catch (err) {
206
resp = { error: `${err}` };
207
}
208
}
209
try {
210
await mesg.respond(resp);
211
} catch (err) {
212
const data = { error: `${err}` };
213
await mesg.respond(data);
214
}
215
}
216
};
217
218
close = () => {
219
if (!this.subject) {
220
return;
221
}
222
this.emit("closed");
223
this.removeAllListeners();
224
this.sub?.stop();
225
delete this.sub;
226
// @ts-ignore
227
delete this.subject;
228
// @ts-ignore
229
delete this.options;
230
};
231
}
232
233
interface ServiceClientOpts {
234
options: ServiceDescription;
235
maxWait?: number;
236
id?: string;
237
}
238
239
export async function pingConatService({
240
options,
241
maxWait = 3000,
242
}: ServiceClientOpts): Promise<string[]> {
243
const pong = await callConatService({
244
...options,
245
mesg: "ping",
246
timeout: Math.max(3000, maxWait),
247
// set no-retry to avoid infinite loop
248
noRetry: true,
249
});
250
return [pong];
251
}
252
253
// NOTE: anything that has to rely on waitForConatService should
254
// likely be rewritten differently...
255
export async function waitForConatService({
256
options,
257
maxWait = 60000,
258
}: {
259
options: ServiceDescription;
260
maxWait?: number;
261
}) {
262
let ping: string[] = [];
263
let pingMaxWait = 250;
264
await until(
265
async () => {
266
pingMaxWait = Math.min(3000, pingMaxWait * 1.4);
267
try {
268
ping = await pingConatService({ options, maxWait: pingMaxWait });
269
return ping.length > 0;
270
} catch {
271
return false;
272
}
273
},
274
{
275
start: 1000,
276
max: 10000,
277
decay: 1.3,
278
timeout: maxWait,
279
},
280
);
281
return ping;
282
}
283
284