Path: blob/master/src/packages/jupyter/kernel/kernel.ts
1447 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Jupyter Backend

For interactive testing:

$ node

> j = require('./dist/kernel'); k = j.kernel({name:'python3', path:'x.ipynb'});
> console.log(JSON.stringify(await k.execute_code_now({code:'2+3'}),0,2))

*/

// POOL VERSION - faster to restart but possible subtle issues.
// When true, kernels are obtained via launchJupyterKernel from the pool;
// when false, launchJupyterKernelNoPool starts a fresh process each time.
const USE_KERNEL_POOL = true;

// const DEBUG = true; // only for extreme debugging.
const DEBUG = false; // normal mode
if (DEBUG) {
  console.log("Enabling low level Jupyter kernel debugging.");
}

// NOTE: we choose to use node-cleanup instead of the much more
// popular exit-hook, since node-cleanup actually works for us.
// https://github.com/jtlapp/node-cleanup/issues/16
// Also exit-hook is hard to import from commonjs.
import nodeCleanup from "node-cleanup";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { callback } from "awaiting";
import type { MessageType } from "@cocalc/jupyter/zmq/types";
import { jupyterSockets, type JupyterSockets } from "@cocalc/jupyter/zmq";
import { EventEmitter } from "node:events";
import { unlink } from "@cocalc/backend/misc/async-utils-node";
import { remove_redundant_reps } from "@cocalc/jupyter/ipynb/import-from-ipynb";
import { JupyterActions } from "@cocalc/jupyter/redux/project-actions";
import {
  type BlobStoreInterface,
  type CodeExecutionEmitterInterface,
  type ExecOpts,
  type JupyterKernelInterface,
  type KernelInfo,
} from "@cocalc/jupyter/types/project-interface";
import { JupyterStore } from "@cocalc/jupyter/redux/store";
import { JUPYTER_MIMETYPES } from "@cocalc/jupyter/util/misc";
import type { SyncDB } from "@cocalc/sync/editor/db/sync";
import { retry_until_success, until } from "@cocalc/util/async-utils";
import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";
import { key_value_store } from "@cocalc/util/key-value-store";
import {
  copy,
  deep_copy,
  is_array,
  len,
  merge,
  original_path,
  path_split,
  uuid,
  uint8ArrayToBase64,
} from "@cocalc/util/misc";
import { CodeExecutionEmitter } from "@cocalc/jupyter/execute/execute-code";
import {
  getLanguage,
  get_kernel_data_by_name,
} from "@cocalc/jupyter/kernel/kernel-data";

import launchJupyterKernel, {
  type LaunchJupyterOpts,
  type SpawnedKernel,
  killKernel,
} from "@cocalc/jupyter/pool/pool";
// non-pool version
import launchJupyterKernelNoPool from "@cocalc/jupyter/kernel/launch-kernel";
import { kernels } from "./kernels";
import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";
import type { KernelParams } from "@cocalc/jupyter/types/kernel";
import { redux_name } from "@cocalc/util/redux/name";
import { redux } from "@cocalc/jupyter/redux/app";
import { VERSION } from "@cocalc/jupyter/kernel/version";
import type { NbconvertParams } from "@cocalc/util/jupyter/types";
import type { Client } from "@cocalc/sync/client/types";
import { getLogger } from "@cocalc/backend/logger";
import { base64ToBuffer } from "@cocalc/util/base64";
import { sha1 as misc_node_sha1 } from "@cocalc/backend/misc_node";
import { join } from "path";
import { readFile } from "fs/promises";

// Maximum time (in milliseconds) to wait for the zmq sockets to a freshly
// spawned kernel to be created before declaring the spawn a failure.
const MAX_KERNEL_SPAWN_TIME = 120 * 1000;

// Lifecycle of a JupyterKernel:
//   'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
// with 'failed' reachable whenever spawning or the kernel process breaks.
type State = "failed" | "off" | "spawning" | "starting" | "running" | "closed";

const logger = getLogger("jupyter:kernel");

// We make it so nbconvert functionality can be dynamically enabled
// by calling this at runtime.
// The reason is that some users of this code (e.g., remote kernels) don't
// need to provide nbconvert functionality, and our implementation has some
// heavy dependencies, e.g., on a big chunk of the react frontend.
let nbconvert: (opts: NbconvertParams) => Promise<void> = async () => {
  throw Error("nbconvert is not enabled");
};

/**
 * Install the nbconvert implementation used by JupyterKernel.nbconvert.
 *
 * Until this is called, the module-level default rejects with
 * "nbconvert is not enabled".
 *
 * @param f - function that performs the conversion described by its params.
 */
export function initNbconvert(
  f: (opts: NbconvertParams) => Promise<void>,
): void {
  nbconvert = f;
}

/*
We set a few extra user-specific options for the environment in which
Sage-based Jupyter kernels run; these are more multi-user friendly.
The result is a copy of process.env with these three variables merged in.
*/
const SAGE_JUPYTER_ENV = merge(copy(process.env), {
  PYTHONUSERBASE: `${process.env.HOME}/.local`,
  PYTHON_EGG_CACHE: `${process.env.HOME}/.sage/.python-eggs`,
  R_MAKEVARS_USER: `${process.env.HOME}/.sage/R/Makevars.user`,
});

