Path: blob/master/src/packages/jupyter/execute/output-handler.ts
1447 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Class that handles output messages generated for evaluation of code
for a particular cell.

WARNING: For efficiency reasons (involving syncdb patch sizes),
outputs is a map from the (string representations of) the numbers
from 0 to n-1, where there are n messages.  So watch out.

OutputHandler emits these events:

   - 'change' -- (save), called when we change cell; if save=true, recommend
                 broadcasting this change to other users ASAP.

   - 'done' -- emitted once when finished; after this, everything is cleaned up

   - 'more_output' -- If we exceed the message limit, emit more_output
                 (mesg, mesg_length) with extra messages.

   - 'process' -- Gets called on any incoming message; it may
                  **mutate** the message, e.g., removing images uses this.

*/

import { callback } from "awaiting";
import { EventEmitter } from "events";
import {
  close,
  defaults,
  required,
  server_time,
  len,
  to_json,
  is_object,
} from "@cocalc/util/misc";

// Current time as a number (ms), taken from server_time().
const now = () => server_time().valueOf() - 0;

const MIN_SAVE_INTERVAL_MS = 500;
const MAX_SAVE_INTERVAL_MS = 45000;

/**
 * Collects the output messages produced while evaluating one cell and
 * writes them into `opts.cell` (which is mutated in place).
 *
 * The constructor resets the cell's output, exec_count, start and end
 * fields and sets its state to "run".  See the comment at the top of this
 * file for the events that are emitted.
 */
export class OutputHandler extends EventEmitter {
  private _opts: any;
  // Number of messages stored in cell.output so far; also the next key.
  private _n: number;
  // Set by clear(wait=true); the next incoming message clears output first.
  private _clear_before_next_output: boolean;
  // Sum of JSON.stringify(mesg).length over the messages counted so far.
  private _output_length: number;
  // Once true, messages are no longer stored; they are only re-emitted
  // via the 'more_output' event.
  private _in_more_output_mode: boolean;
  private _state: "ready" | "closed";
  // Pending callback for stdin(); set while waiting for the user's value.
  private _stdin_cb: any;
  // Handle of the "report started" timer, so close() can cancel it.
  private _report_started_timer?: ReturnType<typeof setTimeout>;

  // Never commit output to send to the frontend more frequently than this.saveIntervalMs
  // Otherwise, we'll end up with a large number of patches.
  // We start out with MIN_SAVE_INTERVAL_MS and exponentially back it off to
  // MAX_SAVE_INTERVAL_MS.
  private lastSave: number = 0;
  private saveIntervalMs = MIN_SAVE_INTERVAL_MS;

  constructor(opts: any) {
    super();
    this._opts = defaults(opts, {
      cell: required, // object; the cell whose output (etc.) will get mutated
      // If given, used to truncate, discard output messages; extra
      // messages are saved and made available.
      max_output_length: undefined,
      max_output_messages: undefined,
      report_started_ms: undefined, // If no messages for this many ms, then we update via set to indicate
      // that cell is being run.
      dbg: undefined,
    });
    const { cell } = this._opts;
    cell.output = null;
    cell.exec_count = null;
    cell.state = "run";
    cell.start = null;
    cell.end = null;
    // Internal state
    this._n = 0;
    this._clear_before_next_output = false;
    this._output_length = 0;
    this._in_more_output_mode = false;
    this._state = "ready";
    // Report that computation started if there is no output soon.
    if (this._opts.report_started_ms != null) {
      // Keep the handle so that close() can cancel a timer that is
      // still pending when the computation finishes early.
      this._report_started_timer = setTimeout(
        this._report_started,
        this._opts.report_started_ms,
      );
    }

    this.stdin = this.stdin.bind(this);
  }

  /**
   * Emit 'done', drop all listeners and release internal state.
   * Safe to call more than once; only the first call has any effect.
   */
  close = (): void => {
    if (this._state == "closed") return;
    this._state = "closed";
    if (this._report_started_timer != null) {
      // Nothing left to report; don't leave a dangling timer behind.
      clearTimeout(this._report_started_timer);
    }
    this.emit("done");
    this.removeAllListeners();
    close(this, new Set(["_state", "close"]));
  };

