import * as awaiting from "awaiting";
import { reuseInFlight } from "./reuse-in-flight";
/**
 * Tuning knobs for the polling loop in `until`.
 *
 * All durations are milliseconds: they are combined with `Date.now()` and
 * divided by 1000 when logged as seconds.
 */
interface RetryOptions {
/**
 * Initial value of the retry delay (`until` defaults it to 500). The delay is
 * multiplied by `decay` before the first wait.
 */
start?: number;
/** Factor the delay is multiplied by after each unsuccessful check (default 1.3 in `until`). */
decay?: number;
/** Upper bound on the delay between checks (default 15000 in `until`). */
max?: number;
/**
 * Lower bound on the delay between checks (default 50 in `until`). It is
 * applied last, so it wins over `max` and over the time left before `timeout`.
 */
min?: number;
/** Total time budget; 0 (the default in `until`) or any falsy value means no deadline. */
timeout?: number;
/** Optional logger; `until` calls it with a message string before each wait and on timeout. */
log?: (...args) => void;
}
/**
 * Poll `f` with a growing delay until it yields a truthy value.
 *
 * The delay starts from `start`, is multiplied by `decay` after every
 * unsuccessful check, and is kept within `[min, max]`. When `timeout` is
 * non-zero the wait is also capped by the time left before the deadline
 * (`min` still wins, so the final wait may overshoot slightly).
 *
 * @param f - predicate to poll; may be sync or async.
 * @returns resolves with no value as soon as `f` yields a truthy value.
 * @throws Error("timeout") once the deadline passes without success.
 *   Anything thrown by `f` propagates unchanged.
 */
export async function until(
  f: (() => Promise<boolean>) | (() => boolean),
  {
    start = 500,
    decay = 1.3,
    max = 15000,
    min = 50,
    timeout = 0,
    log,
  }: RetryOptions = {},
) {
  // A falsy timeout means "no deadline": keep polling forever.
  const deadline = timeout ? Date.now() + timeout : undefined;
  let delay = start;

  while (deadline === undefined || Date.now() < deadline) {
    const ok = await f();
    if (ok) {
      return;
    }

    // Grow the delay, cap it at `max`, then (if there is a deadline) at the
    // time remaining; `min` is applied last as a floor.
    const grown = Math.min(max, delay * decay);
    const capped = deadline ? Math.min(deadline - Date.now(), grown) : grown;
    delay = Math.max(min, capped);

    log?.(`will retry in ${Math.round(delay / 1000)} seconds`);
    await awaiting.delay(delay);
  }

  log?.("FAILED: timeout");
  throw Error("timeout");
}
export { asyncDebounce, asyncThrottle } from "./async-debounce-throttle";
/**
 * Adapt a function that takes a single options object containing a `cb`
 * callback into a function that returns a promise.
 *
 * The returned function stores the completion callback on the options object
 * it is given (creating an empty object when called with `undefined`) and
 * then invokes `f` with it. Note that the caller's object is mutated.
 *
 * Settlement is delegated to `awaiting.callback` — presumably the usual
 * Node-style `(err, result)` convention; confirm against the `awaiting` docs.
 *
 * @param f - function of the form `f(opts)` that eventually calls `opts.cb`.
 * @returns an async wrapper `(opts?) => Promise<any>`.
 */
export function callback_opts(f: Function) {
  return async function (opts?: any): Promise<any> {
    // Only `undefined` is replaced; any other value is used as given.
    const options = opts === undefined ? {} : opts;
    return await awaiting.callback((cb: Function) => {
      options.cb = cb;
      f(options);
    });
  };
}
/**
 * Options for `retry_until_success`.
 *
 * Durations are milliseconds (they are compared against differences of
 * `Date.now()` values).
 */
interface RetryUntilSuccess<T> {
/** The operation to attempt; its resolved value becomes the overall result. */
f: () => Promise<T>;
/**
 * Starting value of the delay between attempts; 100 is used when this is
 * missing or 0. It is multiplied by `factor` before the first wait.
 */
start_delay?: number;
/** Cap on the delay between attempts; 20000 is used when this is missing or 0. */
max_delay?: number;
/** If set (non-zero), give up once this many attempts have failed. */
max_tries?: number;
/**
 * If set (non-zero), give up when the elapsed time plus the next delay would
 * exceed this value.
 */
max_time?: number;
/** Multiplier applied to the delay after each failure; 1.4 is used when missing or 0. */
factor?: number;
/** Optional logger, called as `log("failed ", exc)` after every failed attempt. */
log?: Function;
/** Description appended to the error message thrown when giving up. */
desc?: string;
}
/**
 * Call `opts.f` repeatedly, with an exponentially growing delay between
 * attempts, until it resolves.
 *
 * The delay starts from `start_delay` (default 100), is multiplied by
 * `factor` (default 1.4) after every failure and is capped at `max_delay`
 * (default 20000). Missing or zero values select the defaults. The caller's
 * `opts` object is not modified.
 *
 * @param opts - see `RetryUntilSuccess`.
 * @returns the value `opts.f` resolved with.
 * @throws Error starting with "maximum time exceeded" or "maximum tries
 *   exceeded" when the corresponding limit is hit. The message contains the
 *   error from the final failed attempt and, when provided, `opts.desc`.
 */
