Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/kernel/kernel.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Jupyter Backend
8
9
For interactive testing:
10
11
$ node
12
13
> j = require('./dist/kernel'); k = j.kernel({name:'python3', path:'x.ipynb'});
14
> console.log(JSON.stringify(await k.execute_code_now({code:'2+3'}),0,2))
15
16
*/
17
18
// Launch kernels from the pre-spawned pool: restarts are much faster,
// at the cost of possible subtle issues.
const USE_KERNEL_POOL = true;

// Flip to true only for extreme, low-level debugging of kernel stdio.
const DEBUG = false;
if (DEBUG) console.log("Enabling low level Jupyter kernel debugging.");
26
27
// NOTE: we choose to use node-cleanup instead of the much more
28
// popular exit-hook, since node-cleanup actually works for us.
29
// https://github.com/jtlapp/node-cleanup/issues/16
30
// Also exit-hook is hard to import from commonjs.
31
import nodeCleanup from "node-cleanup";
32
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
33
import { callback } from "awaiting";
34
import type { MessageType } from "@cocalc/jupyter/zmq/types";
35
import { jupyterSockets, type JupyterSockets } from "@cocalc/jupyter/zmq";
36
import { EventEmitter } from "node:events";
37
import { unlink } from "@cocalc/backend/misc/async-utils-node";
38
import { remove_redundant_reps } from "@cocalc/jupyter/ipynb/import-from-ipynb";
39
import { JupyterActions } from "@cocalc/jupyter/redux/project-actions";
40
import {
41
type BlobStoreInterface,
42
CodeExecutionEmitterInterface,
43
ExecOpts,
44
JupyterKernelInterface,
45
KernelInfo,
46
} from "@cocalc/jupyter/types/project-interface";
47
import { JupyterStore } from "@cocalc/jupyter/redux/store";
48
import { JUPYTER_MIMETYPES } from "@cocalc/jupyter/util/misc";
49
import type { SyncDB } from "@cocalc/sync/editor/db/sync";
50
import { retry_until_success, until } from "@cocalc/util/async-utils";
51
import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";
52
import { key_value_store } from "@cocalc/util/key-value-store";
53
import {
54
copy,
55
deep_copy,
56
is_array,
57
len,
58
merge,
59
original_path,
60
path_split,
61
uuid,
62
uint8ArrayToBase64,
63
} from "@cocalc/util/misc";
64
import { CodeExecutionEmitter } from "@cocalc/jupyter/execute/execute-code";
65
import {
66
getLanguage,
67
get_kernel_data_by_name,
68
} from "@cocalc/jupyter/kernel/kernel-data";
69
70
import launchJupyterKernel, {
71
LaunchJupyterOpts,
72
SpawnedKernel,
73
killKernel,
74
} from "@cocalc/jupyter/pool/pool";
75
// non-pool version
76
import launchJupyterKernelNoPool from "@cocalc/jupyter/kernel/launch-kernel";
77
import { kernels } from "./kernels";
78
import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";
79
import type { KernelParams } from "@cocalc/jupyter/types/kernel";
80
import { redux_name } from "@cocalc/util/redux/name";
81
import { redux } from "@cocalc/jupyter/redux/app";
82
import { VERSION } from "@cocalc/jupyter/kernel/version";
83
import type { NbconvertParams } from "@cocalc/util/jupyter/types";
84
import type { Client } from "@cocalc/sync/client/types";
85
import { getLogger } from "@cocalc/backend/logger";
86
import { base64ToBuffer } from "@cocalc/util/base64";
87
import { sha1 as misc_node_sha1 } from "@cocalc/backend/misc_node";
88
import { join } from "path";
89
import { readFile } from "fs/promises";
90
91
// Milliseconds finishSpawningKernel waits for the kernel's sockets before it
// marks the kernel as failed (two minutes).
const MAX_KERNEL_SPAWN_TIME = 2 * 60 * 1000;

// Lifecycle of a JupyterKernel:
//   'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
// with 'failed' reachable when spawning or the kernel process goes wrong.
type State =
  | "failed"
  | "off"
  | "spawning"
  | "starting"
  | "running"
  | "closed";

const logger = getLogger("jupyter:kernel");
96
97
// nbconvert support is pluggable: it is disabled by default and enabled at
// runtime by calling initNbconvert. Some users of this code (e.g., remote
// kernels) don't need nbconvert, and the real implementation has heavy
// dependencies (e.g., a big chunk of the react frontend).
let nbconvert: (opts: NbconvertParams) => Promise<void> = async () => {
  throw Error("nbconvert is not enabled");
};

/**
 * Enable nbconvert by installing the implementation used by
 * JupyterKernel.nbconvert.
 *
 * @param f - async implementation that receives the nbconvert args, timeout
 *   (seconds) and working directory.
 */
