Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/execute/execute-code.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Send code to a kernel to be evaluated, then wait for
8
the results and gather them together.
9
*/
10
11
import { callback, delay } from "awaiting";
12
import { EventEmitter } from "events";
13
import { VERSION } from "@cocalc/jupyter/kernel/version";
14
import type { JupyterKernelInterface as JupyterKernel } from "@cocalc/jupyter/types/project-interface";
15
import type { MessageType } from "@cocalc/jupyter/zmq/types";
16
import { copy_with, deep_copy, uuid } from "@cocalc/util/misc";
17
import type {
18
CodeExecutionEmitterInterface,
19
OutputMessage,
20
ExecOpts,
21
StdinFunction,
22
} from "@cocalc/jupyter/types/project-interface";
23
import { getLogger } from "@cocalc/backend/logger";
24
import { EventIterator } from "@cocalc/util/event-iterator";
25
import { once } from "@cocalc/util/async-utils";
26
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
27
import type { Message } from "@cocalc/jupyter/zmq/message";
28
29
// Module-level logger, obtained from getLogger under the name
// "jupyter:execute-code"; used by CodeExecutionEmitter below for
// silly/debug/warn diagnostics.
const log = getLogger("jupyter:execute-code");
30
31
// Lifecycle of a CodeExecutionEmitter: it starts in "init", becomes
// "running" once execution has been started, "done" when the final
// ({done: true}) output was emitted or the execution was canceled,
// and "closed" after close() has been called.
type State = "init" | "running" | "done" | "closed";
32
33
/**
 * Sends one block of code to a Jupyter kernel and gathers the results.
 *
 * Events emitted by an instance:
 *   - "output":   each output message; the last one is `{ done: true }`.
 *   - "done":     after the final output was emitted, or on cancel().
 *   - "canceled": when cancel() is called.
 *   - "closed":   exactly once, when close() runs.
 *   - "error":    when throw_error() is called.
 *
 * Typical use: construct, subscribe to "output" (or use iter()), then
 * call go() to actually run the code.
 */
