Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/event-iterator.ts
1447 views
1
/*
2
LICENSE: MIT
3
4
This is a slight fork of
5
6
https://github.com/sapphiredev/utilities/tree/main/packages/event-iterator
7
8
because upstream is slightly broken and what it actually does doesn't
9
agree with the docs. I can see why. Upstream would capture ['arg1','arg2']]
10
for an event emitter doing this
11
12
emitter.emit('foo', 'arg1', 'arg2')
13
14
But for our application we only want 'arg1'. I thus added a map option,
15
which makes it easy to do what we want.
16
*/
17
18
import type { EventEmitter } from "node:events";
19
20
/**
21
* A filter for an EventIterator.
22
*/
23
export type EventIteratorFilter<V> = (value: V) => boolean;
24
25
/**
26
* Options to be passed to an EventIterator.
27
*/
28
export interface EventIteratorOptions<V> {
29
/**
30
* The filter.
31
*/
32
filter?: EventIteratorFilter<V>;
33
34
// maps the array of args emitted by the event emitter a V
35
map?: (args: any[]) => V;
36
37
/**
38
* The timeout in ms before ending the EventIterator.
39
*/
40
idle?: number;
41
42
/**
43
* The limit of events that pass the filter to iterate.
44
*/
45
limit?: number;
46
47
// called when iterator ends -- use to do cleanup.
48
onEnd?: (iter?: EventIterator<V>) => void;
49
}
50
51
/**
52
* An EventIterator, used for asynchronously iterating over received values.
53
*/
54
export class EventIterator<V extends unknown>
55
implements AsyncIterableIterator<V>
56
{
57
/**
58
* The emitter to listen to.
59
*/
60
public readonly emitter: EventEmitter;
61
62
/**
63
* The event the event iterator is listening for to receive values from.
64
*/
65
public readonly event: string;
66
67
/**
68
* The filter used to filter out values.
69
*/
70
public filter: EventIteratorFilter<V>;
71
72
public map;
73
74
/**
75
* Whether or not the EventIterator has ended.
76
*/
77
#ended = false;
78
79
private onEnd?: (iter?: EventIterator<V>) => void;
80
81
/**
82
* The amount of idle time in ms before moving on.
83
*/
84
readonly #idle?: number;
85
86
/**
87
* The queue of received values.
88
*/
89
#queue: V[] = [];
90
91
private err: any = undefined;
92
93
/**
94
* The amount of events that have passed the filter.
95
*/
96
#passed = 0;
97
98
/**
99
* The limit before ending the EventIterator.
100
*/
101
readonly #limit: number;
102
103
/**
104
* The timer to track when this will idle out.
105
*/
106
#idleTimer: NodeJS.Timeout | undefined | null = null;
107
108
/**
109
* The push handler with context bound to the instance.
110
*/
111
readonly #push: (this: EventIterator<V>, ...value: unknown[]) => void;
112
113
/**
114
* @param emitter The event emitter to listen to.
115
* @param event The event we're listening for to receives values from.
116
* @param options Any extra options.
117
*/
118
public constructor(
119
emitter: EventEmitter,
120
event: string,
121
options: EventIteratorOptions<V> = {},
122
) {
123
this.emitter = emitter;
124
this.event = event;
125
this.map = options.map ?? ((args) => args);
126
this.#limit = options.limit ?? Infinity;
127
this.#idle = options.idle;
128
this.filter = options.filter ?? ((): boolean => true);
129
this.onEnd = options.onEnd;
130
131
// This timer is to idle out on lack of valid responses
132
if (this.#idle) {
133
// NOTE: this same code is in next in case when we can't use refresh
134
this.#idleTimer = setTimeout(this.end.bind(this), this.#idle);
135
}
136
this.#push = this.push.bind(this);
137
const maxListeners = this.emitter.getMaxListeners();
138
if (maxListeners !== 0) this.emitter.setMaxListeners(maxListeners + 1);
139
140
this.emitter.on(this.event, this.#push);
141
}
142
143
/**
144
* Whether or not the EventIterator has ended.
145
*/
146
public get ended(): boolean {
147
return this.#ended;
148
}
149
150
/**
151
* Ends the EventIterator.
152
*/
153
public end(): void {
154
if (this.#ended) return;
155
this.#ended = true;
156
this.#queue = [];
157
158
this.emitter.off(this.event, this.#push);
159
const maxListeners = this.emitter.getMaxListeners();
160
if (maxListeners !== 0) {
161
this.emitter.setMaxListeners(maxListeners - 1);
162
}
163
this.onEnd?.(this);
164
}
165
// aliases to match usage in NATS and CoCalc.
166
close = this.end;
167
stop = this.end;
168
169
drain(): void {
170
// just immediately end
171
this.end();
172
// [ ] TODO: for compat. I'm not sure what this should be
173
// or if it matters...
174
// console.log("WARNING: TODO -- event-iterator drain not implemented");
175
}
176
177
/**
178
* The next value that's received from the EventEmitter.
179
*/
180
public async next(): Promise<IteratorResult<V>> {
181
if (this.err) {
182
const err = this.err;
183
delete this.err;
184
this.end();
185
throw err;
186
}
187
// If there are elements in the queue, return an undone response:
188
if (this.#queue.length) {
189
const value = this.#queue.shift()!;
190
if (!this.filter(value)) {
191
return this.next();
192
}
193
if (++this.#passed >= this.#limit) {
194
this.end();
195
}
196
if (this.#idleTimer) {
197
if (this.#idleTimer.refresh != null) {
198
this.#idleTimer.refresh();
199
} else {
200
clearTimeout(this.#idleTimer);
201
this.#idleTimer = setTimeout(this.end.bind(this), this.#idle);
202
}
203
}
204
205
return { done: false, value };
206
}
207
208
// If the iterator ended, clean-up timer and return a done response:
209
if (this.#ended) {
210
if (this.#idleTimer) clearTimeout(this.#idleTimer);
211
return { done: true, value: undefined as never };
212
}
213
214
// Listen for a new element from the emitter:
215
return new Promise<IteratorResult<V>>((resolve) => {
216
let idleTimer: NodeJS.Timeout | undefined | null = null;
217
218
// If there is an idle time set, we will create a temporary timer,
219
// which will cause the iterator to end if no new elements are received:
220
if (this.#idle) {
221
idleTimer = setTimeout(() => {
222
this.end();
223
resolve(this.next());
224
}, this.#idle);
225
}
226
227
// Once it has received at least one value, we will clear the timer (if defined),
228
// and resolve with the new value:
229
this.emitter.once(this.event, () => {
230
if (idleTimer) clearTimeout(idleTimer);
231
resolve(this.next());
232
});
233
});
234
}
235
236
/**
237
* Handles what happens when you break or return from a loop.
238
*/
239
public return(): Promise<IteratorResult<V>> {
240
this.end();
241
return Promise.resolve({ done: true, value: undefined as never });
242
}
243
244
public throw(err): Promise<IteratorResult<V>> {
245
this.err = err;
246
// fake event to trigger handling of err
247
this.emitter.emit(this.event);
248
this.end();
249
return Promise.resolve({ done: true, value: undefined as never });
250
}
251
252
/**
253
* The symbol allowing EventIterators to be used in for-await-of loops.
254
*/
255
public [Symbol.asyncIterator](): AsyncIterableIterator<V> {
256
return this;
257
}
258
259
/**
260
* Pushes a value into the queue.
261
*/
262
protected push(...args): void {
263
try {
264
const value = this.map(args);
265
this.#queue.push(value);
266
} catch (err) {
267
this.err = err;
268
// fake event to trigger handling of err
269
this.emitter.emit(this.event);
270
}
271
}
272
}
273
274