Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/async-utils.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Some async utils.
8
9
(Obviously should be moved somewhere else when the dust settles!)
10
11
The two helpful async/await libraries I found are:
12
13
- https://github.com/hunterloftis/awaiting
14
- https://github.com/masotime/async-await-utils
15
16
*/
17
18
import * as awaiting from "awaiting";
19
import { reuseInFlight } from "./reuse-in-flight";
20
21
/**
 * Backoff settings accepted by `until`. All durations are in milliseconds.
 */
interface RetryOptions {
  /** Seed for the delay; it is multiplied by `decay` before every wait (`until` defaults to 500). */
  start?: number;
  /** Factor applied to the delay after each unsuccessful attempt (`until` defaults to 1.3). */
  decay?: number;
  /** Upper bound for a single delay (`until` defaults to 15000). */
  max?: number;
  /** Lower bound for a single delay (`until` defaults to 50). */
  min?: number;
  /** Overall time budget; a falsy value (`until` defaults to 0) means no limit. */
  timeout?: number;
  /** Optional sink for progress messages. */
  log?: (...args: any[]) => void;
}
29
30
/**
 * Keep calling `f` until it returns a truthy value, sleeping between
 * attempts with a growing delay.
 *
 * After every unsuccessful attempt the delay is multiplied by `decay` and
 * clamped to the range [`min`, `max`]; when a timeout is set it is also
 * capped by the time left until the deadline (but never below `min`).
 *
 * `f` must always return true or false (sync or async), which helps a lot
 * to avoid bugs. See `retry_until_success` for the variant of this pattern
 * that keeps retrying until `f` does not throw.
 *
 * @param f - the condition to poll.
 * @throws Error("timeout") if a nonzero `timeout` (ms) elapses first.
 */
export async function until(
  f: (() => Promise<boolean>) | (() => boolean),
  {
    start = 500,
    decay = 1.3,
    max = 15000,
    min = 50,
    timeout = 0,
    log,
  }: RetryOptions = {},
) {
  // No deadline at all when timeout is 0.
  const deadline = timeout ? Date.now() + timeout : undefined;
  let wait = start;
  for (;;) {
    if (deadline !== undefined && Date.now() >= deadline) {
      break;
    }
    if (await f()) {
      return;
    }
    let next = Math.min(max, wait * decay);
    if (deadline) {
      // don't sleep past the deadline
      next = Math.min(deadline - Date.now(), next);
    }
    wait = Math.max(min, next);
    log?.(`will retry in ${Math.round(wait / 1000)} seconds`);
    await awaiting.delay(wait);
  }
  log?.("FAILED: timeout");
  throw Error("timeout");
}
65
66
export { asyncDebounce, asyncThrottle } from "./async-debounce-throttle";
67
68
/**
 * Wrap a function `f(opts)` that reports completion through `opts.cb` so it
 * can be awaited: the returned async function takes the same opts, minus cb.
 *
 * WARNING: the calling convention differs from `callback` in awaiting:
 *   - awaiting:       callback(f, args...)
 *   - callback_opts:  callback_opts(f)(opts)
 *
 * Note that `cb` is assigned onto the opts object that was passed in (a
 * fresh object is only created when opts is undefined). The promise is
 * settled by awaiting.callback from whatever `f` hands to `cb`.
 *
 * TODO: maybe change this everywhere to callback_opts(f, opts) for consistency!
 */
export function callback_opts(f: Function) {
  return async function (opts?: any): Promise<any> {
    const target = opts === undefined ? {} : opts;
    return await awaiting.callback((cb: Function) => {
      target.cb = cb;
      f(target);
    });
  };
}
87
88
/**
 * Options for `retry_until_success`. All durations are in milliseconds.
 */
interface RetryUntilSuccess<T> {
  f: () => Promise<T>; // an async function that takes no input.
  start_delay?: number; // base delay (default 100); it is multiplied by factor before the first wait
  max_delay?: number; // delay at most this amount between calls (default 20000)
  max_tries?: number; // maximum number of times to call f
  max_time?: number; // give up once elapsed time plus the next delay would exceed this
  factor?: number; // multiply delay by this after each failure (default 1.4)
  log?: Function; // optional verbose logging function
  desc?: string; // useful for making error messages better.
}

/**
 * Keep calling the async function `opts.f` with exponential backoff until it
 * does NOT throw, then return whatever it returned.
 *
 * The `opts` object is never modified. A value of 0 for `start_delay`,
 * `max_delay` or `factor` selects the default, exactly like leaving it out.
 *
 * @throws Error when `max_tries` or `max_time` is exceeded. The message
 *   starts with "maximum tries exceeded" or "maximum time exceeded",
 *   includes the error from the final attempt, and ends with `desc` when
 *   one was given.
 */
