Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/execute/output-handler.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Class that handles output messages generated for evaluation of code
8
for a particular cell.
9
10
WARNING: For efficiency reasons (involving syncdb patch sizes),
11
outputs is a map from the (string representations of) the numbers
12
from 0 to n-1, where there are n messages. So watch out.
13
14
OutputHandler emits these events:
15
16
- 'change' -- (save), called when we change cell; if save=true, recommend
17
broadcasting this change to other users ASAP.
18
19
- 'done' -- emited once when finished; after this, everything is cleaned up
20
21
- 'more_output' -- If we exceed the message limit, emit more_output
22
(mesg, mesg_length) with extra messages.
23
24
- 'process' -- Gets called on any incoming message; it may
25
**mutate** the message, e.g., removing images uses this.
26
27
*/
28
29
import { callback } from "awaiting";
30
import { EventEmitter } from "events";
31
import {
32
close,
33
defaults,
34
required,
35
server_time,
36
len,
37
to_json,
38
is_object,
39
} from "@cocalc/util/misc";
40
41
const now = () => server_time().valueOf() - 0;
42
43
const MIN_SAVE_INTERVAL_MS = 500;
44
const MAX_SAVE_INTERVAL_MS = 45000;
45
46
export class OutputHandler extends EventEmitter {
47
private _opts: any;
48
private _n: number;
49
private _clear_before_next_output: boolean;
50
private _output_length: number;
51
private _in_more_output_mode: any;
52
private _state: any;
53
private _stdin_cb: any;
54
55
// Never commit output to send to the frontend more frequently than this.saveIntervalMs
56
// Otherwise, we'll end up with a large number of patches.
57
// We start out with MIN_SAVE_INTERVAL_MS and exponentially back it off to
58
// MAX_SAVE_INTERVAL_MS.
59
private lastSave: number = 0;
60
private saveIntervalMs = MIN_SAVE_INTERVAL_MS;
61
62
constructor(opts: any) {
63
super();
64
this._opts = defaults(opts, {
65
cell: required, // object; the cell whose output (etc.) will get mutated
66
// If given, used to truncate, discard output messages; extra
67
// messages are saved and made available.
68
max_output_length: undefined,
69
max_output_messages: undefined,
70
report_started_ms: undefined, // If no messages for this many ms, then we update via set to indicate
71
// that cell is being run.
72
dbg: undefined,
73
});
74
const { cell } = this._opts;
75
cell.output = null;
76
cell.exec_count = null;
77
cell.state = "run";
78
cell.start = null;
79
cell.end = null;
80
// Internal state
81
this._n = 0;
82
this._clear_before_next_output = false;
83
this._output_length = 0;
84
this._in_more_output_mode = false;
85
this._state = "ready";
86
// Report that computation started if there is no output soon.
87
if (this._opts.report_started_ms != null) {
88
setTimeout(this._report_started, this._opts.report_started_ms);
89
}
90
91
this.stdin = this.stdin.bind(this);
92
}
93
94
close = (): void => {
95
if (this._state == "closed") return;
96
this._state = "closed";
97
this.emit("done");
98
this.removeAllListeners();
99
close(this, new Set(["_state", "close"]));
100
};
101
102
_clear_output = (save?: any): void => {
103
if (this._state === "closed") {
104
return;
105
}
106
this._clear_before_next_output = false;
107
// clear output message -- we delete all the outputs
108
// reset the counter n, save, and are done.
109
// IMPORTANT: In Jupyter the clear_output message and everything
110
// before it is NOT saved in the notebook output itself
111
// (like in Sage worksheets).
112
this._opts.cell.output = null;
113
this._n = 0;
114
this._output_length = 0;
115
this.emit("change", save);
116
};
117
118
_report_started = (): void => {
119
if (this._state == "closed" || this._n > 0) {
120
// do nothing -- already getting output or done.
121
return;
122
}
123
this.emit("change", true);
124
};
125
126
// Call when computation starts
127
start = () => {
128
if (this._state === "closed") {
129
return;
130
}
131
this._opts.cell.start = (new Date() as any) - 0;
132
this._opts.cell.state = "busy";
133
this.emit("change", true);
134
};
135
136
// Call error if an error occurs. An appropriate error message is generated.
137
// Computation is considered done.
138
error = (err: any): void => {
139
if (err === "closed") {
140
// See https://github.com/sagemathinc/cocalc/issues/2388
141
this.message({
142
data: {
143
"text/markdown":
144
"<font color='red'>**Jupyter Kernel terminated:**</font> This might be caused by running out of memory or hitting a bug in some library (e.g., forking too many processes, trying to access invalid memory, etc.). Consider restarting or upgrading your project or running the relevant code directly in a terminal to track down the cause, as [explained here](https://github.com/sagemathinc/cocalc/wiki/KernelTerminated).",
145
},
146
});
147
} else {
148
this.message({
149
text: `${err}`,
150
name: "stderr",
151
});
152
}
153
this.done();
154
};
155
156
// Call done exactly once when done
157
done = (): void => {
158
if (this._state === "closed") {
159
return;
160
}
161
this._opts.cell.state = "done";
162
if (this._opts.cell.start == null) {
163
this._opts.cell.start = now();
164
}
165
this._opts.cell.end = now();
166
this.emit("change", true);
167
this.close();
168
};
169
170
// Handle clear
171
clear = (wait: any): void => {
172
if (wait) {
173
// wait until next output before clearing.
174
this._clear_before_next_output = true;
175
return;
176
}
177
this._clear_output();
178
};
179
180
_clean_mesg = (mesg: any): void => {
181
delete mesg.execution_state;
182
delete mesg.code;
183
delete mesg.status;
184
delete mesg.source;
185
for (const k in mesg) {
186
const v = mesg[k];
187
if (is_object(v) && len(v) === 0) {
188
delete mesg[k];
189
}
190
}
191
};
192
193
private _push_mesg = (mesg: any, save?: boolean): void => {
194
if (this._state === "closed") {
195
return;
196
}
197
198
if (save == null) {
199
const n = now();
200
if (n - this.lastSave > this.saveIntervalMs) {
201
save = true;
202
this.lastSave = n;
203
this.saveIntervalMs = Math.min(
204
MAX_SAVE_INTERVAL_MS,
205
this.saveIntervalMs * 1.1,
206
);
207
}
208
} else if (save == true) {
209
this.lastSave = now();
210
}
211
212
if (this._opts.cell.output === null) {
213
this._opts.cell.output = {};
214
}
215
this._opts.cell.output[`${this._n}`] = mesg;
216
this._n += 1;
217
this.emit("change", save);
218
};
219
220
set_input = (input: any, save = true): void => {
221
if (this._state === "closed") {
222
return;
223
}
224
this._opts.cell.input = input;
225
this.emit("change", save);
226
};
227
228
// Process incoming messages. This may mutate mesg.
229
message = (mesg: any): void => {
230
let has_exec_count: any;
231
if (this._state === "closed") {
232
return;
233
}
234
235
if (this._opts.cell.end) {
236
// ignore any messages once we're done.
237
return;
238
}
239
240
// record execution_count, if there.
241
if (mesg.execution_count != null) {
242
has_exec_count = true;
243
this._opts.cell.exec_count = mesg.execution_count;
244
delete mesg.execution_count;
245
} else {
246
has_exec_count = false;
247
}
248
249
// delete useless fields
250
this._clean_mesg(mesg);
251
252
if (len(mesg) === 0) {
253
// don't even bother saving this message; nothing useful here.
254
return;
255
}
256
257
if (has_exec_count) {
258
// message that has an execution count
259
mesg.exec_count = this._opts.cell.exec_count;
260
}
261
262
// hook to process message (e.g., this may mutate mesg,
263
// e.g., to remove big images)
264
this.emit("process", mesg);
265
266
if (this._clear_before_next_output) {
267
this._clear_output(false);
268
}
269
270
const s = JSON.stringify(mesg);
271
const mesg_length = s.length;
272
273
if (this._in_more_output_mode) {
274
this.emit("more_output", mesg, mesg_length);
275
return;
276
}
277
278
// check if limits exceeded:
279
280
this._output_length += mesg_length;
281
282
const notTooLong =
283
this._opts.max_output_length == null ||
284
this._output_length <= this._opts.max_output_length;
285
const notTooMany =
286
this._opts.max_output_messages == null ||
287
this._n < this._opts.max_output_messages;
288
289
if (notTooLong && notTooMany) {
290
// limits NOT exceeded
291
this._push_mesg(mesg);
292
return;
293
}
294
295
// Switch to too much output mode:
296
this._push_mesg({ more_output: true });
297
this._in_more_output_mode = true;
298
this.emit("more_output", mesg, mesg_length);
299
};
300
301
async stdin(prompt: string, password: boolean): Promise<string> {
302
// See docs for stdin option to execute_code in backend jupyter.coffee
303
this._push_mesg({ name: "input", opts: { prompt, password } });
304
// Now we wait until the output message we just included has its
305
// value set. Then we call cb with that value.
306
// This weird thing below sets this._stdin_cb, then
307
// waits for this._stdin_cb to be called, which happens
308
// when cell_changed gets called.
309
return await callback((cb) => (this._stdin_cb = cb));
310
}
311
312
// Call this when the cell changes; only used for stdin right now.
313
cell_changed = (cell: any, get_password: any): void => {
314
if (this._state === "closed") {
315
return;
316
}
317
if (this._stdin_cb == null) {
318
return;
319
}
320
const output = cell != null ? cell.get("output") : undefined;
321
if (output == null) {
322
return;
323
}
324
const value = output.getIn([`${output.size - 1}`, "value"]);
325
if (value != null) {
326
let x = value;
327
if (this._opts.cell.output) {
328
const n = `${len(this._opts.cell.output) - 1}`;
329
if (
330
get_password != null &&
331
this._opts.cell.output[n] &&
332
this._opts.cell.output[n].opts != null &&
333
this._opts.cell.output[n].opts.password
334
) {
335
// In case of a password, the value is NEVER placed in the document.
336
// Instead the value is submitted to the backend via https, with
337
// a random identifier put in the value.
338
x = get_password(); // get actual password
339
}
340
if (this._opts.cell.output[`${n}`] != null) {
341
this._opts.cell.output[`${n}`].value = value;
342
} // sync output-handler view of output with syncdb
343
}
344
this._stdin_cb(undefined, x);
345
delete this._stdin_cb;
346
}
347
};
348
349
payload = (payload: any): void => {
350
if (this._state === "closed") {
351
return;
352
}
353
if (payload.source === "set_next_input") {
354
this.set_input(payload.text);
355
} else if (payload.source === "page") {
356
// Just handle as a normal message; and we don't show in the pager,
357
// which doesn't make sense for multiple users.
358
// This happens when requesting help for r:
359
// https://github.com/sagemathinc/cocalc/issues/1933
360
this.message(payload);
361
} else {
362
// No idea what to do with this...
363
if (typeof this._opts.dbg === "function") {
364
this._opts.dbg(`Unknown PAYLOAD: ${to_json(payload)}`);
365
}
366
}
367
};
368
}
369
370