Path: blob/master/src/packages/util/event-iterator.ts
1447 views
/*
LICENSE: MIT

This is a slight fork of

https://github.com/sapphiredev/utilities/tree/main/packages/event-iterator

because upstream is slightly broken and what it actually does doesn't
agree with the docs. I can see why. Upstream would capture ['arg1','arg2']
for an event emitter doing this

emitter.emit('foo', 'arg1', 'arg2')

But for our application we only want 'arg1'. I thus added a map option,
which makes it easy to do what we want.
*/

import type { EventEmitter } from "node:events";

/**
 * A filter for an EventIterator.
 */
export type EventIteratorFilter<V> = (value: V) => boolean;

/**
 * Options to be passed to an EventIterator.
 */
export interface EventIteratorOptions<V> {
  /**
   * The filter.
   */
  filter?: EventIteratorFilter<V>;

  // Maps the array of args emitted by the event emitter to a V.
  // If it throws, the error is rethrown from the next call to next().
  map?: (args: any[]) => V;

  /**
   * The timeout in ms before ending the EventIterator.
   */
  idle?: number;

  /**
   * The limit of events that pass the filter to iterate.
   */
  limit?: number;

  // called when iterator ends -- use to do cleanup.
  onEnd?: (iter?: EventIterator<V>) => void;
}

/**
 * An EventIterator, used for asynchronously iterating over received values.
 *
 * Every emission of `event` on `emitter` is passed through `map`, queued, and
 * handed out by `next()` if it passes `filter`. The iterator ends when `end()`
 * (or one of its aliases) is called, when `limit` values have been returned,
 * or when the idle timeout elapses.
 */