export async function retry_until_success<T>(
  opts: RetryUntilSuccess<T>,
): Promise<T> {
  // Resolve defaults into locals rather than writing them back into `opts`.
  // `||` is intentional: a delay or factor of 0 would never back off.
  const max_delay = opts.max_delay || 20000;
  const factor = opts.factor || 1.4;
  let next_delay: number = opts.start_delay || 100;
  let tries = 0;
  const start_time = Date.now();

  // Returns a non-empty reason string when no further attempt should be made.
  function check_done(): string {
    // Give up if sleeping for next_delay would take us past max_time.
    if (opts.max_time && next_delay + Date.now() - start_time > opts.max_time) {
      return "maximum time exceeded";
    }
    if (opts.max_tries && tries >= opts.max_tries) {
      return "maximum tries exceeded";
    }
    return "";
  }

  while (true) {
    try {
      return await opts.f();
    } catch (exc) {
      opts.log?.("failed ", exc);
      tries += 1;
      next_delay = Math.min(max_delay, factor * next_delay);

      const err = check_done();
      if (err) {
        // Report the failure that just happened (not the one before it), and
        // leave the description out instead of printing "undefined".
        const desc = opts.desc == null ? "" : ` -- ${opts.desc}`;
        throw Error(`${err} -- last error was '${exc}'${desc}`);
      }

      await awaiting.delay(next_delay);
    }
  }
}
import { EventEmitter } from "events";
import { CB } from "./types/database";
/**
 * Wait for a single emission of `event` on `obj`.
 *
 * @param obj - emitter to listen on.
 * @param event - name of the event to wait for.
 * @param timeout_ms - if greater than 0, give up after this many milliseconds.
 * @returns the array of arguments the event was emitted with.
 * @throws Error if `obj` is null/undefined or has no `once` method, if
 *   "closed" is emitted on `obj` first, or if the timeout elapses first.
 *   All listeners and the timer are removed whichever way the wait ends.
 */
export async function once(
  obj: EventEmitter,
  event: string,
  timeout_ms: number = 0,
): Promise<any> {
  if (obj == null) {
    throw Error("once -- obj is undefined");
  }
  if (typeof obj.once != "function") {
    throw Error("once -- obj.once must be a function");
  }

  return new Promise((resolve, reject) => {
    let timer: NodeJS.Timeout | undefined;

    // Undo everything this call registered, whichever way it finishes.
    const detach = () => {
      obj.removeListener(event, handleEvent);
      obj.removeListener("closed", handleClosed);
      if (timer) clearTimeout(timer);
    };

    const handleEvent = (...args: any[]) => {
      detach();
      resolve(args);
    };

    const handleClosed = () => {
      detach();
      reject(new Error(`once: "${event}" not emitted before "closed"`));
    };

    const handleTimeout = () => {
      detach();
      reject(
        new Error(`once: timeout of ${timeout_ms}ms waiting for "${event}"`),
      );
    };

    obj.once(event, handleEvent);
    obj.once("closed", handleClosed);
    if (timeout_ms > 0) {
      timer = setTimeout(handleTimeout, timeout_ms);
    }
  });
}
/**
 * Call a function that takes one options object with a `cb` callback, and
 * return a promise for its outcome.
 *
 * The completion callback is stored on `opts` itself (or on a fresh object
 * when `opts` is null/undefined), so the caller's object is mutated.
 *
 * Settlement is delegated to `awaiting.callback` — presumably the usual
 * Node-style `(err, result)` convention; confirm against the `awaiting` docs.
 *
 * @param f - function of the form `f(opts)` that eventually calls `opts.cb`.
 * @param opts - options forwarded to `f`.
 */
export async function callback2<R = any>(
  f: (opts) => void,
  opts?: object,
): Promise<R> {
  const optsCB = (opts ?? {}) as typeof opts & { cb: CB<R> };
  return await awaiting.callback((cb: CB<R>): void => {
    optsCB.cb = cb;
    f(optsCB);
  });
}
/**
 * Replace each named method of `obj`, in place, with the result of passing
 * its bound version through `reuseInFlight`.
 *
 * @param obj - object whose methods are replaced.
 * @param method_names - names of the methods on `obj` to wrap.
 */