export class CodeExecutionEmitter
  extends EventEmitter
  implements CodeExecutionEmitterInterface
{
  readonly kernel: JupyterKernel;
  readonly code: string;
  readonly id?: string;
  readonly stdin?: StdinFunction;
  readonly halt_on_error: boolean;
  // DO NOT set iopub_done or shell_done directly; instead
  // set them using the function set_shell_done and set_iopub_done.
  // This ensures that we call _finish when both vars have been set.
  private iopub_done: boolean = false;
  private shell_done: boolean = false;
  private state: State = "init";
  // The execute_request message that is sent to the kernel; its
  // header.msg_id is used to match replies to this execution.
  private _message: any;
  // Node-style callback handed to _go; invoked by _finish.
  private _go_cb: Function | undefined = undefined;
  private timeout_ms?: number;
  private timer?: any;
  // Non-empty iff execution was aborted (timeout, kernel closed/failed);
  // the text is the reason and is passed to the go() callback as the error.
  private killing: string = "";
  private _iter?: EventIterator<OutputMessage>;

  /**
   * @param kernel the kernel on which the code will be evaluated
   * @param opts   code to run plus optional id, stdin handler,
   *               halt_on_error flag and timeout_ms
   */
  constructor(kernel: JupyterKernel, opts: ExecOpts) {
    super();
    this.kernel = kernel;
    this.code = opts.code;
    this.id = opts.id;
    this.stdin = opts.stdin;
    this.halt_on_error = !!opts.halt_on_error;
    this.timeout_ms = opts.timeout_ms;
    this._message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      header: {
        msg_id: `execute_${uuid()}`,
        username: "",
        session: "",
        msg_type: "execute_request" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        code: this.code,
        silent: false,
        store_history: true, // so execution_count is updated.
        user_expressions: {},
        // only let the kernel ask for input if we can answer
        allow_stdin: this.stdin != null,
      },
    };
  }

  /**
   * Async interface: returns an iterator over the "output" events.
   * The same iterator is returned on repeated calls.
   *
   * @throws Error("closed") if this emitter was already closed.
   */
  iter = (): EventIterator<OutputMessage> => {
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this._iter == null) {
      this._iter = new EventIterator<OutputMessage>(this, "output", {
        map: (args) => {
          if (args[0]?.done) {
            // Close shortly *after* the final message was handed out,
            // so that the consumer still receives it.
            setTimeout(() => this._iter?.close(), 1);
          }
          return args[0];
        },
      });
    }
    return this._iter;
  };

  /**
   * Resolves once the "done" event fired. Never rejects.
   */
  waitUntilDone = reuseInFlight(async () => {
    try {
      await once(this, "done");
    } catch {
      // it throws on close, but that's also "done".
    }
  });

  // Record the new state and emit an event named after it.
  private setState = (state: State) => {
    this.state = state;
    this.emit(state);
  };

  /**
   * Emits one output message via the "output" event. For the format see
   * https://jupyter-client.readthedocs.io/en/stable/messaging.html#python-api
   * If the message has a truthy `done` field, the state changes to "done".
   */
  emit_output = (output: OutputMessage): void => {
    this.emit("output", output);
    if (output["done"]) {
      this.setState("done");
    }
  };

  // Call this to inform anybody listening that we've canceled
  // this execution, and will NOT be doing it ever, and it
  // was explicitly canceled.
  cancel = (): void => {
    this.emit("canceled");
    this.setState("done");
    this._iter?.close();
  };

  /**
   * Releases the timer, the iterator and all listeners, and emits
   * "closed" exactly once. Calling it again is a no-op.
   *
   * Note that close() drops the go() callback without invoking it;
   * _finish takes care of invoking it when an execution ends.
   */
  close = (): void => {
    if (this.state == "closed") {
      return;
    }
    // setState emits "closed"; no second explicit emit is needed
    // (the previous code emitted the event twice).
    this.setState("closed");
    if (this.timer != null) {
      clearTimeout(this.timer);
      delete this.timer;
    }
    this._iter?.close();
    delete this._iter;
    this._go_cb = undefined;
    this.removeAllListeners();
  };

  // Emit the given error via the "error" event, then close.
  throw_error = (err): void => {
    this.emit("error", err);
    this.close();
  };

  /**
   * Handles an input request coming from the kernel on the stdin channel:
   * asks the user-provided stdin function for a value and sends it back
   * to the kernel as an input_reply message.
   */
  private handleStdin = async (mesg: Message): Promise<void> => {
    if (!this.stdin) {
      throw Error("BUG -- stdin handling not supported");
    }
    log.silly("handleStdin: STDIN kernel --> server: ", mesg);
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      // request belongs to a different execution
      log.warn(
        "handleStdin: STDIN msg_id mismatch:",
        mesg.parent_header.msg_id,
        this._message.header.msg_id,
      );
      return;
    }

    let response;
    try {
      response = await this.stdin(
        mesg.content.prompt ? mesg.content.prompt : "",
        !!mesg.content.password,
      );
    } catch (err) {
      // The kernel is waiting for *some* reply, so send the error text.
      response = `ERROR -- ${err}`;
    }
    log.silly("handleStdin: STDIN client --> server", response);
    const m = {
      channel: "stdin",
      parent_header: this._message.header,
      metadata: {},
      header: {
        msg_id: uuid(), // this._message.header.msg_id
        username: "",
        session: "",
        msg_type: "input_reply" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        value: response,
      },
    };
    log.silly("handleStdin: STDIN server --> kernel:", m);
    this.kernel.sockets?.send(m);
  };

  /**
   * Handles a message on the shell channel. Any reply to our request
   * marks the shell side as done; only an "ok" reply is also forwarded
   * as output.
   */
  private handleShell = (mesg: Message): void => {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.silly(
        `handleShell: msg_id mismatch: ${mesg.parent_header.msg_id} != ${this._message.header.msg_id}`,
      );
      return;
    }
    log.silly("handleShell: got SHELL message -- ", mesg);

    if (mesg.content?.status == "ok") {
      this._push_mesg(mesg);
      this.set_shell_done(true);
    } else {
      log.warn(`handleShell: status != ok: ${mesg.content?.status}`);
      // NOTE: I'm adding support for "abort" status, since I was just reading
      // the kernel docs and it exists but is deprecated.  Some old kernels
      // might use it and we should thus properly support it:
      // https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
      //
      // 2023-05-11: this was conditional on mesg.content?.status == "error" or == "abort"
      // but in reality, there was also "aborted". Hence this as a catch-all else.
      if (this.halt_on_error) {
        this.kernel.clear_execute_code_queue();
      }
      this.set_shell_done(true);
    }
  };

  // Record the shell status; finish once both channels are done.
  private set_shell_done = (value: boolean): void => {
    this.shell_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  };

  // Record the iopub status; finish once both channels are done.
  private set_iopub_done = (value: boolean): void => {
    this.iopub_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  };

  /**
   * Handles a message on the iopub channel: forwards normal output and
   * marks the iopub side as done when the kernel reports "idle" (or when
   * this execution is being killed).
   */
  handleIOPub = (mesg: Message): void => {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      // iopub message for a different execute request so ignore it.
      return;
    }
    // these can be huge -- do not uncomment except for low level debugging!
    // log.silly("handleIOPub: got IOPUB message -- ", mesg);

    if (mesg.content?.comm_id != null) {
      // A comm message that is a result of execution of this code.
      // IGNORE here -- all comm messages are handled at a higher
      // level in jupyter.ts.  Also, this case should never happen, since
      // we do not emit an event from jupyter.ts in this case anyways.
    } else {
      // A normal output message.
      this._push_mesg(mesg);
    }

    this.set_iopub_done(
      !!this.killing || mesg.content?.execution_state == "idle",
    );
  };

  // Called if the kernel is closed for some reason, e.g., crashing.
  private handleClosed = (): void => {
    log.debug("CodeExecutionEmitter.handleClosed: kernel closed");
    this.killing = "kernel crashed";
    this._finish();
  };

  /**
   * Ends this execution: detaches from the kernel, advances the kernel's
   * execute queue, emits the final `{ done: true }` output, closes this
   * emitter and finally settles the pending go() call.
   */
  private _finish = (): void => {
    if (this.state == "closed") {
      return;
    }
    // Grab the callback that was setup in this._go *before* anything
    // below can call close(): close() drops this._go_cb, so reading it
    // afterwards (as the code used to do) meant the callback was never
    // invoked and go() never settled.
    const goCallback = this._go_cb;
    this._go_cb = undefined;

    this.kernel.removeListener("iopub", this.handleIOPub);
    if (this.stdin != null) {
      this.kernel.removeListener("stdin", this.handleStdin);
    }
    this.kernel.removeListener("shell", this.handleShell);
    if (this.kernel._execute_code_queue != null) {
      this.kernel._execute_code_queue.shift(); // finished
      this.kernel._process_execute_code_queue(); // start next exec
    }
    this.kernel.removeListener("closed", this.handleClosed);
    this.kernel.removeListener("failed", this.handleClosed);
    this._push_mesg({ done: true });
    this.close();

    // Finally call the callback.
    // This is what makes it possible to await on the entire
    // execution.  Also it is important to explicitly
    // signal an error if we had to kill execution due
    // to hitting a timeout, since the kernel may or may
    // not have randomly done so itself in output.
    // (this.killing is "" on success, i.e. "no error".)
    goCallback?.(this.killing);
  };

  /**
   * Converts a raw kernel message (or `{ done: true }`) into an output
   * message and emits it.
   */
  _push_mesg = (mesg): void => {
    // TODO: mesg isn't a normal javascript object;
    // it's **silently** immutable, which
    // is pretty annoying for our use.  For now, we
    // just copy it, which is a waste.
    const header = mesg.header;
    // copy_with presumably restricts the message to the listed keys.
    mesg = copy_with(mesg, ["metadata", "content", "buffers", "done"]);
    mesg = deep_copy(mesg);
    if (header !== undefined) {
      mesg.msg_type = header.msg_type;
    }
    this.emit_output(mesg);
  };

  /**
   * Starts the execution. The returned promise is settled through the
   * node-style callback given to _go: with an error if the emitter was
   * already started, if the kernel is closed/failed, or if the execution
   * had to be killed; without error when the execution finished normally.
   */
  go = async (): Promise<void> => {
    await callback(this._go);
  };

  // Callback-style implementation of go(); cb is called with an error
  // string (or "" on success).
  private _go = (cb: Function): void => {
    if (this.state != "init") {
      cb("may only run once");
      return;
    }
    this.state = "running";
    log.silly("_execute_code", this.code);
    const kernelState = this.kernel.get_state();
    if (kernelState == "closed" || kernelState == "failed") {
      log.silly("_execute_code", "kernel.get_state() is ", kernelState);
      this.killing = kernelState;
      // this._go_cb is not set yet, so _finish does not call cb;
      // we call it ourselves below.
      this._finish();
      cb(kernelState);
      return;
    }

    this._go_cb = cb; // this._finish will call this.

    if (this.stdin != null) {
      this.kernel.on("stdin", this.handleStdin);
    }
    this.kernel.on("shell", this.handleShell);
    this.kernel.on("iopub", this.handleIOPub);

    this.kernel.once("closed", this.handleClosed);
    this.kernel.once("failed", this.handleClosed);

    if (this.timeout_ms) {
      // setup a timeout at which point things will get killed if they don't finish
      this.timer = setTimeout(this.timeout, this.timeout_ms);
    }

    log.debug("_execute_code: send the message to get things rolling");
    if (this.kernel.sockets == null) {
      throw Error("bug -- sockets must be defined");
    }
    this.kernel.sockets.send(this._message);
  };

  /**
   * Runs when the execution time limit is hit: interrupts the kernel a
   * few times with growing pauses, and if the execution still has not
   * ended, kills the kernel and finishes.
   */
  private timeout = async (): Promise<void> => {
    if (this.state == "closed") {
      log.debug(
        "CodeExecutionEmitter.timeout: already finished, so nothing to worry about",
      );
      return;
    }
    this.killing =
      "Timeout Error: execution time limit = " +
      Math.round((this.timeout_ms ?? 0) / 1000) +
      " seconds";
    let tries = 3;
    let d = 1000;
    // (The casts keep the compiler from assuming that the state is
    // still what it was at the check above; it can change during await.)
    while (this.state != ("closed" as State) && tries > 0) {
      log.debug(
        "CodeExecutionEmitter.timeout: code still running, so try to interrupt it",
      );
      // Code still running but timeout reached.
      // Keep sending interrupt signal, which will hopefully do something to
      // stop running code (there is no guarantee, of course).  We
      // try a few times...
      this.kernel.signal("SIGINT");
      await delay(d);
      d *= 1.3;
      tries -= 1;
    }
    if (this.state != ("closed" as State)) {
      log.debug(
        "CodeExecutionEmitter.timeout: now try SIGKILL, which should kill things for sure.",
      );
      this.kernel.signal("SIGKILL");
      this._finish();
    }
  };
}
393
394