export async function retry_until_success<T>(
  opts: RetryUntilSuccess<T>,
): Promise<T> {
  // Defaults live in locals so the caller's object is left untouched.
  // `||` (not `??`) is deliberate: 0 has always meant "use the default".
  const max_delay = opts.max_delay || 20000;
  const factor = opts.factor || 1.4;
  let next_delay: number = opts.start_delay || 100;
  let tries = 0;
  const start_time = Date.now();

  // Return nonempty string if time or tries exceeded.
  function check_done(): string {
    if (opts.max_time && next_delay + Date.now() - start_time > opts.max_time) {
      return "maximum time exceeded";
    }
    if (opts.max_tries && tries >= opts.max_tries) {
      return "maximum tries exceeded";
    }
    return "";
  }

  while (true) {
    try {
      return await opts.f();
    } catch (exc) {
      opts.log?.("failed ", exc);
      // might try again -- update state...
      tries += 1;
      next_delay = Math.min(max_delay, factor * next_delay);
      // check if too long or too many tries
      const err = check_done();
      if (err) {
        // Game over. Report the error from THIS attempt (the real last
        // error) and only mention desc when there is one.
        const suffix = opts.desc ? ` -- ${opts.desc}` : "";
        throw Error(`${err} -- last error was '${exc}'${suffix}`);
      }
      // wait before trying again
      await awaiting.delay(next_delay);
    }
  }
}
159
160
import { EventEmitter } from "events";
161
import { CB } from "./types/database";
162
163
/**
 * Wait for `obj` to emit `event` one time and resolve with the array of
 * arguments that were emitted.
 *
 * If timeout_ms is 0 (the default) this can wait an unbounded amount of
 * time. That's intentional and does make sense in our applications.
 *
 * @throws Error if `obj` is null/undefined or has no `once` method.
 * @throws Error if timeout_ms is positive and the event does not arrive
 *   within that many milliseconds.
 * @throws Error if `obj` emits "closed" before the event, since clearly the
 *   event can then never be emitted.
 */
export async function once(
  obj: EventEmitter,
  event: string,
  timeout_ms: number = 0,
): Promise<any> {
  if (obj == null) {
    throw Error("once -- obj is undefined");
  }
  if (typeof obj.once != "function") {
    throw Error("once -- obj.once must be a function");
  }

  return new Promise((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined;

    // Whichever outcome happens first removes both listeners and the timer.
    const detach = (): void => {
      obj.removeListener(event, handleEvent);
      obj.removeListener("closed", handleClosed);
      if (timer) clearTimeout(timer);
    };

    const handleEvent = (...args: any[]): void => {
      detach();
      resolve(args);
    };

    const handleClosed = (): void => {
      detach();
      reject(new Error(`once: "${event}" not emitted before "closed"`));
    };

    obj.once(event, handleEvent);
    obj.once("closed", handleClosed);

    if (timeout_ms > 0) {
      timer = setTimeout(() => {
        detach();
        reject(
          new Error(`once: timeout of ${timeout_ms}ms waiting for "${event}"`),
        );
      }, timeout_ms);
    }
  });
}
216
217
/**
 * Alternative to callback_opts with the same calling convention as the
 * callback defined in awaiting: callback2(f, opts).
 *
 * `f` is called with `opts` (or a fresh object when opts is null/undefined)
 * after a `cb` property has been assigned onto it; the returned promise is
 * settled by awaiting.callback from whatever `f` passes to that cb.
 *
 * Pass in the type of the returned value as R, and it will be inferred.
 */
export async function callback2<R = any>(
  f: (opts) => void,
  opts?: object,
): Promise<R> {
  const withCb = (opts ?? {}) as typeof opts & { cb: CB<R> };
  return await awaiting.callback((cb: CB<R>): void => {
    withCb.cb = cb;
    f(withCb);
  });
}
230
231
/**
 * Replace each named method of `obj`, in place, with the result of passing
 * it (bound to `obj`) through reuseInFlight.
 *
 * @param obj - the object whose methods are replaced.
 * @param method_names - names of the methods on `obj` to wrap.
 */
export function reuse_in_flight_methods(
  obj: any,
  method_names: string[],
): void {
  for (let i = 0; i < method_names.length; i++) {
    const name = method_names[i];
    const bound = obj[name].bind(obj);
    obj[name] = reuseInFlight(bound);
  }
}
239
240
/**
 * Cancel a pending throttle or debounce, where f is the output of
 * underscore.throttle (or debounce). Anything without a `cancel` property
 * is ignored, so this is safe to call with f null or a normal function.
 */