/**
 * Initialize the redux actions and store for working with a specific
 * Jupyter notebook.
 *
 * `syncdb` is the syncdoc associated to the ipynb file. This function creates
 * the corresponding actions and store, which make it possible to work with
 * the notebook. Any redux state already registered under the same name is
 * removed first (which also closes a kernel for that path).
 *
 * @throws if `syncdb.project_id` is not defined or the syncdb is closed.
 */
export async function initJupyterRedux(syncdb: SyncDB, client: Client) {
  const project_id = syncdb.project_id;
  if (project_id == null) {
    throw Error("project_id must be defined");
  }
  if (syncdb.get_state() == "closed") {
    throw Error("syncdb must not be closed");
  }

  // This path is the file we will watch for changes and save to, which is in
  // the original official ipynb format:
  const path = original_path(syncdb.get_path());
  logger.debug("initJupyterRedux", path);

  const name = redux_name(project_id, path);
  // Remove leftovers if EITHER half is still registered: a store without
  // actions (or vice versa) is just as broken as a full duplicate.
  if (redux.getStore(name) != null || redux.getActions(name) != null) {
    logger.debug(
      "initJupyterRedux",
      path,
      " -- existing actions, so removing them",
    );
    // The redux info for this notebook already exists, so don't
    // try to make it again without first deleting the existing one.
    // Having two at once basically results in things feeling hung.
    // This should never happen, but we ensure it.
    // See https://github.com/sagemathinc/cocalc/issues/4331
    await removeJupyterRedux(path, project_id);
  }
  const store = redux.createStore(name, JupyterStore);
  const actions = redux.createActions(name, JupyterActions);

  actions._init(project_id, path, syncdb, store, client);

  syncdb.once("error", (err) =>
    logger.error("initJupyterRedux", path, "syncdb ERROR", err),
  );
  syncdb.once("ready", () =>
    logger.debug("initJupyterRedux", path, "syncdb ready"),
  );
}

/**
 * Look up the redux actions and store previously created for the notebook
 * associated to `syncdb`. Either may be undefined if not initialized.
 */
export async function getJupyterRedux(syncdb: SyncDB) {
  const project_id = syncdb.project_id;
  const path = original_path(syncdb.get_path());
  const name = redux_name(project_id, path);
  return { actions: redux.getActions(name), store: redux.getStore(name) };
}

/**
 * Remove the store/actions for a given Jupyter notebook, and also close the
 * kernel registered for `path` if there is one.
 *
 * This is best effort: problems closing the kernel or the actions are logged
 * and the store/actions are removed regardless.
 */
export async function removeJupyterRedux(
  path: string,
  project_id: string,
): Promise<void> {
  logger.debug("removeJupyterRedux", path);
  // if there is a kernel, close it
  try {
    await kernels.get(path)?.close();
  } catch (err) {
    logger.debug(
      "removeJupyterRedux",
      path,
      " WARNING -- issue closing kernel",
      err,
    );
  }
  const name = redux_name(project_id, path);
  const actions = redux.getActions(name);
  if (actions != null) {
    try {
      await actions.close();
    } catch (err) {
      logger.debug(
        "removeJupyterRedux",
        path,
        " WARNING -- issue closing actions",
        err,
      );
    }
  }
  redux.removeStore(name);
  redux.removeActions(name);
}

/**
 * Create a JupyterKernel for `opts.path` using kernel `opts.name`.
 * The kernel process is not started until it is spawned or code runs.
 */
export function kernel(opts: KernelParams): JupyterKernel {
  return new JupyterKernel(opts.name, opts.path, opts.actions, opts.ulimit);
}

// Jupyter Kernel interface.
//
// The kernel does *NOT* start up until either spawn is explicitly called, or
// code execution is explicitly requested.
// This makes it possible to call process_output without spawning an actual
// kernel.

// Ensure that the kernels all get killed when the process exits.
nodeCleanup(() => {
  for (const kernelPath in kernels.kernels) {
    // We do NOT await the close since that's not really
    // supported or possible in general.
    const { _kernel } = kernels.kernels[kernelPath];
    if (_kernel) {
      killKernel(_kernel);
    }
  }
});

// NOTE: keep JupyterKernel implementation private -- use the kernel function
// above, and the interface defined in types.
/**
 * Backend wrapper around one (possibly not yet spawned) Jupyter kernel
 * process for the notebook at a given path.
 *
 * Constructing one registers it in `kernels` under its path, closing any
 * previous kernel registered there. Code executions are serialized through
 * `_execute_code_queue`; index 0 is the computation currently being run.
 */
