Path: blob/master/src/packages/jupyter/execute/execute-code.ts
1447 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Send code to a kernel to be evaluated, then wait for
the results and gather them together.
*/

import { callback, delay } from "awaiting";
import { EventEmitter } from "events";
import { VERSION } from "@cocalc/jupyter/kernel/version";
import type { JupyterKernelInterface as JupyterKernel } from "@cocalc/jupyter/types/project-interface";
import type { MessageType } from "@cocalc/jupyter/zmq/types";
import { copy_with, deep_copy, uuid } from "@cocalc/util/misc";
import type {
  CodeExecutionEmitterInterface,
  OutputMessage,
  ExecOpts,
  StdinFunction,
} from "@cocalc/jupyter/types/project-interface";
import { getLogger } from "@cocalc/backend/logger";
import { EventIterator } from "@cocalc/util/event-iterator";
import { once } from "@cocalc/util/async-utils";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import type { Message } from "@cocalc/jupyter/zmq/message";

const log = getLogger("jupyter:execute-code");

// Lifecycle of a single execution: "init" -> "running" -> "done" -> "closed".
// Each state set via setState is also emitted as an event of the same name.
type State = "init" | "running" | "done" | "closed";

/**
 * Sends one `execute_request` to a kernel and re-emits the messages that
 * belong to that request.
 *
 * Events emitted by this class:
 *  - "output": each output message; the final one is `{ done: true }`
 *  - "done": after an output with a truthy `done` field, or on cancel()
 *  - "canceled": when cancel() is called
 *  - "error": when throw_error() is called
 *  - "closed": when close() runs; all listeners are removed afterwards
 */