export function reuse_in_flight_methods(
  obj: any,
  method_names: string[],
): void {
  method_names.forEach((name) => {
    // Bind first so the wrapped function still sees `obj` as `this`.
    const bound = obj[name].bind(obj);
    obj[name] = reuseInFlight(bound);
  });
}
/**
 * Invoke `f.cancel()` if `f` exists and has a `cancel` member; otherwise do
 * nothing.
 *
 * @param f - any value, possibly null/undefined or lacking `cancel`.
 */
export function cancel_scheduled(f: any): void {
  f?.cancel?.();
}
/**
 * Run an async function and report its outcome through a Node-style callback.
 *
 * `cb` is called exactly once: with no arguments if `f(...args)` resolves, or
 * with the error if it throws or rejects. The resolved value of `f` is
 * discarded.
 *
 * @param f - function to run; its returned value is awaited.
 * @param cb - completion callback, `cb()` on success or `cb(err)` on failure.
 * @param args - arguments forwarded to `f`.
 * @returns a promise that settles after `cb` has been invoked; it rejects if
 *   `cb` itself throws.
 */
export async function async_as_callback(
  f: Function,
  cb: Function,
  ...args
): Promise<void> {
  try {
    await f(...args);
  } catch (err) {
    cb(err);
    return;
  }
  // The success callback runs outside the try block: if it throws, the error
  // must not be routed back into `cb`, which would invoke it a second time.
  cb();
}
/**
 * Map `fn` over `values` with at most `max` calls in flight at once.
 *
 * @param values - array (or plain object) to iterate. Keys are enumerated
 *   with `for...in`, so `fn` receives each key as a string ("0", "1", ...).
 * @param fn - called as `fn(value, key)`; may return a promise or a plain value.
 * @param max - concurrency limit (default 10); values below 1 are treated as 1.
 * @returns one result per key, in iteration order.
 * @throws rejects with the first failure observed from `fn`. Calls that are
 *   already running are not cancelled.
 */
export async function mapParallelLimit(values, fn, max = 10) {
  // A limit below 1 would make the loop wait on an empty race forever.
  const limit = max >= 1 ? max : 1;
  const inFlight = new Set<Promise<unknown>>();
  // Every task is kept here so results of tasks that finish early are not
  // lost once they are removed from `inFlight`.
  const tasks: Promise<unknown>[] = [];

  // for...in is kept so the keys passed to fn stay exactly as before.
  for (const key in values) {
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    // Promise.resolve lets fn return a plain value as well as a promise.
    const task: Promise<unknown> = Promise.resolve(
      fn(values[key], key),
    ).finally(() => inFlight.delete(task));
    inFlight.add(task);
    tasks.push(task);
  }

  return Promise.all(tasks);
}
/**
 * Consume an async iterable, running `handle` on each item with bounded
 * concurrency.
 *
 * After starting a handler, if `limit` or more handlers are outstanding the
 * loop waits for one of them to settle before pulling the next item.
 *
 * @param iterable - source of items.
 * @param limit - maximum number of handlers allowed to be outstanding.
 * @param handle - async handler invoked once per item.
 * @returns resolves once the iterable is exhausted and every handler has
 *   finished.
 * @throws rejects with a handler's error. The failure surfaces the next time
 *   the loop waits on outstanding handlers, or at the final wait.
 */
export async function parallelHandler({
  iterable,
  limit,
  handle,
}: {
  iterable: AsyncIterable<any>;
  limit: number;
  handle: (mesg: any) => Promise<void>;
}) {
  const inFlight = new Set<Promise<void>>();
  for await (const mesg of iterable) {
    // Only successful handlers are removed; a failed one stays in the set so
    // its rejection is reported by the next race or the final Promise.all.
    const tracked: Promise<void> = handle(mesg).then(() => {
      inFlight.delete(tracked);
    });
    // While the loop waits for the next item nothing else observes `tracked`,
    // so mark a rejection as handled here to avoid an unhandledRejection.
    tracked.catch(() => {});
    inFlight.add(tracked);
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
  }
  await Promise.all(inFlight);
}
/**
 * Race a promise against a timer.
 *
 * @param p - promise to wait for.
 * @param ms - time limit in milliseconds.
 * @returns whatever `p` resolves with, if it settles before the timer fires.
 * @throws Error("timeout") if the timer fires first; otherwise `p`'s own
 *   rejection. If `p` rejects after the timeout has fired, the rejection is
 *   logged with `console.warn` instead of going unhandled.
 */
export async function withTimeout(p: Promise<any>, ms: number) {
  let timedOut = false;
  // Attach a handler up front so a late rejection of `p` is never unhandled.
  p.catch((err) => {
    if (timedOut) {
      console.warn("WARNING: withTimeout promise rejected", err);
    }
  });

  let timer;
  const expiry = new Promise((_, reject) => {
    timer = setTimeout(() => {
      timedOut = true;
      reject(new Error("timeout"));
    }, ms);
  });

  // Always clear the timer so it cannot keep the event loop alive.
  return Promise.race([p, expiry]).finally(() => clearTimeout(timer));
}