export class EventIterator<V extends unknown>
  implements AsyncIterableIterator<V>
{
  /**
   * The emitter to listen to.
   */
  public readonly emitter: EventEmitter;

  /**
   * The event the event iterator is listening for to receive values from.
   */
  public readonly event: string;

  /**
   * The filter used to filter out values.
   */
  public filter: EventIteratorFilter<V>;

  /**
   * Converts the raw argument array of an emitted event into a value.
   */
  public map: (args: any[]) => V;

  /**
   * Whether or not the EventIterator has ended.
   */
  #ended = false;

  private onEnd?: (iter?: EventIterator<V>) => void;

  /**
   * The amount of idle time in ms before moving on.
   */
  readonly #idle?: number;

  /**
   * The queue of received values.
   */
  #queue: V[] = [];

  /**
   * Error waiting to be rethrown by the next call to next().
   */
  private err: unknown = undefined;

  /**
   * The amount of events that have passed the filter.
   */
  #passed = 0;

  /**
   * The limit before ending the EventIterator.
   */
  readonly #limit: number;

  /**
   * The timer to track when this will idle out.
   */
  #idleTimer: NodeJS.Timeout | undefined | null = null;

  /**
   * Callbacks that settle the next() calls currently waiting for a value.
   * They are invoked when the iterator ends or an error is recorded, so a
   * waiting consumer is released without emitting anything on the emitter.
   */
  readonly #waiters = new Set<() => void>();

  /**
   * The push handler with context bound to the instance.
   */
  readonly #push: (...value: unknown[]) => void;

  /**
   * @param emitter The event emitter to listen to.
   * @param event The event we're listening for to receive values from.
   * @param options Any extra options.
   */
  public constructor(
    emitter: EventEmitter,
    event: string,
    options: EventIteratorOptions<V> = {},
  ) {
    this.emitter = emitter;
    this.event = event;
    // Without a map the raw argument array is the value; callers that omit
    // `map` are expected to choose V accordingly, hence the assertion.
    this.map = options.map ?? ((args: any[]): V => args as unknown as V);
    this.#limit = options.limit ?? Infinity;
    this.#idle = options.idle;
    this.filter = options.filter ?? ((): boolean => true);
    this.onEnd = options.onEnd;

    // This timer is to idle out on lack of valid responses
    if (this.#idle) {
      this.#idleTimer = setTimeout(() => this.end(), this.#idle);
    }
    this.#push = this.push.bind(this);
    // Make room for our listener so we do not trigger the leak warning
    // (0 means "unlimited", which must be left alone).
    const maxListeners = this.emitter.getMaxListeners();
    if (maxListeners !== 0) this.emitter.setMaxListeners(maxListeners + 1);

    this.emitter.on(this.event, this.#push);
  }

  /**
   * Whether or not the EventIterator has ended.
   */
  public get ended(): boolean {
    return this.#ended;
  }

  /**
   * Ends the EventIterator: drops queued values, stops listening, cancels the
   * idle timer, releases every pending next() (they resolve as done) and then
   * calls `onEnd`. Calling it again is a no-op.
   */
  public end(): void {
    if (this.#ended) return;
    this.#ended = true;
    this.#queue = [];

    // Cancel the idle timer here so that it cannot keep the process alive
    // (or get refreshed) after the iterator is finished.
    if (this.#idleTimer) {
      clearTimeout(this.#idleTimer);
      this.#idleTimer = null;
    }

    this.emitter.off(this.event, this.#push);
    const maxListeners = this.emitter.getMaxListeners();
    if (maxListeners !== 0) {
      this.emitter.setMaxListeners(maxListeners - 1);
    }

    // Release consumers blocked in next(); otherwise they would wait forever
    // for an event that can no longer be received.
    this.wakeWaiters();
    this.onEnd?.(this);
  }
  // aliases to match usage in NATS and CoCalc.
  close = this.end;
  stop = this.end;

  drain(): void {
    // just immediately end
    this.end();
    // [ ] TODO: for compat. I'm not sure what this should be
    // or if it matters...
    // console.log("WARNING: TODO -- event-iterator drain not implemented");
  }

  /**
   * The next value that's received from the EventEmitter.
   *
   * @returns The next value that passes the filter, or a done result once the
   * iterator has ended.
   * @throws The error recorded by throw() or by a failing `map`; the iterator
   * is ended before it is rethrown.
   */
  public async next(): Promise<IteratorResult<V>> {
    if (this.err) {
      const err = this.err;
      this.err = undefined;
      this.end();
      throw err;
    }

    // Hand out the first queued value that passes the filter. This is a loop
    // rather than recursion so a long run of rejected values is safe.
    while (this.#queue.length) {
      const value = this.#queue.shift() as V;
      if (!this.filter(value)) continue;
      if (++this.#passed >= this.#limit) {
        this.end();
      }
      this.refreshIdleTimer();
      return { done: false, value };
    }

    // If the iterator ended, return a done response:
    if (this.#ended) {
      return { done: true, value: undefined as never };
    }

    // Wait for a new element from the emitter (or for end()/throw()):
    return new Promise<IteratorResult<V>>((resolve) => {
      let idleTimer: NodeJS.Timeout | undefined | null = null;

      // Settles this wait exactly once, whichever trigger comes first, and
      // removes the timer and listener that belong to it.
      const settle = (): void => {
        if (!this.#waiters.delete(settle)) return;
        if (idleTimer) clearTimeout(idleTimer);
        this.emitter.off(this.event, settle);
        resolve(this.next());
      };
      this.#waiters.add(settle);

      // If there is an idle time set, we will create a temporary timer,
      // which will cause the iterator to end if no new elements are received:
      if (this.#idle) {
        idleTimer = setTimeout(() => {
          this.end();
          settle();
        }, this.#idle);
      }

      // Registered after the push handler, so by the time this runs the
      // emitted value is already in the queue.
      this.emitter.once(this.event, settle);
    });
  }

  /**
   * Handles what happens when you break or return from a loop.
   */
  public return(): Promise<IteratorResult<V>> {
    this.end();
    return Promise.resolve({ done: true, value: undefined as never });
  }

  /**
   * Records `err` and ends the iterator. A next() that is currently waiting
   * rejects with `err`; otherwise the following call to next() throws it.
   */
  public throw(err?: unknown): Promise<IteratorResult<V>> {
    this.err = err;
    // Wake waiters before end() so that they observe the error.
    this.wakeWaiters();
    this.end();
    return Promise.resolve({ done: true, value: undefined as never });
  }

  /**
   * The symbol allowing EventIterators to be used in for-await-of loops.
   */
  public [Symbol.asyncIterator](): AsyncIterableIterator<V> {
    return this;
  }

  /**
   * Pushes a value into the queue. If `map` throws, the error is recorded and
   * surfaced through next() instead of escaping into the emitter.
   */
  protected push(...args: unknown[]): void {
    try {
      const value = this.map(args);
      this.#queue.push(value);
    } catch (err) {
      this.err = err;
      // Do NOT re-emit the event here: that would call this handler again
      // (recursing forever if map also fails on empty args) and deliver a
      // bogus event to every other listener on the emitter.
      this.wakeWaiters();
    }
  }

  /**
   * Settles every next() call that is currently waiting.
   */
  private wakeWaiters(): void {
    // Iterate over a copy: settling a waiter removes it and may re-enter.
    for (const settle of [...this.#waiters]) {
      settle();
    }
  }

  /**
   * Restarts the idle countdown after a value has been handed out.
   */
  private refreshIdleTimer(): void {
    const timer = this.#idleTimer;
    if (!timer) return;
    if (timer.refresh != null) {
      timer.refresh();
    } else {
      // Environments whose timers lack refresh(): recreate the timer.
      clearTimeout(timer);
      this.#idleTimer = setTimeout(() => this.end(), this.#idle);
    }
  }
}