export function initNbconvert(
  f: (opts: NbconvertParams) => Promise<void>,
): void {
  nbconvert = f;
}
108
109
/*
Extra, more multi-user friendly environment variables used when launching
Sage-based Jupyter kernels: a copy of this process's environment with the
Python user base, egg cache and R Makevars redirected under $HOME.
*/
const SAGE_JUPYTER_ENV = {
  ...process.env,
  PYTHONUSERBASE: `${process.env.HOME}/.local`,
  PYTHON_EGG_CACHE: `${process.env.HOME}/.sage/.python-eggs`,
  R_MAKEVARS_USER: `${process.env.HOME}/.sage/R/Makevars.user`,
};
118
119
/**
 * Create the redux store and actions used to work with one Jupyter notebook.
 *
 * `syncdb` is the sync document associated with the ipynb file. The path that
 * is watched and saved (in the official ipynb format) is derived from it via
 * original_path. If a store and actions are already registered under the
 * same redux name, they are removed first.
 *
 * @throws if `syncdb.project_id` is missing or `syncdb` is already closed.
 */
export async function initJupyterRedux(syncdb: SyncDB, client: Client) {
  const { project_id } = syncdb;
  if (project_id == null) {
    throw Error("project_id must be defined");
  }
  if (syncdb.get_state() == "closed") {
    throw Error("syncdb must not be closed");
  }

  // The .ipynb file (official format) we watch for changes and save to.
  const path = original_path(syncdb.get_path());
  logger.debug("initJupyterRedux", path);

  const name = redux_name(project_id, path);
  const alreadyRegistered =
    redux.getStore(name) != null && redux.getActions(name) != null;
  if (alreadyRegistered) {
    logger.debug(
      "initJupyterRedux",
      path,
      " -- existing actions, so removing them",
    );
    // Having two sets of store/actions for one notebook basically results in
    // things feeling hung, so delete the old ones before creating new ones.
    // This should never happen, but we ensure it anyway.
    // See https://github.com/sagemathinc/cocalc/issues/4331
    await removeJupyterRedux(path, project_id);
  }

  const store = redux.createStore(name, JupyterStore);
  const actions = redux.createActions(name, JupyterActions);
  actions._init(project_id, path, syncdb, store, client);

  syncdb.once("error", (err) => {
    logger.error("initJupyterRedux", path, "syncdb ERROR", err);
  });
  syncdb.once("ready", () => {
    logger.debug("initJupyterRedux", path, "syncdb ready");
  });
}
164
165
/**
 * Look up the redux actions and store registered for the notebook behind
 * `syncdb`. Either may be undefined if initJupyterRedux was never called.
 */
export async function getJupyterRedux(syncdb: SyncDB) {
  const name = redux_name(syncdb.project_id, original_path(syncdb.get_path()));
  const actions = redux.getActions(name);
  const store = redux.getStore(name);
  return { actions, store };
}
171
172
/**
 * Tear down the redux store/actions for a notebook, and close its kernel if
 * one is registered for `path`.
 *
 * Errors from closing the kernel are ignored, and errors from closing the
 * actions are only logged, so the store and actions are still removed.
 */
export async function removeJupyterRedux(
  path: string,
  project_id: string,
): Promise<void> {
  logger.debug("removeJupyterRedux", path);

  // Best effort: a kernel that fails to close must not block cleanup.
  try {
    await kernels.get(path)?.close();
  } catch (_err) {
    // deliberately ignored
  }

  const name = redux_name(project_id, path);
  const existingActions = redux.getActions(name);
  if (existingActions != null) {
    try {
      await existingActions.close();
    } catch (err) {
      logger.debug(
        "removeJupyterRedux",
        path,
        " WARNING -- issue closing actions",
        err,
      );
    }
  }

  redux.removeStore(name);
  redux.removeActions(name);
}
202
203
/**
 * Construct a JupyterKernel for the given kernel name and notebook path.
 * The kernel process is not spawned until spawn() is called or code is run.
 */
export function kernel({
  name,
  path,
  actions,
  ulimit,
}: KernelParams): JupyterKernel {
  return new JupyterKernel(name, path, actions, ulimit);
}
206
207
/*
208
Jupyter Kernel interface.
209
210
The kernel does *NOT* start up until either spawn is explicitly called, or
211
code execution is explicitly requested. This makes it possible to
212
call process_output without spawning an actual kernel.
213
*/
214
215
// When this node process exits, kill every spawned kernel process we know
// about. The kills are NOT awaited: async work in an exit handler is not
// really supported or possible in general.
nodeCleanup(() => {
  for (const jupyterKernel of Object.values(kernels.kernels)) {
    const spawned = jupyterKernel._kernel;
    if (spawned) killKernel(spawned);
  }
});
226
227
// NOTE: keep JupyterKernel implementation private -- use the kernel function
// above, and the interface defined in types.
/**
 * Backend wrapper around one Jupyter kernel process for one notebook path.
 *
 * The kernel process does NOT start until spawn() is called explicitly or code
 * execution is requested (execute_code -> ensure_running). This makes it
 * possible to call process_output without spawning an actual kernel.
 *
 * Emitted events: "state" (with the new state) and the state name itself
 * ("off", "spawning", "starting", "running", "failed", "closed"), plus
 * "kernel_error", "shell", "stdin", "iopub" and "execution_state".
 */