  /**
   * Discard all output stored so far, reset the counters and emit
   * 'change' with the given save flag.
   */
  _clear_output = (save?: any): void => {
    if (this._state === "closed") {
      return;
    }
    this._clear_before_next_output = false;
    // clear output message -- we delete all the outputs
    // reset the counter n, save, and are done.
    // IMPORTANT: In Jupyter the clear_output message and everything
    // before it is NOT saved in the notebook output itself
    // (like in Sage worksheets).
    this._opts.cell.output = null;
    this._n = 0;
    this._output_length = 0;
    this.emit("change", save);
  };

  /**
   * Timer callback scheduled by the constructor: emits 'change' (save=true)
   * unless the handler is closed or at least one message was stored.
   */
  _report_started = (): void => {
    if (this._state == "closed" || this._n > 0) {
      // do nothing -- already getting output or done.
      return;
    }
    this.emit("change", true);
  };

  // Call when computation starts
  start = () => {
    if (this._state === "closed") {
      return;
    }
    // Use the same clock as done() does for cell.end (and for its own
    // fallback cell.start), so that end - start is a consistent interval.
    this._opts.cell.start = now();
    this._opts.cell.state = "busy";
    this.emit("change", true);
  };

  // Call error if an error occurs.  An appropriate error message is generated.
  // Computation is considered done.
  error = (err: any): void => {
    if (err === "closed") {
      // See https://github.com/sagemathinc/cocalc/issues/2388
      this.message({
        data: {
          "text/markdown":
            "<font color='red'>**Jupyter Kernel terminated:**</font> This might be caused by running out of memory or hitting a bug in some library (e.g., forking too many processes, trying to access invalid memory, etc.). Consider restarting or upgrading your project or running the relevant code directly in a terminal to track down the cause, as [explained here](https://github.com/sagemathinc/cocalc/wiki/KernelTerminated).",
        },
      });
    } else {
      this.message({
        text: `${err}`,
        name: "stderr",
      });
    }
    this.done();
  };

  // Call done exactly once when done
  done = (): void => {
    if (this._state === "closed") {
      return;
    }
    const { cell } = this._opts;
    cell.state = "done";
    if (cell.start == null) {
      // start() was never called; fall back to a zero-length run.
      cell.start = now();
    }
    cell.end = now();
    this.emit("change", true);
    this.close();
  };

  /**
   * Handle clear.
   *
   * @param wait - if truthy, defer clearing until the next output message
   *   arrives; otherwise clear immediately.
   */
  clear = (wait: any): void => {
    if (wait) {
      // wait until next output before clearing.
      this._clear_before_next_output = true;
      return;
    }
    this._clear_output();
  };

  /**
   * Strip fields that are not stored with the output (execution_state,
   * code, status, source) and any field whose value is an empty object.
   * Mutates mesg.
   */
  _clean_mesg = (mesg: any): void => {
    delete mesg.execution_state;
    delete mesg.code;
    delete mesg.status;
    delete mesg.source;
    for (const k in mesg) {
      const v = mesg[k];
      if (is_object(v) && len(v) === 0) {
        delete mesg[k];
      }
    }
  };

  /**
   * Append mesg to cell.output under the next numeric key and emit 'change'.
   *
   * @param save - when omitted, saving is throttled: save becomes true only
   *   if more than saveIntervalMs elapsed since the last save, and the
   *   interval then grows by 10% (capped at MAX_SAVE_INTERVAL_MS).
   *   An explicit true just records the save time.
   */
  private _push_mesg = (mesg: any, save?: boolean): void => {
    if (this._state === "closed") {
      return;
    }

    if (save == null) {
      const n = now();
      if (n - this.lastSave > this.saveIntervalMs) {
        save = true;
        this.lastSave = n;
        this.saveIntervalMs = Math.min(
          MAX_SAVE_INTERVAL_MS,
          this.saveIntervalMs * 1.1,
        );
      }
    } else if (save == true) {
      this.lastSave = now();
    }

    if (this._opts.cell.output === null) {
      this._opts.cell.output = {};
    }
    this._opts.cell.output[`${this._n}`] = mesg;
    this._n += 1;
    this.emit("change", save);
  };

  /** Replace the cell's input and emit 'change' with the given save flag. */
  set_input = (input: any, save = true): void => {
    if (this._state === "closed") {
      return;
    }
    this._opts.cell.input = input;
    this.emit("change", save);
  };