export function cancel_scheduled(f: any): void {
  if (f == null) {
    return;
  }
  if (f.cancel == null) {
    return;
  }
  f.cancel();
}
248
249
/**
 * Run the async function `f(...args)` and report the outcome through a
 * node-style callback: `cb()` on success, `cb(err)` if `f` throws/rejects.
 * The value `f` resolves to is not passed on.
 *
 * `cb` is invoked exactly once. If `cb` itself throws, that error rejects
 * the promise returned here instead of triggering a second `cb` call.
 *
 * NOTE: the original author marked this helper as "not tested".
 */
export async function async_as_callback(
  f: Function,
  cb: Function,
  ...args: any[]
): Promise<void> {
  try {
    await f(...args);
  } catch (err) {
    cb(err);
    return;
  }
  // Deliberately outside the try block: an exception thrown by cb must not
  // be caught above and fed back into cb a second time.
  cb();
}
262
263
/**
 * Map `fn` over `values` with at most `max` calls in flight at once, and
 * resolve to the array of ALL results, in iteration order.
 *
 * Adapted from
 * https://stackoverflow.com/questions/70470728/how-can-i-execute-some-async-tasks-in-parallel-with-limit-in-generator-function
 *
 * @param values - an array or plain object. Keys are enumerated with
 *   `for...in`, so `fn` receives each key as a string.
 * @param fn - called as `fn(value, key)`; may return a promise or a plain value.
 * @param max - concurrency limit (default 10); values below 1 are treated as 1.
 * @returns results of every call, ordered like the enumeration of `values`.
 * @throws whatever `fn` throws or rejects with; no further calls are started
 *   after the first observed failure.
 */
export async function mapParallelLimit(
  values: any,
  fn: (value: any, key: any) => any,
  max: number = 10,
): Promise<any[]> {
  // With a limit of 0 or less the wait loop below would race an empty set
  // and never make progress.
  const limit = Math.max(1, max);
  const inFlight = new Set<Promise<void>>();
  const results: any[] = [];
  let position = 0;

  for (const key in values) {
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    // Each task stores its own result as it completes; completed tasks are
    // dropped from inFlight, so the results must not be read from there.
    const slot = position++;
    const task: Promise<void> = Promise.resolve(fn(values[key], key))
      .then((result) => {
        results[slot] = result;
      })
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
  }

  await Promise.all(inFlight);
  return results;
}
278
279
/**
 * Consume an async iterable, running `handle` on each message with at most
 * `limit` handlers in flight, and resolve once the iterable is exhausted
 * and every handler has finished.
 *
 * If a handler rejects, this function rejects with that error: immediately
 * when it is currently waiting on handlers, otherwise as soon as the
 * iterable produces its next message (that message is not handled) or ends.
 * Handlers that are already running are not interrupted.
 *
 * @param iterable - source of messages.
 * @param limit - maximum number of concurrently running handlers.
 * @param handle - async function called once per message.
 */
export async function parallelHandler({
  iterable,
  limit,
  handle,
}: {
  iterable: AsyncIterable<any>;
  limit: number;
  handle: (mesg: any) => Promise<void>;
}) {
  const inFlight = new Set<Promise<void>>();
  let failed = false;
  let failure: unknown;
  const noteFailure = (err: unknown): void => {
    if (!failed) {
      failed = true;
      failure = err;
    }
  };

  for await (const mesg of iterable) {
    if (failed) {
      // A handler failed while we were waiting for this message.
      throw failure;
    }
    // Successful tasks remove themselves; a rejected task stays in the set
    // so the race / all below also reject with its error.
    const task: Promise<void> = handle(mesg).then(() => {
      inFlight.delete(task);
    });
    // Attach a rejection handler right away. Otherwise a handler that fails
    // while we are blocked on the iterable is an *unhandled* rejection,
    // which terminates the process under Node's default settings.
    task.catch(noteFailure);
    inFlight.add(task);
    // If we reached the limit, wait for one of the handlers to finish.
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
  }
  // Wait for all remaining handlers to finish.
  await Promise.all(inFlight);
}
304
305
/**
 * Settle like `p`, unless `p` takes more than `ms` milliseconds, in which
 * case reject with Error("timeout"). Use it like this:
 *
 *     resp = await withTimeout(promise, 3000);
 *
 * Of course whatever code is running in `p` doesn't actually get
 * interrupted by the timeout. If `p` rejects after the timeout has already
 * fired, that rejection is logged with console.warn.
 */
export async function withTimeout(p: Promise<any>, ms: number) {
  let timedOut = false;
  // Always observe p's rejection; only report it when nobody is listening
  // any more because the timeout already won.
  p.catch((err) => {
    if (timedOut) {
      console.warn("WARNING: withTimeout promise rejected", err);
    }
  });

  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise((_, reject) => {
    timer = setTimeout(() => {
      timedOut = true;
      reject(new Error("timeout"));
    }, ms);
  });

  try {
    return await Promise.race([p, expiry]);
  } finally {
    clearTimeout(timer);
  }
}
329
330