export class JupyterKernel
  extends EventEmitter
  implements JupyterKernelInterface
{
  // name -- if undefined that means "no actual Jupyter kernel" (i.e., this JupyterKernel exists
  // here, but there is no actual separate real Jupyter kernel process and one won't be created).
  // Everything should work, except you can't *spawn* such a kernel.
  public name: string | undefined;

  // this is a key:value store used mainly for stdin support right now. NOTHING TO DO WITH REDUX!
  public store: any;

  // Unique per instance; passed to jupyterSockets and used by close() so an
  // instance only unregisters itself from `kernels`.
  public readonly identity: string = uuid();

  // Bounded tail of the kernel subprocess's stderr (see finishSpawningKernel).
  private stderr: string = "";
  private ulimit?: string;
  private _path: string;
  private _actions?: JupyterActions;
  private _state: State;
  private _directory: string;
  private _filename: string;
  public _kernel?: SpawnedKernel;
  private _kernel_info?: KernelInfo;
  // Pending executions; entry 0 is the one currently running.
  public _execute_code_queue: CodeExecutionEmitter[] = [];
  public sockets?: JupyterSockets;
  // Set once ensure_running has fully succeeded (and queued any initCode).
  private has_ensured_running: boolean = false;
  // Message passed to setFailed; execute_code_now throws it when non-empty.
  private failedError: string = "";

  constructor(
    name: string | undefined,
    _path: string,
    _actions: JupyterActions | undefined,
    ulimit: string | undefined,
  ) {
    super();

    this.ulimit = ulimit;

    this.name = name;
    this._path = _path;
    this._actions = _actions;

    this.store = key_value_store();
    const { head, tail } = path_split(getAbsolutePathFromHome(this._path));
    this._directory = head;
    this._filename = tail;
    this.setState("off");
    this._execute_code_queue = [];
    if (kernels.get(this._path) !== undefined) {
      // This happens when we change the kernel for a given file, e.g.,
      // from python2 to python3.
      // Obviously, it is important to clean up after the old kernel.
      kernels.get(this._path)?.close();
    }
    kernels.set(this._path, this);
    this.setMaxListeners(100);
    const dbg = this.dbg("constructor");
    dbg("done");
  }

  get_path = () => {
    return this._path;
  };

  // no-op if calling it doesn't change the state.
  private setState = (state: State): void => {
    // state = 'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
    //                                   'failed'
    if (this._state == state) return;
    this._state = state;
    this.emit("state", this._state);
    this.emit(this._state); // we *SHOULD* use this everywhere, not above.
  };

  // Record a human-readable failure, emit "kernel_error", enter 'failed'.
  private setFailed = (error: string): void => {
    this.failedError = error;
    this.emit("kernel_error", error);
    this.setState("failed");
  };

  get_state = (): string => {
    return this._state;
  };

  private spawnedAlready = false;

  /**
   * Launch the kernel process (from the pool when USE_KERNEL_POOL is true) and
   * connect to it. Only the first call does anything; later calls return
   * immediately. Spawn failures do not throw: they put the kernel in the
   * 'failed' state with an explanatory message.
   *
   * @throws if the kernel is closed (before or during spawning) or has no name.
   */
  spawn = async (spawn_opts?: {
    env?: { [key: string]: string };
  }): Promise<void> => {
    if (this._state === "closed") {
      // game over!
      throw Error("closed -- kernel spawn");
    }
    if (!this.name) {
      // spawning not allowed.
      throw Error("cannot spawn since no kernel is set");
    }
    if (["running", "starting"].includes(this._state)) {
      // Already spawned, so no need to do it again.
      return;
    }

    if (this.spawnedAlready) {
      return;
    }
    this.spawnedAlready = true;

    this.setState("spawning");
    const dbg = this.dbg("spawn");
    dbg("spawning kernel...");

    // ****
    // CRITICAL: anything added to opts better not be specific
    // to the kernel path or it will completely break using a
    // pool, which makes things massively slower.
    // ****

    const opts: LaunchJupyterOpts = {
      env: spawn_opts?.env ?? {},
      ulimit: this.ulimit,
    };

    try {
      const kernelData = await get_kernel_data_by_name(this.name);
      // This matches "sage", "sage-x.y", and Sage Python3 ("sage -python -m ipykernel")
      if (kernelData.argv[0].startsWith("sage")) {
        dbg("setting special environment for Sage kernels");
        opts.env = merge(opts.env, SAGE_JUPYTER_ENV);
      }
    } catch (err) {
      dbg(`No kernelData available for ${this.name}`);
    }

    // Make cocalc default to the colab renderer for cocalc-jupyter, since
    // this one happens to work best for us, and they don't have a custom
    // one for us.  See https://plot.ly/python/renderers/ and
    // https://github.com/sagemathinc/cocalc/issues/4259
    opts.env.PLOTLY_RENDERER = "colab";
    opts.env.COCALC_JUPYTER_KERNELNAME = this.name;

    // !!! WARNING: do NOT add anything new here that depends on that path!!!!
    // Otherwise the pool will switch to falling back to not being used, and
    // cocalc would then be massively slower.
    // Non-uniform customization.
    // launchJupyterKernel is explicitly smart enough to deal with opts.cwd
    if (this._directory) {
      opts.cwd = this._directory;
    }
    // launchJupyterKernel is explicitly smart enough to deal with opts.env.COCALC_JUPYTER_FILENAME
    opts.env.COCALC_JUPYTER_FILENAME = this._path;
    // and launchJupyterKernel is NOT smart enough to deal with anything else!

    try {
      if (USE_KERNEL_POOL) {
        dbg("launching Jupyter kernel, possibly from pool");
        this._kernel = await launchJupyterKernel(this.name, opts);
      } else {
        dbg("launching Jupyter kernel, NOT using pool");
        this._kernel = await launchJupyterKernelNoPool(this.name, opts);
      }
      dbg("finishing kernel setup");
      await this.finishSpawningKernel();
    } catch (err) {
      dbg(`ERROR spawning kernel - ${err}, ${err?.stack}`);
      // close() may have run while we were awaiting above. get_state() is
      // used because TypeScript still narrows this._state from the checks at
      // the top of this function.
      if (this.get_state() === "closed") {
        throw Error("closed");
      }
      this.setFailed(
        `**Unable to Spawn Jupyter Kernel:**\n\n${err} \n\nTry this in a terminal to help debug this (or contact support): \`jupyter console --kernel=${this.name}\`\n\nOnce you fix the problem, explicitly restart this kernel to test here.`,
      );
    }
  };

  get_spawned_kernel = () => {
    return this._kernel;
  };

  get_connection_file = (): string | undefined => {
    return this._kernel?.connectionFile;
  };

  /**
   * Wire up a freshly launched kernel process: capture its stdio, wait for a
   * pid, open the zmq sockets (giving up after MAX_KERNEL_SPAWN_TIME), forward
   * socket traffic as events, watch for process exit, and move 'spawning' ->
   * 'starting'.
   *
   * @throws "closed" if close() ran while we were waiting for the sockets.
   */
  private finishSpawningKernel = async () => {
    const dbg = this.dbg("finishSpawningKernel");
    dbg("now finishing spawn of kernel...");

    if (DEBUG) {
      this.low_level_dbg();
    }

    const kernel = this._kernel;
    if (!kernel) {
      throw Error("_kernel must be defined");
    }
    kernel.spawn.on("error", (err) => {
      const error = `${err}\n${this.stderr}`;
      dbg("kernel error", error);
      this.setFailed(error);
    });

    // Track stderr from the subprocess itself (the kernel).
    // This is useful for debugging broken kernels, etc., and is especially
    // useful since it exists even if the kernel sends nothing over any
    // zmq sockets (e.g., due to being very broken).
    this.stderr = "";
    kernel.spawn.stderr.on("data", (data) => {
      this.stderr += data.toString();
      if (this.stderr.length > 5000) {
        // truncate if gets long for some reason -- only the end will
        // be useful...
        this.stderr = this.stderr.slice(this.stderr.length - 4000);
      }
    });

    kernel.spawn.stdout.on("data", (_data) => {
      // NOTE: it is very important to read stdout (and stderr above)
      // even if we **totally ignore** the data. Otherwise, exec
      // might overflow
      // https://github.com/sagemathinc/cocalc/issues/5065
    });

    dbg("create main channel...", kernel.config);

    // jupyterSockets can "hang forever" if the kernel process never really
    // started, so first poll (up to 3 seconds) until there is a pid or until
    // spawning was abandoned (state changed by setFailed/close).
    try {
      await until(() => this._state != "spawning" || !!this.pid(), {
        start: 100,
        max: 100,
        timeout: 3000,
      });
    } catch (err) {
      // timed out
      this.setFailed(`Failed to start kernel process. ${err}`);
      return;
    }
    if (this._state != "spawning") {
      // got canceled
      return;
    }
    const pid = this.pid();
    if (!pid) {
      throw Error("bug");
    }

    // If the sockets don't come up within MAX_KERNEL_SPAWN_TIME, mark the
    // kernel failed. jupyterSockets itself can't be canceled yet, so when it
    // eventually resolves we discard the result (see gaveUp below).
    // https://github.com/sagemathinc/cocalc/issues/7040
    let gaveUp = false;
    const giveUpTimer = setTimeout(() => {
      gaveUp = true;
      this.setFailed("Failed to start kernel process -- timeout");
    }, MAX_KERNEL_SPAWN_TIME);
    let sockets: JupyterSockets;
    try {
      sockets = await jupyterSockets(kernel.config, this.identity);
    } finally {
      // Always disarm the timer: otherwise it keeps the event loop alive and,
      // if jupyterSockets rejected, later overwrites the real failure message.
      clearTimeout(giveUpTimer);
    }
    if (gaveUp) {
      // Too late -- don't leak the sockets, and kill the whole process group.
      sockets.close();
      try {
        process.kill(-pid, 9);
      } catch (err) {
        dbg(`error killing process group ${pid} -- ${err}`);
      }
      return;
    }
    if (this.get_state() === "closed") {
      // close() ran while we were waiting; it already killed the kernel, so
      // just release the sockets we were handed.
      sockets.close();
      throw Error("closed");
    }
    this.sockets = sockets;
    dbg("created main channel");
    sockets.on("shell", (mesg) => this.emit("shell", mesg));
    sockets.on("stdin", (mesg) => this.emit("stdin", mesg));
    sockets.on("iopub", (mesg) => {
      // Any iopub traffic means the kernel is actually up.
      this.setState("running");
      if (mesg.content != null && mesg.content.execution_state != null) {
        this.emit("execution_state", mesg.content.execution_state);
      }

      if (mesg.content?.comm_id != null) {
        // A comm message, which gets handled directly.
        this.process_comm_message_from_kernel(mesg);
        return;
      }

      if (this._actions?.capture_output_message(mesg)) {
        // captured an output message -- do not process further
        return;
      }

      this.emit("iopub", mesg);
    });

    kernel.spawn.once("exit", (exit_code, signal) => {
      if (this._state === "closed") {
        return;
      }
      this.dbg("kernel_exit")(
        `spawned kernel terminated with exit code ${exit_code} (signal=${signal}); stderr=${this.stderr}`,
      );
      const stderr = this.stderr ? `\n...\n${this.stderr}` : "";
      if (signal != null) {
        this.setFailed(`Kernel last terminated by signal ${signal}.${stderr}`);
      } else if (exit_code != null) {
        this.setFailed(`Kernel last exited with code ${exit_code}.${stderr}`);
      }
      this.close();
    });

    if (this._state == "spawning") {
      // so we can start sending code execution to the kernel, etc.
      this.setState("starting");
    }
  };

  pid = (): number | undefined => {
    return this._kernel?.spawn?.pid;
  };

  // Signal should be a string like "SIGINT", "SIGKILL".
  // See https://nodejs.org/api/process.html#process_process_kill_pid_signal
  // Clears the pending execution queue first, then signals the kernel's
  // whole process group (negative pid). Errors are only logged.
  signal = (signal: string): void => {
    const dbg = this.dbg("signal");
    const pid = this.pid();
    dbg(`pid=${pid}, signal=${signal}`);
    if (!pid) {
      return;
    }
    try {
      this.clear_execute_code_queue();
      process.kill(-pid, signal); // negative to kill the process group
    } catch (err) {
      dbg(`error: ${err}`);
    }
  };

  /**
   * Permanently shut this kernel down: close sockets and the stdin store,
   * unregister from `kernels` (only if we are the registered instance), drop
   * all listeners, kill the process and close queued executions. Idempotent.
   */
  close = (): void => {
    this.dbg("close")();
    if (this._state === "closed") {
      return;
    }
    if (this.sockets != null) {
      this.sockets.close();
      delete this.sockets;
    }
    // Emit "closed" before removeAllListeners so waiters (e.g. call()) wake up.
    this.setState("closed");
    if (this.store != null) {
      this.store.close();
      delete this.store;
    }
    const kernel = kernels.get(this._path);
    if (kernel != null && kernel.identity === this.identity) {
      kernels.delete(this._path);
    }
    this.removeAllListeners();
    if (this._kernel != null) {
      killKernel(this._kernel);
      delete this._kernel;
    }
    if (this._execute_code_queue != null) {
      for (const runningCode of this._execute_code_queue) {
        runningCode.close();
      }
      this._execute_code_queue = [];
    }
  };

  // public, since we do use it from some other places...
  // Returns a logger that prefixes messages with the kernel name, path and `f`.
  dbg = (f: string): ((...args: unknown[]) => void) => {
    return (...args: unknown[]) => {
      logger.debug(
        `jupyter.Kernel('${this.name ?? "no kernel"}',path='${
          this._path
        }').${f}`,
        ...args,
      );
    };
  };

  low_level_dbg = (): void => {
    const dbg = (...args) => logger.silly("low_level_debug", ...args);
    dbg("Enabling");
    if (this._kernel) {
      this._kernel.spawn.all?.on("data", (data) =>
        dbg("STDIO", data.toString()),
      );
    }
  };

  /**
   * Spawn the kernel if necessary. The first time the kernel is starting or
   * running, any pool initCode is queued at the front of the execution queue
   * (in its original order) so it runs before user code.
   *
   * @throws if the kernel is closed.
   */
  ensure_running = reuseInFlight(async (): Promise<void> => {
    const dbg = this.dbg("ensure_running");
    dbg(this._state);
    if (this._state == "closed") {
      throw Error("closed so not possible to ensure running");
    }
    if (this._state == "running") {
      return;
    }
    dbg("spawning");
    await this.spawn();
    if (this.get_state() != "starting" && this.get_state() != "running") {
      return;
    }
    if (!this.has_ensured_running) {
      // Queue initCode only once: a later call while still 'starting' must not
      // re-run it. execute_code(..., true) unshifts to the front of the queue,
      // so iterate backwards to keep the items in their original order.
      const initCode = this._kernel?.initCode ?? [];
      for (let i = initCode.length - 1; i >= 0; i--) {
        dbg("initCode ", initCode[i]);
        this.execute_code({ code: initCode[i] }, true);
      }
      this.has_ensured_running = true;
    }
  });

  /**
   * Queue code for execution (at the front when skipToFront) and start the
   * queue if it was idle. halt_on_error defaults to true.
   *
   * @throws if the kernel is closed.
   */
  execute_code = (
    opts: ExecOpts,
    skipToFront = false,
  ): CodeExecutionEmitterInterface => {
    if (opts.halt_on_error === undefined) {
      // if not specified, default to true.
      opts.halt_on_error = true;
    }
    if (this._state === "closed") {
      throw Error("closed -- kernel -- execute_code");
    }
    const code = new CodeExecutionEmitter(this, opts);
    if (skipToFront) {
      this._execute_code_queue.unshift(code);
    } else {
      this._execute_code_queue.push(code);
    }
    if (this._execute_code_queue.length == 1) {
      // start it going!
      this._process_execute_code_queue();
    }
    return code;
  };

  /**
   * Cancel every queued (not yet running) execution with the given id, and
   * send SIGINT if the currently running execution (queue entry 0) has it.
   */
  cancel_execute = (id: string): void => {
    if (this._state === "closed") {
      return;
    }
    const dbg = this.dbg(`cancel_execute(id='${id}')`);
    if (
      this._execute_code_queue == null ||
      this._execute_code_queue.length === 0
    ) {
      dbg("nothing to do");
      return;
    }
    if (this._execute_code_queue.length > 1) {
      dbg(
        "mutate this._execute_code_queue removing everything with the given id",
      );
      // Walk backwards so splice() doesn't shift unvisited entries, and stop
      // at index 1: entry 0 is the running computation, handled below.
      for (let i = this._execute_code_queue.length - 1; i >= 1; i--) {
        const code = this._execute_code_queue[i];
        if (code.id === id) {
          dbg(`removing entry ${i} from queue`);
          this._execute_code_queue.splice(i, 1);
          code.cancel();
        }
      }
    }
    // if the currently running computation involves this id, send an
    // interrupt signal (that's the best we can do)
    if (this._execute_code_queue[0].id === id) {
      dbg("interrupting running computation");
      this.signal("SIGINT");
    }
  };

  // Run the head of the queue (after making sure the kernel is running). If
  // that fails, every queued execution gets the error and the queue is emptied.
  _process_execute_code_queue = async (): Promise<void> => {
    const dbg = this.dbg("_process_execute_code_queue");
    dbg(`state='${this._state}'`);
    if (this._state === "closed") {
      dbg("closed");
      return;
    }
    if (this._execute_code_queue == null) {
      dbg("no queue");
      return;
    }
    const n = this._execute_code_queue.length;
    if (n === 0) {
      dbg("queue is empty");
      return;
    }
    dbg(
      `queue has ${n} items; ensure kernel running`,
      this._execute_code_queue,
    );
    try {
      await this.ensure_running();
      await this._execute_code_queue[0].go();
    } catch (err) {
      dbg(`WARNING: error running kernel -- ${err}`);
      for (const code of this._execute_code_queue) {
        code.throw_error(err);
      }
      this._execute_code_queue = [];
    }
  };

  clear_execute_code_queue = (): void => {
    const dbg = this.dbg("_clear_execute_code_queue");
    // ensure no future queued up evaluation occurs (currently running
    // one will complete and new executions could happen)
    if (this._state === "closed") {
      dbg("no op since state is closed");
      return;
    }
    if (this._execute_code_queue == null) {
      dbg("nothing to do since queue is null");
      return;
    }
    dbg(`clearing queue of size ${this._execute_code_queue.length}`);
    const mesg = { done: true };
    for (const code_execution_emitter of this._execute_code_queue.slice(1)) {
      code_execution_emitter.emit_output(mesg);
      code_execution_emitter.close();
    }
    this._execute_code_queue = [];
  };

  // This is like execute_code, but async and returns all the results.
  // This is used for unit testing and interactive work at
  // the terminal and nbgrader and the stateless api.
  // Throws if the kernel is closed or has failed (before or during the call).
  execute_code_now = async (opts: ExecOpts): Promise<object[]> => {
    this.dbg("execute_code_now")();
    if (this._state == "closed") {
      throw Error("closed");
    }
    if (this.failedError) {
      throw Error(this.failedError);
    }
    const output = this.execute_code({ halt_on_error: true, ...opts });
    const v: object[] = [];
    for await (const mesg of output.iter()) {
      v.push(mesg);
    }
    if (this.failedError) {
      // kernel failed during call
      throw Error(this.failedError);
    }
    return v;
  };

  // Store `data` in the actions' blob store keyed by its sha1; non-text types
  // are assumed to be base64 encoded. Throws if there is no blob store.
  private saveBlob = (data: string, type: string) => {
    const blobs = this._actions?.blobs;
    if (blobs == null) {
      throw Error("blob store not available");
    }
    const buf: Buffer = !type.startsWith("text/")
      ? Buffer.from(data, "base64")
      : Buffer.from(data);

    const sha1: string = misc_node_sha1(buf);
    blobs.set(sha1, buf);
    return sha1;
  };

  /**
   * Mutate an output message's `content.data` in place: drop redundant
   * representations, then move images, PDFs and text/html into the blob
   * store, replacing each with its sha1 (text/html becomes "iframe"). If the
   * blob store is unavailable, the data is left inline unchanged.
   */
  process_output = (content: any): void => {
    if (this._state === "closed") {
      return;
    }
    const dbg = this.dbg("process_output");
    if (content.data == null) {
      // No data -- https://github.com/sagemathinc/cocalc/issues/6665
      // NO do not do this sort of thing.  This is exactly the sort of situation where
      // content could be very large, and JSON.stringify could use huge amounts of memory.
      // If you need to see this for debugging, uncomment it.
      // dbg(trunc(JSON.stringify(content), 300));
      // todo: FOR now -- later may remove large stdout, stderr, etc...
      // dbg("no data, so nothing to do");
      return;
    }

    remove_redundant_reps(content.data);

    const saveBlob = (data: string, type: string): string | undefined => {
      try {
        return this.saveBlob(data, type);
      } catch (err) {
        dbg(`WARNING: Jupyter blob store not working -- ${err}`);
        return undefined;
      }
    };

    for (const type of JUPYTER_MIMETYPES) {
      if (content.data[type] == null) {
        continue;
      }
      const useBlobStore =
        type.split("/")[0] === "image" ||
        type === "application/pdf" ||
        type === "text/html";
      if (!useBlobStore) {
        continue;
      }
      // Store all images and PDF and text/html in a binary blob store, so we don't have
      // to involve it in realtime sync.  It tends to be large, etc.
      const sha1 = saveBlob(content.data[type], type);
      if (sha1 == null) {
        // Blob store unavailable: send the data inline in the usual (less
        // efficient) way instead of replacing it with undefined.
        continue;
      }
      if (type == "text/html") {
        // NOTE: in general, this may or may not get rendered as an iframe --
        // we use iframe for backward compatibility.
        content.data["iframe"] = sha1;
        delete content.data["text/html"];
      } else {
        content.data[type] = sha1;
      }
    }
  };

  /**
   * Send a request on the shell channel and resolve with the content of the
   * reply whose parent msg_id matches (empty metadata is dropped).
   *
   * @throws if the kernel isn't running, or is closed before replying.
   */
  call = async (msg_type: string, content?: any): Promise<any> => {
    this.dbg("call")(msg_type);
    if (!this.has_ensured_running) {
      await this.ensure_running();
    }
    // Do a paranoid double check anyways...
    if (this.sockets == null || this._state == "closed") {
      throw Error("not running, so can't call");
    }

    const message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      content,
      header: {
        msg_id: uuid(),
        username: "",
        session: "",
        msg_type: msg_type as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
    };

    // Send the message
    this.sockets.send(message);

    // Wait for the response that has the right msg_id.
    let the_mesg: any = undefined;
    const wait_for_response = (cb) => {
      const f = (mesg) => {
        // Some shell messages may lack a parent header; just skip them.
        if (mesg.parent_header?.msg_id === message.header.msg_id) {
          this.removeListener("shell", f);
          this.removeListener("closed", g);
          mesg = deep_copy(mesg.content);
          if (len(mesg.metadata) === 0) {
            delete mesg.metadata;
          }
          the_mesg = mesg;
          cb();
        }
      };
      const g = () => {
        this.removeListener("shell", f);
        this.removeListener("closed", g);
        cb("closed - jupyter - kernel - call");
      };
      this.on("shell", f);
      this.on("closed", g);
    };
    await callback(wait_for_response);
    return the_mesg;
  };

  complete = async (opts: { code: any; cursor_pos: any }): Promise<any> => {
    const dbg = this.dbg("complete");
    dbg(`code='${opts.code}', cursor_pos='${opts.cursor_pos}'`);
    return await this.call("complete_request", opts);
  };

  introspect = async (opts: {
    code: any;
    cursor_pos: any;
    detail_level: any;
  }): Promise<any> => {
    const dbg = this.dbg("introspect");
    dbg(
      `code='${opts.code}', cursor_pos='${opts.cursor_pos}', detail_level=${opts.detail_level}`,
    );
    return await this.call("inspect_request", opts);
  };

  // Kernel info from kernel_info_request (cached after the first success),
  // augmented with the node version and the notebook's start_time.
  kernel_info = reuseInFlight(async (): Promise<KernelInfo> => {
    if (this._kernel_info !== undefined) {
      return this._kernel_info;
    }
    const info = await this.call("kernel_info_request");
    info.nodejs_version = process.version;
    if (this._actions != null) {
      info.start_time = this._actions.store.get("start_time");
    }
    this._kernel_info = info;
    return info;
  });

  save_ipynb_file = async (opts?): Promise<void> => {
    if (this._actions != null) {
      await this._actions.save_ipynb_file(opts);
    } else {
      throw Error("save_ipynb_file -- ERROR: actions not known");
    }
  };

  more_output = (id: string): any[] => {
    if (id == null) {
      throw new Error("must specify id");
    }
    if (this._actions == null) {
      throw new Error("must have redux actions");
    }
    return this._actions.store.get_more_output(id) ?? [];
  };

  // Run the pluggable nbconvert (see initNbconvert) on this notebook file in
  // its directory. `timeout` is in seconds and defaults to 60.
  nbconvert = reuseInFlight(
    async (args: string[], timeout?: number): Promise<void> => {
      if (timeout === undefined) {
        timeout = 60; // seconds
      }
      if (!is_array(args)) {
        throw new Error("args must be an array");
      }
      args = copy(args);
      args.push("--");
      args.push(this._filename);
      await nbconvert({
        args,
        timeout,
        directory: this._directory,
      });
    },
  );

  /**
   * Load a file (relative paths are resolved against $HOME) into the blob
   * store and return its sha1, retrying for up to 30 seconds. On final
   * failure the file is unlinked and the original error is rethrown.
   */
  load_attachment = async (path: string): Promise<string> => {
    const dbg = this.dbg("load_attachment");
    dbg(`path='${path}'`);
    if (path[0] !== "/") {
      path = join(process.env.HOME ?? "", path);
    }
    const f = async (): Promise<string> => {
      const bs = this.get_blob_store();
      if (bs == null) {
        throw new Error("BlobStore not available");
      }
      return await bs.readFile(path);
    };
    try {
      return await retry_until_success({
        f,
        max_time: 30000,
      });
    } catch (err) {
      // TODO: think through again if deleting here is the right thing to do.
      // A failure to delete must not cause an unhandled rejection or mask err.
      try {
        await unlink(path);
      } catch (unlinkErr) {
        dbg(`WARNING: unable to unlink '${path}' -- ${unlinkErr}`);
      }
      throw err;
    }
  };

  // This is called by project-actions when exporting the notebook
  // to an ipynb file:
  // Returns undefined when the actions have no blob store.
  get_blob_store = (): BlobStoreInterface | undefined => {
    const blobs = this._actions?.blobs;
    if (blobs == null) {
      return;
    }
    const t = new TextDecoder();
    return {
      getBase64: (sha1: string): string | undefined => {
        const buf = blobs.get(sha1);
        if (buf === undefined) {
          return buf;
        }
        return uint8ArrayToBase64(buf);
      },

      getString: (sha1: string): string | undefined => {
        const buf = blobs.get(sha1);
        if (buf === undefined) {
          return buf;
        }
        return t.decode(buf);
      },

      readFile: async (path: string): Promise<string> => {
        const buf = await readFile(path);
        const sha1: string = misc_node_sha1(buf);
        blobs.set(sha1, buf);
        return sha1;
      },

      saveBase64: (data: string) => {
        const buf = Buffer.from(data, "base64");
        const sha1: string = misc_node_sha1(buf);
        blobs.set(sha1, buf);
        return sha1;
      },
    };
  };

  process_comm_message_from_kernel = (mesg): void => {
    if (this._actions == null) {
      return;
    }
    const dbg = this.dbg("process_comm_message_from_kernel");
    // This can be HUGE so don't print out the entire message; e.g., it could contain
    // massive binary data!
    dbg(mesg.header);
    this._actions.process_comm_message_from_kernel(mesg);
  };

  ipywidgetsGetBuffer = (
    model_id: string,
    // buffer_path is the string[] *or* the JSON of that.
    buffer_path: string | string[],
  ): Buffer | undefined => {
    if (typeof buffer_path != "string") {
      buffer_path = JSON.stringify(buffer_path);
    }
    return this._actions?.syncdb.ipywidgets_state?.getBuffer(
      model_id,
      buffer_path,
    );
  };

  /**
   * Send a comm_msg to the kernel on the shell channel. When base64 buffers
   * are given, they are decoded, recorded in the ipywidgets state (if any)
   * and sent instead of `buffers`.
   *
   * @throws if the sockets are not initialized.
   */
  send_comm_message_to_kernel = ({
    msg_id,
    comm_id,
    target_name,
    data,
    buffers64,
    buffers,
  }: {
    msg_id: string;
    comm_id: string;
    target_name: string;
    data: any;
    buffers64?: string[];
    buffers?: Buffer[];
  }): void => {
    if (this.sockets == null) {
      throw Error("sockets not initialized");
    }
    const dbg = this.dbg("send_comm_message_to_kernel");
    // this is HUGE
    // dbg({ msg_id, comm_id, target_name, data, buffers64 });
    if (buffers64 != null && buffers64.length > 0) {
      buffers = buffers64.map((x) => Buffer.from(base64ToBuffer(x)));
      dbg(
        "buffers lengths = ",
        buffers.map((x) => x.byteLength),
      );
      if (this._actions?.syncdb.ipywidgets_state != null) {
        this._actions.syncdb.ipywidgets_state.setModelBuffers(
          comm_id,
          data.buffer_paths,
          buffers,
          false,
        );
      }
    }

    const message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      content: { comm_id, target_name, data },
      header: {
        msg_id,
        username: "user",
        session: "",
        msg_type: "comm_msg" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      buffers,
    };

    // HUGE
    // dbg(message);
    // "The Kernel listens for these messages on the Shell channel,
    // and the Frontend listens for them on the IOPub channel." -- docs
    this.sockets.send(message);
  };

  // Change the kernel's working directory to `path` (relative to $HOME) by
  // running the language-specific chdir code. No-op without a kernel name.
  chdir = async (path: string): Promise<void> => {
    if (!this.name) return; // no kernel, no current directory
    const dbg = this.dbg("chdir");
    dbg({ path });
    let lang;
    try {
      // using probably cached data, so likely very fast
      lang = await getLanguage(this.name);
    } catch (err) {
      dbg("WARNING ", err);
      const info = await this.kernel_info();
      lang = info.language_info?.name ?? "";
    }

    const absPath = getAbsolutePathFromHome(path);
    const code = createChdirCommand(lang, absPath);
    // code = '' if no command needed, e.g., for sparql.
    if (code) {
      await this.execute_code_now({ code });
    }
  };
}
1150
1151
/**
 * Find the registered JupyterKernel whose spawned process has the given pid,
 * or undefined if there is none.
 */
export function get_kernel_by_pid(pid: number): JupyterKernel | undefined {
  return Object.values(kernels.kernels).find(
    (candidate) => candidate.get_spawned_kernel()?.spawn.pid === pid,
  );
}
1159
1160