export class CodeExecutionEmitter
  extends EventEmitter
  implements CodeExecutionEmitterInterface
{
  readonly kernel: JupyterKernel;
  readonly code: string;
  readonly id?: string;
  readonly stdin?: StdinFunction;
  readonly halt_on_error: boolean;
  // DO NOT set iopub_done or shell_done directly; instead
  // set them using the function set_shell_done and set_iopub_done.
  // This ensures that we call _finish when both vars have been set.
  private iopub_done: boolean = false;
  private shell_done: boolean = false;
  private state: State = "init";
  // The execute_request message that is sent to the kernel; its
  // header.msg_id is what replies are matched against.
  private _message: any;
  // Callback handed to _go; invoked by _finish with this.killing.
  private _go_cb: Function | undefined = undefined;
  private timeout_ms?: number;
  private timer?: any;
  // Non-empty once execution is being (or has been) forcibly ended;
  // holds the reason (timeout message, kernel state, "kernel crashed").
  private killing: string = "";
  private _iter?: EventIterator<OutputMessage>;

  /**
   * @param kernel - kernel that will run the code
   * @param opts - code to run plus optional id, stdin handler,
   *   halt_on_error flag and timeout_ms
   */
  constructor(kernel: JupyterKernel, opts: ExecOpts) {
    super();
    this.kernel = kernel;
    this.code = opts.code;
    this.id = opts.id;
    this.stdin = opts.stdin;
    this.halt_on_error = !!opts.halt_on_error;
    this.timeout_ms = opts.timeout_ms;
    this._message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      header: {
        msg_id: `execute_${uuid()}`,
        username: "",
        session: "",
        msg_type: "execute_request" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        code: this.code,
        silent: false,
        store_history: true, // so execution_count is updated.
        user_expressions: {},
        allow_stdin: this.stdin != null,
      },
    };
  }

  // async interface:

  /**
   * Returns an iterator over the "output" events, creating it on first
   * use. The iterator is closed shortly after the `done` message is seen.
   *
   * @throws Error("closed") if this emitter has already been closed.
   */
  iter = (): EventIterator<OutputMessage> => {
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this._iter == null) {
      this._iter = new EventIterator<OutputMessage>(this, "output", {
        map: (args) => {
          if (args[0]?.done) {
            // Close on a later tick, i.e. after this final message has
            // been returned from map.
            setTimeout(() => this._iter?.close(), 1);
          }
          return args[0];
        },
      });
    }
    return this._iter;
  };

  /**
   * Waits for the "done" event. Concurrent calls share one in-flight wait.
   */
  waitUntilDone = reuseInFlight(async () => {
    try {
      await once(this, "done");
    } catch {
      // it throws on close, but that's also "done".
    }
  });

  // Records the new state and emits an event named after it.
  private setState = (state: State) => {
    this.state = state;
    this.emit(state);
  };

  // Emits a valid result, which is
  // https://jupyter-client.readthedocs.io/en/stable/messaging.html#python-api
  // Or an array of those when this.all is true
  // Moves to the "done" state when the output has a truthy `done` field.
  emit_output = (output: OutputMessage): void => {
    this.emit("output", output);
    if (output["done"]) {
      this.setState("done");
    }
  };

  // Call this to inform anybody listening that we've canceled
  // this execution, and will NOT be doing it ever, and it
  // was explicitly canceled.
  cancel = (): void => {
    this.emit("canceled");
    this.setState("done");
    this._iter?.close();
  };

  /**
   * Releases everything held by this emitter: clears the timeout timer,
   * closes the iterator, drops the go-callback and removes all listeners.
   * Calling it again after the first time is a no-op.
   */
  close = (): void => {
    if (this.state == "closed") {
      return;
    }
    this.setState("closed");
    if (this.timer != null) {
      clearTimeout(this.timer);
      delete this.timer;
    }
    this._iter?.close();
    delete this._iter;
    // @ts-ignore
    delete this._go_cb;
    // NOTE: setState above already emitted "closed"; this emits it a
    // second time, now that the cleanup above has been done.
    this.emit("closed");
    this.removeAllListeners();
  };

  // Emits "error" with the given value and then closes this emitter.
  throw_error = (err): void => {
    this.emit("error", err);
    this.close();
  };

  /**
   * Handles an input request from the kernel: asks this.stdin for a value
   * and sends it back as an `input_reply` on the stdin channel. Messages
   * whose parent msg_id is not ours are logged and ignored.
   */
  private handleStdin = async (mesg: Message): Promise<void> => {
    if (!this.stdin) {
      throw Error("BUG -- stdin handling not supported");
    }
    log.silly("handleStdin: STDIN kernel --> server: ", mesg);
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.warn(
        "handleStdin: STDIN msg_id mismatch:",
        mesg.parent_header.msg_id,
        this._message.header.msg_id,
      );
      return;
    }

    let response;
    try {
      response = await this.stdin(
        mesg.content.prompt ? mesg.content.prompt : "",
        !!mesg.content.password,
      );
    } catch (err) {
      // A failing stdin handler is reported to the kernel as the input value.
      response = `ERROR -- ${err}`;
    }
    log.silly("handleStdin: STDIN client --> server", response);
    const m = {
      channel: "stdin",
      parent_header: this._message.header,
      metadata: {},
      header: {
        msg_id: uuid(), // this._message.header.msg_id
        username: "",
        session: "",
        msg_type: "input_reply" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        value: response,
      },
    };
    log.silly("handleStdin: STDIN server --> kernel:", m);
    this.kernel.sockets?.send(m);
  };

  /**
   * Handles the shell reply to our request. An "ok" reply is forwarded as
   * output; any other status optionally clears the kernel's execute queue
   * (when halt_on_error is set). Either way the shell side is marked done.
   */
  private handleShell = (mesg: Message): void => {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.silly(
        `handleShell: msg_id mismatch: ${mesg.parent_header.msg_id} != ${this._message.header.msg_id}`,
      );
      return;
    }
    log.silly("handleShell: got SHELL message -- ", mesg);

    if (mesg.content?.status == "ok") {
      this._push_mesg(mesg);
      this.set_shell_done(true);
    } else {
      log.warn(`handleShell: status != ok: ${mesg.content?.status}`);
      // NOTE: I'm adding support for "abort" status, since I was just reading
      // the kernel docs and it exists but is deprecated.  Some old kernels
      // might use it and we should thus properly support it:
      // https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
      //
      // 2023-05-11: this was conditional on mesg.content?.status == "error" or == "abort"
      // but in reality, there was also "aborted". Hence this as a catch-all else.
      if (this.halt_on_error) {
        this.kernel.clear_execute_code_queue();
      }
      this.set_shell_done(true);
    }
  };

  // Records the shell status and finishes once both channels are done.
  private set_shell_done = (value: boolean): void => {
    this.shell_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  };

  // Records the iopub status and finishes once both channels are done.
  private set_iopub_done = (value: boolean): void => {
    this.iopub_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  };

  /**
   * Handles an iopub message for our request: forwards it as output
   * (unless it is a comm message) and marks iopub as done when the kernel
   * reports execution_state "idle" or when we are killing the execution.
   */
  handleIOPub = (mesg: Message): void => {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      // iopub message for a different execute request so ignore it.
      return;
    }
    // these can be huge -- do not uncomment except for low level debugging!
    // log.silly("handleIOPub: got IOPUB message -- ", mesg);

    if (mesg.content?.comm_id != null) {
      // A comm message that is a result of execution of this code.
      // IGNORE here -- all comm messages are handled at a higher
      // level in jupyter.ts.  Also, this case should never happen, since
      // we do not emit an event from jupyter.ts in this case anyways.
    } else {
      // A normal output message.
      this._push_mesg(mesg);
    }

    this.set_iopub_done(
      !!this.killing || mesg.content?.execution_state == "idle",
    );
  };

  // Called if the kernel is closed for some reason, e.g., crashing.
  private handleClosed = (): void => {
    log.debug("CodeExecutionEmitter.handleClosed: kernel closed");
    this.killing = "kernel crashed";
    this._finish();
  };

  /**
   * Ends the execution: detaches from the kernel, advances the kernel's
   * execute queue, emits the final `{ done: true }` output, closes this
   * emitter and finally reports back to whoever awaited go().
   */
  private _finish = (): void => {
    if (this.state == "closed") {
      return;
    }
    // close() (called below) deletes this._go_cb, so keep our own
    // reference to it; otherwise the callback would never run and the
    // promise returned by go() would never settle.
    const goCallback = this._go_cb;

    this.kernel.removeListener("iopub", this.handleIOPub);
    if (this.stdin != null) {
      this.kernel.removeListener("stdin", this.handleStdin);
    }
    this.kernel.removeListener("shell", this.handleShell);
    if (this.kernel._execute_code_queue != null) {
      this.kernel._execute_code_queue.shift(); // finished
      this.kernel._process_execute_code_queue(); // start next exec
    }
    this.kernel.removeListener("closed", this.handleClosed);
    this.kernel.removeListener("failed", this.handleClosed);
    this._push_mesg({ done: true });
    this.close();

    // Finally call the callback that was setup in this._go.
    // This is what makes it possible to await on the entire
    // execution.  Also it is important to explicitly
    // signal an error if we had to kill execution due
    // to hitting a timeout, since the kernel may or may
    // not have randomly done so itself in output.
    goCallback?.(this.killing);
    this._go_cb = undefined;
  };

  /**
   * Copies the relevant fields of a kernel message (metadata, content,
   * buffers, done), attaches msg_type from the header when there is one,
   * and emits the copy as output.
   */
  _push_mesg = (mesg): void => {
    // TODO: mesg isn't a normal javascript object;
    // it's **silently** immutable, which
    // is pretty annoying for our use.  For now, we
    // just copy it, which is a waste.
    const header = mesg.header;
    mesg = copy_with(mesg, ["metadata", "content", "buffers", "done"]);
    mesg = deep_copy(mesg);
    if (header !== undefined) {
      mesg.msg_type = header.msg_type;
    }
    this.emit_output(mesg);
  };

  /**
   * Starts the execution; promise wrapper around _go. The callback given
   * to _go receives "may only run once", the kernel state, or (from
   * _finish) this.killing, which is empty unless execution was killed.
   */
  go = async (): Promise<void> => {
    await callback(this._go);
  };

  /**
   * Callback-style implementation of go(): subscribes to the kernel's
   * channels, arms the optional timeout and sends the execute request.
   *
   * @param cb - called once with an error value, or with an empty string
   *   when execution finished without being killed
   * @throws Error if the kernel has no sockets
   */
  private _go = (cb: Function): void => {
    if (this.state != "init") {
      cb("may only run once");
      return;
    }
    this.state = "running";
    log.silly("_execute_code", this.code);
    const kernelState = this.kernel.get_state();
    if (kernelState == "closed" || kernelState == "failed") {
      log.silly("_execute_code", "kernel.get_state() is ", kernelState);
      this.killing = kernelState;
      this._finish();
      cb(kernelState);
      return;
    }

    // Check this before installing any listener or timer, so that
    // nothing is left attached to the kernel if we have to bail out.
    const sockets = this.kernel.sockets;
    if (sockets == null) {
      throw Error("bug -- sockets must be defined");
    }

    this._go_cb = cb; // this._finish will call this.

    if (this.stdin != null) {
      this.kernel.on("stdin", this.handleStdin);
    }
    this.kernel.on("shell", this.handleShell);
    this.kernel.on("iopub", this.handleIOPub);

    this.kernel.once("closed", this.handleClosed);
    this.kernel.once("failed", this.handleClosed);

    if (this.timeout_ms) {
      // setup a timeout at which point things will get killed if they don't finish
      this.timer = setTimeout(this.timeout, this.timeout_ms);
    }

    log.debug("_execute_code: send the message to get things rolling");
    sockets.send(this._message);
  };

  /**
   * Runs when timeout_ms elapses: sends SIGINT up to three times with a
   * growing delay (1000ms, then x1.3 each time) and, if the execution is
   * still not closed, sends SIGKILL and finishes.
   */
  private timeout = async (): Promise<void> => {
    if (this.state == "closed") {
      log.debug(
        "CodeExecutionEmitter.timeout: already finished, so nothing to worry about",
      );
      return;
    }
    this.killing =
      "Timeout Error: execution time limit = " +
      Math.round((this.timeout_ms ?? 0) / 1000) +
      " seconds";
    let tries = 3;
    let d = 1000;
    while (this.state != ("closed" as State) && tries > 0) {
      log.debug(
        "CodeExecutionEmitter.timeout: code still running, so try to interrupt it",
      );
      // Code still running but timeout reached.
      // Keep sending interrupt signal, which will hopefully do something to
      // stop running code (there is no guarantee, of course).  We
      // try a few times...
      this.kernel.signal("SIGINT");
      await delay(d);
      d *= 1.3;
      tries -= 1;
    }
    if (this.state != ("closed" as State)) {
      log.debug(
        "CodeExecutionEmitter.timeout: now try SIGKILL, which should kill things for sure.",
      );
      this.kernel.signal("SIGKILL");
      this._finish();
    }
  };
}