  // Process incoming messages.  This may mutate mesg.
  message = (mesg: any): void => {
    if (this._state === "closed") {
      return;
    }

    if (this._opts.cell.end) {
      // ignore any messages once we're done.
      return;
    }

    // record execution_count, if there.
    const has_exec_count = mesg.execution_count != null;
    if (has_exec_count) {
      this._opts.cell.exec_count = mesg.execution_count;
      delete mesg.execution_count;
    }

    // delete useless fields
    this._clean_mesg(mesg);

    if (len(mesg) === 0) {
      // don't even bother saving this message; nothing useful here.
      return;
    }

    if (has_exec_count) {
      // message that has an execution count
      mesg.exec_count = this._opts.cell.exec_count;
    }

    // hook to process message (e.g., this may mutate mesg,
    // e.g., to remove big images)
    this.emit("process", mesg);

    if (this._clear_before_next_output) {
      this._clear_output(false);
    }

    // Size is measured after the 'process' hook had a chance to shrink mesg.
    const mesg_length = JSON.stringify(mesg).length;

    if (this._in_more_output_mode) {
      this.emit("more_output", mesg, mesg_length);
      return;
    }

    // check if limits exceeded:

    this._output_length += mesg_length;

    const notTooLong =
      this._opts.max_output_length == null ||
      this._output_length <= this._opts.max_output_length;
    const notTooMany =
      this._opts.max_output_messages == null ||
      this._n < this._opts.max_output_messages;

    if (notTooLong && notTooMany) {
      // limits NOT exceeded
      this._push_mesg(mesg);
      return;
    }

    // Switch to too much output mode:
    this._push_mesg({ more_output: true });
    this._in_more_output_mode = true;
    this.emit("more_output", mesg, mesg_length);
  };

  /**
   * Ask the user for input: pushes an "input" output message and resolves
   * with the value once cell_changed() sees it set.
   *
   * NOTE(review): if the handler is closed before a value arrives, nothing
   * in this class settles the returned promise -- confirm callers cope.
   */
  async stdin(prompt: string, password: boolean): Promise<string> {
    // See docs for stdin option to execute_code in backend jupyter.coffee
    this._push_mesg({ name: "input", opts: { prompt, password } });
    // Now we wait until the output message we just included has its
    // value set.  Then we call cb with that value.
    // This weird thing below sets this._stdin_cb, then
    // waits for this._stdin_cb to be called, which happens
    // when cell_changed gets called.
    return await callback((cb) => (this._stdin_cb = cb));
  }

  // Call this when the cell changes; only used for stdin right now.
  cell_changed = (cell: any, get_password: any): void => {
    if (this._state === "closed") {
      return;
    }
    if (this._stdin_cb == null) {
      return;
    }
    const output = cell != null ? cell.get("output") : undefined;
    if (output == null) {
      return;
    }
    const value = output.getIn([`${output.size - 1}`, "value"]);
    if (value == null) {
      // No value entered yet; keep waiting.
      return;
    }

    let x = value;
    const local_output = this._opts.cell.output;
    if (local_output) {
      const n = `${len(local_output) - 1}`;
      const last = local_output[n];
      if (
        get_password != null &&
        last &&
        last.opts != null &&
        last.opts.password
      ) {
        // In case of a password, the value is NEVER placed in the document.
        // Instead the value is submitted to the backend via https, with
        // a random identifier put in the value.
        x = get_password(); // get actual password
      }
      if (last != null) {
        last.value = value;
      } // sync output-handler view of output with syncdb
    }

    // Clear the pending callback before invoking it, so that it can
    // never be called a second time.
    const cb = this._stdin_cb;
    delete this._stdin_cb;
    cb(undefined, x);
  };

  /**
   * Handle a payload entry: "set_next_input" replaces the cell input,
   * "page" is treated as a normal output message, and anything else is
   * only reported through opts.dbg (if that is a function).
   */
  payload = (payload: any): void => {
    if (this._state === "closed") {
      return;
    }
    if (payload.source === "set_next_input") {
      this.set_input(payload.text);
    } else if (payload.source === "page") {
      // Just handle as a normal message; and we don't show in the pager,
      // which doesn't make sense for multiple users.
      // This happens when requesting help for r:
      // https://github.com/sagemathinc/cocalc/issues/1933
      this.message(payload);
    } else {
      // No idea what to do with this...
      if (typeof this._opts.dbg === "function") {
        this._opts.dbg(`Unknown PAYLOAD: ${to_json(payload)}`);
      }
    }
  };
}