export class JupyterKernel
  extends EventEmitter
  implements JupyterKernelInterface
{
  // name -- if undefined that means "no actual Jupyter kernel" (i.e., this JupyterKernel exists
  // here, but there is no actual separate real Jupyter kernel process and one won't be created).
  // Everything should work, except you can't *spawn* such a kernel.
  public name: string | undefined;

  // this is a key:value store used mainly for stdin support right now. NOTHING TO DO WITH REDUX!
  public store: any;

  public readonly identity: string = uuid();

  // Tail (at most ~5000 chars) of the kernel process's stderr, for error reports.
  private stderr: string = "";
  private ulimit?: string;
  private _path: string;
  private _actions?: JupyterActions;
  private _state: State;
  private _directory: string;
  private _filename: string;
  public _kernel?: SpawnedKernel;
  private _kernel_info?: KernelInfo;
  public _execute_code_queue: CodeExecutionEmitter[] = [];
  public sockets?: JupyterSockets;
  private has_ensured_running: boolean = false;
  // Last failure message; execute_code_now rethrows it when set.
  private failedError: string = "";

  constructor(
    name: string | undefined,
    _path: string,
    _actions: JupyterActions | undefined,
    ulimit: string | undefined,
  ) {
    super();

    this.ulimit = ulimit;

    this.name = name;
    this._path = _path;
    this._actions = _actions;

    this.store = key_value_store();
    const { head, tail } = path_split(getAbsolutePathFromHome(this._path));
    this._directory = head;
    this._filename = tail;
    this.setState("off");
    this._execute_code_queue = [];
    if (kernels.get(this._path) !== undefined) {
      // This happens when we change the kernel for a given file, e.g.,
      // from python2 to python3.
      // Obviously, it is important to clean up after the old kernel.
      kernels.get(this._path)?.close();
    }
    kernels.set(this._path, this);
    this.setMaxListeners(100);
    const dbg = this.dbg("constructor");
    dbg("done");
  }

  get_path = () => {
    return this._path;
  };

  /**
   * Move to `state`, emitting both "state" (with the new state) and an event
   * named after the new state. No-op if the state does not change.
   */
  private setState = (state: State): void => {
    // state = 'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
    //                                'failed'
    if (this._state == state) return;
    this._state = state;
    this.emit("state", this._state);
    this.emit(this._state); // we *SHOULD* use this everywhere, not above.
  };

  /** Record `error`, emit "kernel_error", and move to the "failed" state. */
  private setFailed = (error: string): void => {
    this.failedError = error;
    this.emit("kernel_error", error);
    this.setState("failed");
  };

  get_state = (): string => {
    return this._state;
  };

  private spawnedAlready = false;

  /**
   * Start the kernel process (from the pool when USE_KERNEL_POOL is set).
   *
   * Only the first call on a given instance does anything; later calls
   * return immediately. A failure to spawn is reported via setFailed
   * (not thrown), unless the kernel was closed meanwhile.
   *
   * @throws if the kernel is closed, or if no kernel name is set.
   */
  spawn = async (spawn_opts?: {
    env?: { [key: string]: string };
  }): Promise<void> => {
    if (this._state === "closed") {
      // game over!
      throw Error("closed -- kernel spawn");
    }
    if (!this.name) {
      // spawning not allowed.
      throw Error("cannot spawn since no kernel is set");
    }
    if (["running", "starting"].includes(this._state)) {
      // Already spawned, so no need to do it again.
      return;
    }

    if (this.spawnedAlready) {
      return;
    }
    this.spawnedAlready = true;

    this.setState("spawning");
    const dbg = this.dbg("spawn");
    dbg("spawning kernel...");

    // ****
    // CRITICAL: anything added to opts better not be specific
    // to the kernel path or it will completely break using a
    // pool, which makes things massively slower.
    // ****

    const opts: LaunchJupyterOpts = {
      env: spawn_opts?.env ?? {},
      ulimit: this.ulimit,
    };

    try {
      const kernelData = await get_kernel_data_by_name(this.name);
      // This matches "sage", "sage-x.y", and Sage Python3 ("sage -python -m ipykernel")
      if (kernelData.argv[0].startsWith("sage")) {
        dbg("setting special environment for Sage kernels");
        opts.env = merge(opts.env, SAGE_JUPYTER_ENV);
      }
    } catch (err) {
      dbg(`No kernelData available for ${this.name}`);
    }

    // Make cocalc default to the colab renderer for cocalc-jupyter, since
    // this one happens to work best for us, and they don't have a custom
    // one for us.  See https://plot.ly/python/renderers/ and
    // https://github.com/sagemathinc/cocalc/issues/4259
    opts.env.PLOTLY_RENDERER = "colab";
    opts.env.COCALC_JUPYTER_KERNELNAME = this.name;

    // !!! WARNING: do NOT add anything new here that depends on that path!!!!
    // Otherwise the pool will switch to falling back to not being used, and
    // cocalc would then be massively slower.
    // Non-uniform customization.
    // launchJupyterKernel is explicitly smart enough to deal with opts.cwd
    if (this._directory) {
      opts.cwd = this._directory;
    }
    // launchJupyterKernel is explicitly smart enough to deal with opts.env.COCALC_JUPYTER_FILENAME
    opts.env.COCALC_JUPYTER_FILENAME = this._path;
    // and launchJupyterKernel is NOT smart enough to deal with anything else!

    try {
      if (USE_KERNEL_POOL) {
        dbg("launching Jupyter kernel, possibly from pool");
        this._kernel = await launchJupyterKernel(this.name, opts);
      } else {
        dbg("launching Jupyter kernel, NOT using pool");
        this._kernel = await launchJupyterKernelNoPool(this.name, opts);
      }
      dbg("finishing kernel setup");
      await this.finishSpawningKernel();
    } catch (err) {
      dbg(`ERROR spawning kernel - ${err}, ${err.stack}`);
      // Read the state via get_state() so the compiler does not treat the
      // earlier "not closed" check as still holding after the awaits above.
      if (this.get_state() == "closed") {
        throw Error("closed");
      }
      // console.trace(err);
      this.setFailed(
        `**Unable to Spawn Jupyter Kernel:**\n\n${err} \n\nTry this in a terminal to help debug this (or contact support): \`jupyter console --kernel=${this.name}\`\n\nOnce you fix the problem, explicitly restart this kernel to test here.`,
      );
    }
  };

  get_spawned_kernel = () => {
    return this._kernel;
  };

  get_connection_file = (): string | undefined => {
    return this._kernel?.connectionFile;
  };

  /**
   * Second half of spawn: wire up the child process's error/stdio handlers,
   * wait for a pid, create the zmq sockets, forward their messages as
   * events, and move to "starting".
   */
  private finishSpawningKernel = async () => {
    const dbg = this.dbg("finishSpawningKernel");
    dbg("now finishing spawn of kernel...");

    if (DEBUG) {
      this.low_level_dbg();
    }

    if (!this._kernel) {
      throw Error("_kernel must be defined");
    }
    this._kernel.spawn.on("error", (err) => {
      const error = `${err}\n${this.stderr}`;
      dbg("kernel error", error);
      this.setFailed(error);
    });

    // Track stderr from the subprocess itself (the kernel).
    // This is useful for debugging broken kernels, etc., and is especially
    // useful since it exists even if the kernel sends nothing over any
    // zmq sockets (e.g., due to being very broken).
    this.stderr = "";
    this._kernel.spawn.stderr.on("data", (data) => {
      const s = data.toString();
      this.stderr += s;
      if (this.stderr.length > 5000) {
        // truncate if gets long for some reason -- only the end will
        // be useful...
        this.stderr = this.stderr.slice(this.stderr.length - 4000);
      }
    });

    this._kernel.spawn.stdout.on("data", (_data) => {
      // NOTE: it is very important to read stdout (and stderr above)
      // even if we **totally ignore** the data. Otherwise, exec
      // might overflow
      // https://github.com/sagemathinc/cocalc/issues/5065
    });

    dbg("create main channel...", this._kernel.config);

    // This horrible code is because jupyterSockets will just "hang
    // forever" if the kernel doesn't get spawned for some reason.
    // (TODO: now that I completely rewrote jupytersockets, we could
    // just put a timeout there or better checks? not sure.)
    // Thus we poll every 100ms, for up to 3 seconds, for there to be a
    // pid. This is complicated and ugly, and I'm sorry about that,
    // but sometimes that's life.
    try {
      await until(
        () => {
          if (this._state != "spawning") {
            // gave up
            return true;
          }
          if (this.pid()) {
            // there's a process :-)
            return true;
          }
          return false;
        },
        { start: 100, max: 100, timeout: 3000 },
      );
    } catch (err) {
      // timed out
      this.setFailed(`Failed to start kernel process. ${err}`);
      return;
    }
    if (this._state != "spawning") {
      // got canceled
      return;
    }
    const pid = this.pid();
    if (!pid) {
      throw Error("bug");
    }
    let success = false;
    let gaveUp = false;
    const giveUpTimer = setTimeout(() => {
      if (!success) {
        gaveUp = true;
        // MAX_KERNEL_SPAWN_TIME has passed and the channels didn't work.
        // Let's give up -- probably the kernel process just failed.
        this.setFailed("Failed to start kernel process -- timeout");
        // We can't yet "cancel" creating the sockets itself -- that will
        // require rewriting that dependency.
        // https://github.com/sagemathinc/cocalc/issues/7040
        // I did rewrite that -- so let's revisit this!
      }
    }, MAX_KERNEL_SPAWN_TIME);
    let sockets: JupyterSockets;
    try {
      sockets = await jupyterSockets(this._kernel.config, this.identity);
    } finally {
      // Never leave the give-up timer pending: on success it is no longer
      // needed, and on error spawn() already reports the real failure.
      clearTimeout(giveUpTimer);
    }
    if (gaveUp) {
      // Too late: release the sockets and kill the process group. The kill
      // may fail if the process is already gone, which is fine.
      sockets.close();
      try {
        process.kill(-pid, 9);
      } catch (err) {
        dbg(`error killing kernel process group -- ${err}`);
      }
      return;
    }
    this.sockets = sockets;
    success = true;
    dbg("created main channel");
    sockets.on("shell", (mesg) => this.emit("shell", mesg));
    sockets.on("stdin", (mesg) => this.emit("stdin", mesg));
    sockets.on("iopub", (mesg) => {
      // Receiving any iopub message means the kernel is up.
      this.setState("running");
      if (mesg.content != null && mesg.content.execution_state != null) {
        this.emit("execution_state", mesg.content.execution_state);
      }

      if (mesg.content?.comm_id != null) {
        // A comm message, which gets handled directly.
        this.process_comm_message_from_kernel(mesg);
        return;
      }

      if (this._actions?.capture_output_message(mesg)) {
        // captured an output message -- do not process further
        return;
      }

      this.emit("iopub", mesg);
    });

    this._kernel.spawn.once("exit", (exit_code, signal) => {
      if (this._state === "closed") {
        return;
      }
      this.dbg("kernel_exit")(
        `spawned kernel terminated with exit code ${exit_code} (signal=${signal}); stderr=${this.stderr}`,
      );
      const stderr = this.stderr ? `\n...\n${this.stderr}` : "";
      if (signal != null) {
        this.setFailed(`Kernel last terminated by signal ${signal}.${stderr}`);
      } else if (exit_code != null) {
        this.setFailed(`Kernel last exited with code ${exit_code}.${stderr}`);
      }
      this.close();
    });

    if (this._state == "spawning") {
      // so we can start sending code execution to the kernel, etc.
      this.setState("starting");
    }
  };

  pid = (): number | undefined => {
    return this._kernel?.spawn?.pid;
  };

  /**
   * Send `signal` (e.g. "SIGINT", "SIGKILL") to the kernel's whole process
   * group, after clearing the queued (not yet running) executions.
   * Errors are only logged. See
   * https://nodejs.org/api/process.html#process_process_kill_pid_signal
   */
  signal = (signal: string): void => {
    const dbg = this.dbg("signal");
    const pid = this.pid();
    dbg(`pid=${pid}, signal=${signal}`);
    if (!pid) {
      return;
    }
    try {
      this.clear_execute_code_queue();
      process.kill(-pid, signal); // negative to kill the process group
    } catch (err) {
      dbg(`error: ${err}`);
    }
  };

  /**
   * Permanently shut down: close sockets and key-value store, unregister
   * from `kernels`, drop all listeners (after emitting "closed"), kill the
   * process and close every queued execution. Idempotent.
   */
  close = (): void => {
    this.dbg("close")();
    if (this._state === "closed") {
      return;
    }
    if (this.sockets != null) {
      this.sockets.close();
      delete this.sockets;
    }
    this.setState("closed");
    if (this.store != null) {
      this.store.close();
      delete this.store;
    }
    const kernel = kernels.get(this._path);
    if (kernel != null && kernel.identity === this.identity) {
      kernels.delete(this._path);
    }
    this.removeAllListeners();
    if (this._kernel != null) {
      killKernel(this._kernel);
      delete this._kernel;
    }
    if (this._execute_code_queue != null) {
      for (const runningCode of this._execute_code_queue) {
        runningCode.close();
      }
      this._execute_code_queue = [];
    }
  };

  // public, since we do use it from some other places...
  /** Return a logger function whose messages are tagged with this kernel and `f`. */
  dbg = (f: string): ((...args: unknown[]) => void) => {
    return (...args: unknown[]) => {
      //console.log(
      logger.debug(
        `jupyter.Kernel('${this.name ?? "no kernel"}',path='${
          this._path
        }').${f}`,
        ...args,
      );
    };
  };

  low_level_dbg = (): void => {
    const dbg = (...args) => logger.silly("low_level_debug", ...args);
    dbg("Enabling");
    if (this._kernel) {
      this._kernel.spawn.all?.on("data", (data) =>
        dbg("STDIO", data.toString()),
      );
    }
  };

  /**
   * Make sure the kernel process has been spawned. If it ends up "starting"
   * or "running", queue the pool's initCode (if any) at the front of the
   * execution queue.
   *
   * NOTE(review): initCode is queued on every call that finds the state not
   * yet "running" (e.g. still "starting") -- confirm that re-running it is
   * harmless.
   *
   * @throws if the kernel is closed.
   */
  ensure_running = reuseInFlight(async (): Promise<void> => {
    const dbg = this.dbg("ensure_running");
    dbg(this._state);
    if (this._state == "closed") {
      throw Error("closed so not possible to ensure running");
    }
    if (this._state == "running") {
      return;
    }
    dbg("spawning");
    await this.spawn();
    if (this.get_state() != "starting" && this.get_state() != "running") {
      return;
    }
    for (const code of this._kernel?.initCode ?? []) {
      dbg("initCode ", code);
      this.execute_code({ code }, true);
    }
    this.has_ensured_running = true;
  });

  /**
   * Queue `opts.code` for execution and return its emitter. With
   * `skipToFront`, it goes to the front of the queue instead of the back.
   * `halt_on_error` defaults to true. Processing starts if the queue was
   * empty.
   *
   * @throws if the kernel is closed.
   */
  execute_code = (
    opts: ExecOpts,
    skipToFront = false,
  ): CodeExecutionEmitterInterface => {
    if (opts.halt_on_error === undefined) {
      // if not specified, default to true.
      opts.halt_on_error = true;
    }
    if (this._state === "closed") {
      throw Error("closed -- kernel -- execute_code");
    }
    const code = new CodeExecutionEmitter(this, opts);
    if (skipToFront) {
      this._execute_code_queue.unshift(code);
    } else {
      this._execute_code_queue.push(code);
    }
    if (this._execute_code_queue.length == 1) {
      // start it going!
      this._process_execute_code_queue();
    }
    return code;
  };

  /**
   * Cancel every queued (not yet running) execution whose id is `id`, and
   * send SIGINT if the currently running one (index 0) has that id.
   */
  cancel_execute = (id: string): void => {
    if (this._state === "closed") {
      return;
    }
    const dbg = this.dbg(`cancel_execute(id='${id}')`);
    if (
      this._execute_code_queue == null ||
      this._execute_code_queue.length === 0
    ) {
      dbg("nothing to do");
      return;
    }
    if (this._execute_code_queue.length > 1) {
      dbg(
        "mutate this._execute_code_queue removing everything with the given id",
      );
      // Walk backwards so splice() never shifts an entry we have not visited
      // yet; stop at index 1 because index 0 is the running computation,
      // which is handled by the interrupt below.
      for (let i = this._execute_code_queue.length - 1; i >= 1; i--) {
        const code = this._execute_code_queue[i];
        if (code.id === id) {
          dbg(`removing entry ${i} from queue`);
          this._execute_code_queue.splice(i, 1);
          code.cancel();
        }
      }
    }
    // if the currently running computation involves this id, send an
    // interrupt signal (that's the best we can do)
    if (this._execute_code_queue[0].id === id) {
      dbg("interrupting running computation");
      this.signal("SIGINT");
    }
  };

  /**
   * Ensure the kernel is running, then start the computation at the head of
   * the queue. On error, every queued emitter gets the error and the queue
   * is emptied.
   */
  _process_execute_code_queue = async (): Promise<void> => {
    const dbg = this.dbg("_process_execute_code_queue");
    dbg(`state='${this._state}'`);
    if (this._state === "closed") {
      dbg("closed");
      return;
    }
    if (this._execute_code_queue == null) {
      dbg("no queue");
      return;
    }
    const n = this._execute_code_queue.length;
    if (n === 0) {
      dbg("queue is empty");
      return;
    }
    dbg(
      `queue has ${n} items; ensure kernel running`,
      this._execute_code_queue,
    );
    try {
      await this.ensure_running();
      await this._execute_code_queue[0].go();
    } catch (err) {
      dbg(`WARNING: error running kernel -- ${err}`);
      for (const code of this._execute_code_queue) {
        code.throw_error(err);
      }
      this._execute_code_queue = [];
    }
  };

  /**
   * Finish (emit {done:true}) and close every queued execution except the
   * running one, then empty the queue.
   *
   * NOTE(review): the running entry (index 0) is also dropped from the queue
   * here, although it is not closed -- confirm this is what
   * CodeExecutionEmitter expects when it completes.
   */
  clear_execute_code_queue = (): void => {
    const dbg = this.dbg("_clear_execute_code_queue");
    // ensure no future queued up evaluation occurs (currently running
    // one will complete and new executions could happen)
    if (this._state === "closed") {
      dbg("no op since state is closed");
      return;
    }
    if (this._execute_code_queue == null) {
      dbg("nothing to do since queue is null");
      return;
    }
    dbg(`clearing queue of size ${this._execute_code_queue.length}`);
    const mesg = { done: true };
    for (const code_execution_emitter of this._execute_code_queue.slice(1)) {
      code_execution_emitter.emit_output(mesg);
      code_execution_emitter.close();
    }
    this._execute_code_queue = [];
  };

  // This is like execute_code, but async and returns all the results.
  // This is used for unit testing and interactive work at
  // the terminal and nbgrader and the stateless api.
  /**
   * @returns every output message of the execution, in order.
   * @throws if the kernel is closed, or has failed before or during the call.
   */
  execute_code_now = async (opts: ExecOpts): Promise<object[]> => {
    this.dbg("execute_code_now")();
    if (this._state == "closed") {
      throw Error("closed");
    }
    if (this.failedError) {
      throw Error(this.failedError);
    }
    const output = this.execute_code({ halt_on_error: true, ...opts });
    const v: object[] = [];
    for await (const mesg of output.iter()) {
      v.push(mesg);
    }
    if (this.failedError) {
      // kernel failed during call
      throw Error(this.failedError);
    }
    return v;
  };

  /**
   * Store `data` in the actions' blob store, keyed by its sha1, which is
   * returned. Non-text types are decoded from base64 first.
   *
   * @throws if there is no blob store.
   */
  private saveBlob = (data: string, type: string) => {
    const blobs = this._actions?.blobs;
    if (blobs == null) {
      throw Error("blob store not available");
    }
    const buf: Buffer = !type.startsWith("text/")
      ? Buffer.from(data, "base64")
      : Buffer.from(data);

    const sha1: string = misc_node_sha1(buf);
    blobs.set(sha1, buf);
    return sha1;
  };

  /**
   * Rewrite an output message's `content.data` in place: redundant
   * representations are removed, and images, PDFs and text/html are moved
   * into the blob store and replaced by their sha1 (text/html becomes an
   * "iframe" entry). If the blob store fails, the data is left inline.
   */
  process_output = (content: any): void => {
    if (this._state === "closed") {
      return;
    }
    const dbg = this.dbg("process_output");
    if (content.data == null) {
      // No data -- https://github.com/sagemathinc/cocalc/issues/6665
      // NO do not do this sort of thing.  This is exactly the sort of situation where
      // content could be very large, and JSON.stringify could use huge amounts of memory.
      // If you need to see this for debugging, uncomment it.
      // dbg(trunc(JSON.stringify(content), 300));
      // todo: FOR now -- later may remove large stdout, stderr, etc...
      // dbg("no data, so nothing to do");
      return;
    }

    remove_redundant_reps(content.data);

    // Returns the sha1, or undefined if the blob store is not working.
    const saveBlob = (data, type): string | undefined => {
      try {
        return this.saveBlob(data, type);
      } catch (err) {
        dbg(`WARNING: Jupyter blob store not working -- ${err}`);
        return undefined;
      }
    };

    let type: string;
    for (type of JUPYTER_MIMETYPES) {
      if (content.data[type] == null) {
        continue;
      }
      if (
        type.split("/")[0] === "image" ||
        type === "application/pdf" ||
        type === "text/html"
      ) {
        // Store all images and PDF and text/html in a binary blob store, so we don't have
        // to involve it in realtime sync.  It tends to be large, etc.
        const sha1 = saveBlob(content.data[type], type);
        if (sha1 == null) {
          // Blob store unavailable: keep the data inline in the output, which
          // is less efficient but loses nothing.
          continue;
        }
        if (type == "text/html") {
          // NOTE: in general, this may or may not get rendered as an iframe --
          // we use iframe for backward compatibility.
          content.data["iframe"] = sha1;
          delete content.data["text/html"];
        } else {
          content.data[type] = sha1;
        }
      }
    }
  };

  /**
   * Send a `msg_type` request with `content` on the shell channel and resolve
   * with a deep copy of the content of the reply whose parent msg_id matches
   * (empty metadata is dropped).
   *
   * @throws if the kernel is not running or closes before replying.
   */
  call = async (msg_type: string, content?: any): Promise<any> => {
    this.dbg("call")(msg_type);
    if (!this.has_ensured_running) {
      await this.ensure_running();
    }
    // Do a paranoid double check anyways...
    if (this.sockets == null || this._state == "closed") {
      throw Error("not running, so can't call");
    }

    const message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      content,
      header: {
        msg_id: uuid(),
        username: "",
        session: "",
        msg_type: msg_type as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
    };

    // Send the message
    this.sockets.send(message);

    // Wait for the response that has the right msg_id.
    let the_mesg: any = undefined;
    const wait_for_response = (cb) => {
      const f = (mesg) => {
        // Messages without a parent header can't be our reply; don't throw.
        if (mesg.parent_header?.msg_id === message.header.msg_id) {
          this.removeListener("shell", f);
          this.removeListener("closed", g);
          mesg = deep_copy(mesg.content);
          if (len(mesg.metadata) === 0) {
            delete mesg.metadata;
          }
          the_mesg = mesg;
          cb();
        }
      };
      const g = () => {
        this.removeListener("shell", f);
        this.removeListener("closed", g);
        cb("closed - jupyter - kernel - call");
      };
      this.on("shell", f);
      this.on("closed", g);
    };
    await callback(wait_for_response);
    return the_mesg;
  };

  /** Ask the kernel for completions ("complete_request"). */
  complete = async (opts: { code: any; cursor_pos: any }): Promise<any> => {
    const dbg = this.dbg("complete");
    dbg(`code='${opts.code}', cursor_pos='${opts.cursor_pos}'`);
    return await this.call("complete_request", opts);
  };

  /** Ask the kernel for introspection ("inspect_request"). */
  introspect = async (opts: {
    code: any;
    cursor_pos: any;
    detail_level: any;
  }): Promise<any> => {
    const dbg = this.dbg("introspect");
    dbg(
      `code='${opts.code}', cursor_pos='${opts.cursor_pos}', detail_level=${opts.detail_level}`,
    );
    return await this.call("inspect_request", opts);
  };

  /**
   * Kernel info reply, augmented with nodejs_version and (if actions exist)
   * start_time. Cached after the first successful call.
   */
  kernel_info = reuseInFlight(async (): Promise<KernelInfo> => {
    if (this._kernel_info !== undefined) {
      return this._kernel_info;
    }
    const info = await this.call("kernel_info_request");
    info.nodejs_version = process.version;
    if (this._actions != null) {
      info.start_time = this._actions.store.get("start_time");
    }
    this._kernel_info = info;
    return info;
  });

  save_ipynb_file = async (opts?): Promise<void> => {
    if (this._actions != null) {
      await this._actions.save_ipynb_file(opts);
    } else {
      throw Error("save_ipynb_file -- ERROR: actions not known");
    }
  };

  more_output = (id: string): any[] => {
    if (id == null) {
      throw new Error("must specify id");
    }
    if (this._actions == null) {
      throw new Error("must have redux actions");
    }
    return this._actions.store.get_more_output(id) ?? [];
  };

  /**
   * Run the installed nbconvert on this notebook's file, in its directory,
   * with `args` followed by `--` and the filename. `timeout` (seconds)
   * defaults to 60.
   */
  nbconvert = reuseInFlight(
    async (args: string[], timeout?: number): Promise<void> => {
      if (timeout === undefined) {
        timeout = 60; // seconds
      }
      if (!is_array(args)) {
        throw new Error("args must be an array");
      }
      args = copy(args);
      args.push("--");
      args.push(this._filename);
      await nbconvert({
        args,
        timeout,
        directory: this._directory,
      });
    },
  );

  /**
   * Read the file at `path` (relative paths are relative to HOME) into the
   * blob store and return its sha1, retrying for up to 30 seconds. On final
   * failure the file is removed (best effort) and the error is rethrown.
   */
  load_attachment = async (path: string): Promise<string> => {
    const dbg = this.dbg("load_attachment");
    dbg(`path='${path}'`);
    if (path[0] !== "/") {
      path = join(process.env.HOME ?? "", path);
    }
    const f = async (): Promise<string> => {
      const bs = this.get_blob_store();
      if (bs == null) {
        throw new Error("BlobStore not available");
      }
      return await bs.readFile(path);
    };
    try {
      return await retry_until_success({
        f,
        max_time: 30000,
      });
    } catch (err) {
      // TODO: think through again if this is the right thing to do.
      // Await the removal so a failure (e.g. the file never existed) is
      // logged instead of surfacing as an unhandled rejection.
      try {
        await unlink(path);
      } catch (unlinkErr) {
        dbg(`WARNING: unable to remove '${path}' -- ${unlinkErr}`);
      }
      throw err;
    }
  };

  // This is called by project-actions when exporting the notebook
  // to an ipynb file:
  /** Blob store view over the actions' blobs, or undefined if there are none. */
  get_blob_store = (): BlobStoreInterface | undefined => {
    const blobs = this._actions?.blobs;
    if (blobs == null) {
      return;
    }
    const t = new TextDecoder();
    return {
      getBase64: (sha1: string): string | undefined => {
        const buf = blobs.get(sha1);
        if (buf === undefined) {
          return buf;
        }
        return uint8ArrayToBase64(buf);
      },

      getString: (sha1: string): string | undefined => {
        const buf = blobs.get(sha1);
        if (buf === undefined) {
          return buf;
        }
        return t.decode(buf);
      },

      readFile: async (path: string): Promise<string> => {
        const buf = await readFile(path);
        const sha1: string = misc_node_sha1(buf);
        blobs.set(sha1, buf);
        return sha1;
      },

      saveBase64: (data: string) => {
        const buf = Buffer.from(data, "base64");
        const sha1: string = misc_node_sha1(buf);
        blobs.set(sha1, buf);
        return sha1;
      },
    };
  };

  process_comm_message_from_kernel = (mesg): void => {
    if (this._actions == null) {
      return;
    }
    const dbg = this.dbg("process_comm_message_from_kernel");
    // This can be HUGE so don't print out the entire message; e.g., it could contain
    // massive binary data!
    dbg(mesg.header);
    this._actions.process_comm_message_from_kernel(mesg);
  };

  ipywidgetsGetBuffer = (
    model_id: string,
    // buffer_path is the string[] *or* the JSON of that.
    buffer_path: string | string[],
  ): Buffer | undefined => {
    if (typeof buffer_path != "string") {
      buffer_path = JSON.stringify(buffer_path);
    }
    return this._actions?.syncdb.ipywidgets_state?.getBuffer(
      model_id,
      buffer_path,
    );
  };

  /**
   * Send a "comm_msg" to the kernel on the shell channel. Non-empty
   * `buffers64` are decoded (replacing `buffers`) and also recorded in the
   * ipywidgets state for `comm_id`.
   *
   * @throws if the sockets are not initialized.
   */
  send_comm_message_to_kernel = ({
    msg_id,
    comm_id,
    target_name,
    data,
    buffers64,
    buffers,
  }: {
    msg_id: string;
    comm_id: string;
    target_name: string;
    data: any;
    buffers64?: string[];
    buffers?: Buffer[];
  }): void => {
    if (this.sockets == null) {
      throw Error("sockets not initialized");
    }
    const dbg = this.dbg("send_comm_message_to_kernel");
    // this is HUGE
    // dbg({ msg_id, comm_id, target_name, data, buffers64 });
    if (buffers64 != null && buffers64.length > 0) {
      buffers = buffers64.map((x) => Buffer.from(base64ToBuffer(x)));
      dbg(
        "buffers lengths = ",
        buffers.map((x) => x.byteLength),
      );
      if (this._actions?.syncdb.ipywidgets_state != null) {
        this._actions.syncdb.ipywidgets_state.setModelBuffers(
          comm_id,
          data.buffer_paths,
          buffers,
          false,
        );
      }
    }

    const message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      content: { comm_id, target_name, data },
      header: {
        msg_id,
        username: "user",
        session: "",
        msg_type: "comm_msg" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      buffers,
    };

    // HUGE
    // dbg(message);
    // "The Kernel listens for these messages on the Shell channel,
    // and the Frontend listens for them on the IOPub channel." -- docs
    this.sockets.send(message);
  };

  /**
   * Change the kernel's working directory to `path` (relative to HOME),
   * using the chdir command for the kernel's language. No-op without a
   * kernel name or when the language needs no command.
   */
  chdir = async (path: string): Promise<void> => {
    if (!this.name) return; // no kernel, no current directory
    const dbg = this.dbg("chdir");
    dbg({ path });
    let lang;
    try {
      // using probably cached data, so likely very fast
      lang = await getLanguage(this.name);
    } catch (err) {
      dbg("WARNING ", err);
      const info = await this.kernel_info();
      lang = info.language_info?.name ?? "";
    }

    const absPath = getAbsolutePathFromHome(path);
    const code = createChdirCommand(lang, absPath);
    // code = '' if no command needed, e.g., for sparql.
    if (code) {
      await this.execute_code_now({ code });
    }
  };
}

/** Find the registered kernel whose spawned process has the given pid. */
export function get_kernel_by_pid(pid: number): JupyterKernel | undefined {
  for (const kernel of Object.values(kernels.kernels)) {
    if (kernel.get_spawned_kernel()?.spawn.pid === pid) {
      return kernel;
    }
  }
  return;
}