Path: blob/master/src/packages/sync/editor/generic/evaluator.ts
1450 views
/*
 * This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

//##############################################################################
//
// CoCalc: Collaborative Calculation
// Copyright (C) 2016, Sagemath Inc., MS-RSL.
//
//##############################################################################

/*
Evaluation of code with streaming output built on both the clients and
server (local hub) using a sync_table. This evaluator is associated
to a syncdoc editing session, and provides code evaluation that
may be used to enhance the experience of document editing.

Protocol, as implemented below:
 - In the web browser, `call(...)` writes one row (string_id, time,
   user_id, input) to the `eval_inputs` table.
 - In the project, `init_project_evaluator` watches `eval_inputs`, runs the
   requested program, and streams each result message into `eval_outputs`
   as a row keyed by [string_id, time, number], with number = 0, 1, 2, ...
 - Back in the browser, rows are reassembled in `number` order and each
   message is passed to the caller's cb until one has `done` set.
*/

const stringify = require("json-stable-stringify");

import { SyncTable } from "@cocalc/sync/table/synctable";
import { to_key } from "@cocalc/sync/table/util";
import {
  close,
  copy_with,
  copy_without,
  from_json,
  to_json,
} from "@cocalc/util/misc";
import { FLAGS, MARKERS, sagews } from "@cocalc/util/sagews";
import { ISageSession, SageCallOpts } from "@cocalc/util/types/sage";
import { SyncDoc } from "./sync-doc";
import { Client } from "./types";

type State = "init" | "ready" | "closed";

// What's supported so far.  "bash" and "shell" are both accepted by the
// project and are run by the same shell evaluator.
type Program = "sage" | "bash" | "shell";

// Object whose meaning depends on the program
type Input = any;

/**
 * Code evaluator attached to a single syncdoc.
 *
 * The same class is used on both sides: in the web browser `call` submits
 * evaluation requests, and in the project `init_project_evaluator` listens
 * for those requests and writes the results.
 */
export class Evaluator {
  private syncdoc: SyncDoc;
  private client: Client;
  private inputs_table: SyncTable;
  private outputs_table: SyncTable;
  private sage_session: ISageSession;
  private state: State = "init";
  private table_options: any[] = [];
  private create_synctable: Function;

  // Timestamp of the most recent evaluation requested by this client; used
  // so that two calls from this client never share the same time.
  private last_call_time: Date = new Date(0);

  constructor(syncdoc: SyncDoc, client: Client, create_synctable: Function) {
    this.syncdoc = syncdoc;
    this.client = client;
    this.create_synctable = create_synctable;
    this.table_options = [{ ephemeral: true, persistent: true }];
  }

  /**
   * Create the input/output tables and, when running in the project, start
   * handling evaluation requests.  Sets the state to "ready" when done.
   */
  public async init(): Promise<void> {
    // Initialize the inputs and outputs tables in parallel:
    await Promise.all([this.init_eval_inputs(), this.init_eval_outputs()]);

    if (this.state === "closed") {
      // close() finished while the tables were being created: close the new
      // tables so they don't leak, and don't touch the other fields, which
      // close() has already cleared.
      await Promise.all([this.inputs_table.close(), this.outputs_table.close()]);
      return;
    }

    if (this.client.is_project()) {
      await this.init_project_evaluator();
    }
    this.set_state("ready");
  }

  /**
   * Close both tables and any sage session, then clear this object.
   * Calling it again after it has finished is a no-op.
   */
  public async close(): Promise<void> {
    if (this.state === "closed") {
      return;
    }
    if (this.inputs_table != null) {
      await this.inputs_table.close();
    }
    if (this.outputs_table != null) {
      await this.outputs_table.close();
    }
    if (this.sage_session != null) {
      this.sage_session.close();
    }
    close(this);
    this.set_state("closed");
  }

  // Returns a debug logger named after f in the project; a no-op in the browser.
  private dbg(f: string): Function {
    if (this.client.is_project()) {
      return this.client.dbg(`Evaluator.${f}`);
    } else {
      return (..._) => {};
    }
  }

  private async init_eval_inputs(): Promise<void> {
    const query = {
      eval_inputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          input: null,
          time: null,
          user_id: null,
        },
      ],
    };
    this.inputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
  }

  private async init_eval_outputs(): Promise<void> {
    const query = {
      eval_outputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          output: null,
          time: null,
          number: null,
        },
      ],
    };
    this.outputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
    this.outputs_table.setMaxListeners(200); // in case of many evaluations at once.
  }

  private set_state(state: State): void {
    this.state = state;
  }

  private assert_not_closed(): void {
    if (this.state === "closed") {
      throw Error("closed -- sync evaluator");
    }
  }

  private assert_is_project(): void {
    if (!this.client.is_project()) {
      throw Error("BUG -- this code should only run in the project.");
    }
  }

  private assert_is_browser(): void {
    if (this.client.is_project()) {
      throw Error("BUG -- this code should only run in the web browser.");
    }
  }

  /**
   * Browser only: request evaluation of `input` with `program`.
   *
   * If given, cb is called repeatedly with output messages as they appear,
   * in order, until a message with `done` set has been delivered.
   */
  public call(opts: { program: Program; input: Input; cb?: Function }): void {
    this.assert_not_closed();
    this.assert_is_browser();
    const dbg = this.dbg("call");
    dbg(opts.program, opts.input, opts.cb != undefined);

    let time = this.client.server_time();
    // Perturb time if it is <= last time when this client did an evaluation.
    // We do this so that the time below is different than anything else.
    if (time <= this.last_call_time) {
      // slightly later
      time = new Date(this.last_call_time.valueOf() + 1);
    }
    // Remember it, so the next call from this client is strictly later.
    this.last_call_time = time;

    const user_id: number = this.syncdoc.get_my_user_id();
    const obj = {
      string_id: this.syncdoc.get_string_id(),
      time,
      user_id,
      input: copy_without(opts, "cb"),
    };
    dbg(JSON.stringify(obj));
    this.inputs_table.set(obj);
    // root cause of https://github.com/sagemathinc/cocalc/issues/1589
    this.inputs_table.save();

    if (opts.cb == null) {
      // Fire and forget -- no need to listen for responses.
      dbg("no cb defined, so fire and forget");
      return;
    }

    // Output messages that arrived before their predecessors, by number.
    const messages = {};

    // Output may appear in random order, so we use mesg_number
    // to sort it out: it is the number of the next message to deliver.
    let mesg_number = 0;

    const send = (mesg) => {
      dbg("send", mesg);
      if (mesg.done) {
        this.outputs_table.removeListener("change", handle_output);
      }
      if (opts.cb != null) {
        opts.cb(mesg);
      }
    };

    const handle_output = (keys: string[]) => {
      dbg("handle_output", keys);
      if (this.state === "closed") {
        // Nothing left to deliver to; don't throw inside an event listener.
        return;
      }
      for (const key of keys) {
        // Output keys decode as [string_id, time, number].
        const t = from_json(key);
        if (t[1].valueOf() !== time.valueOf()) {
          dbg("not our eval", t[1].valueOf(), time.valueOf());
          continue;
        }
        const x = this.outputs_table.get(key);
        if (x == null) {
          dbg("x is null");
          continue;
        }
        const y = x.get("output");
        if (y == null) {
          dbg("y is null");
          continue;
        }
        dbg("y = ", JSON.stringify(y.toJS()));
        const mesg = y.toJS();
        if (mesg == null) {
          dbg("probably never happens, but makes typescript happy.");
          continue;
        }
        // OK, we called opts.cb on output mesg with the given timestamp and user_id...
        delete mesg.id; // waste of space

        // Messages may arrive in somewhat random order.  This *DOES HAPPEN*,
        // since changes are output from the project by computing a diff of
        // a synctable, and then an array of objects sent out... and
        // the order in that diff is random.
        // E.g. this in a Sage worksheet would break:
        //    for i in range(20): print i; sys.stdout.flush()
        if (t[2] !== mesg_number) {
          // Not the next message, so put message in the
          // set of messages that arrived too early.
          dbg("put message in holding", t[2], mesg_number);
          messages[t[2]] = mesg;
          continue;
        }

        // Finally, the right message to handle next.
        // Inform caller of result
        send(mesg);
        mesg_number += 1;

        // Then, push out any messages that arrived earlier
        // that are ready to send.
        while (messages[mesg_number] != null) {
          send(messages[mesg_number]);
          delete messages[mesg_number];
          mesg_number += 1;
        }
      }
    };

    this.outputs_table.on("change", handle_output);
  }

  /**
   * Project only: build a hook that, 5 seconds after each output message,
   * checks that the output line for `output_uuid` in the document is at
   * least as long as the output this project has produced, and rewrites it
   * (and, on `done`, the cell flags) if the browser fell behind.
   */
  private execute_sage_code_hook(output_uuid: string): Function {
    this.assert_is_project();
    const dbg = this.dbg(`execute_sage_code_hook('${output_uuid}')`);
    dbg();
    this.assert_not_closed();

    // We track the output_line from within this project, and compare
    // to what is set in the document (by the user).  If they go out
    // of sync for a while, we fill in the result.
    // TODO: since it's now possible to know whether or not users are
    // connected... maybe we could use that instead?
    let output_line = MARKERS.output;

    const hook = (mesg) => {
      dbg(`processing mesg '${to_json(mesg)}'`);
      let content = this.syncdoc.to_str();
      let i = content.indexOf(MARKERS.output + output_uuid);
      if (i === -1) {
        // no cell anymore, so do nothing further right now.
        return;
      }
      // Skip past the output marker and the uuid itself.
      i += MARKERS.output.length + output_uuid.length;
      const n = content.indexOf("\n", i);
      if (n === -1) {
        // corrupted? -- don't try further right now.
        return;
      }
      // This is what the frontend also does:
      output_line +=
        stringify(copy_without(mesg, ["id", "event"])) + MARKERS.output;

      if (output_line.length - 1 <= n - i) {
        // Things are looking fine (at least, the line is longer enough).
        // TODO: try instead comparing actual content, not just length?
        // Or maybe don't... since this stupid code will all get deleted anyways
        // when we rewrite sagews handling.
        return;
      }

      dbg("browser client didn't maintain sync promptly. fixing");
      dbg(
        `sage_execute_code: i=${i}, n=${n}, output_line.length=${output_line.length}`,
      );
      dbg(`output_line='${output_line}', sync_line='${content.slice(i, n)}'`);
      const x = content.slice(0, i);
      content = x + output_line + content.slice(n);
      if (mesg.done) {
        let j = x.lastIndexOf(MARKERS.cell);
        if (j !== -1) {
          j = x.lastIndexOf("\n", j);
          // Skip "\n" + cell marker; assumes cell ids are 36-character uuids.
          const cell_id = x.slice(j + 2, j + 38);
          //dbg("removing a cell flag: before='#{content}', cell_id='#{cell_id}'")
          const S = sagews(content);
          S.remove_cell_flag(cell_id, FLAGS.running);
          S.set_cell_flag(cell_id, FLAGS.this_session);
          content = S.content;
        }
      }
      //dbg("removing a cell flag: after='#{content}'")
      this.syncdoc.from_str(content);
      this.syncdoc.commit();
    };

    return (mesg) => {
      setTimeout(() => {
        // The evaluator may have been closed during the delay, in which case
        // close() has cleared this.syncdoc and there is nothing to fix.
        if (this.state === "closed") {
          return;
        }
        hook(mesg);
      }, 5000);
    };
  }

  /**
   * Project only: start evaluating the input row stored under `key`, unless
   * output for it already exists.  Output messages are written to the
   * outputs table as [string_id, time, number] with number = 0, 1, 2, ...
   * Throws if the row has no input.
   */
  private handle_input_change(key: string): void {
    this.assert_not_closed();
    this.assert_is_project();

    const dbg = this.dbg("handle_input_change");
    dbg(`change: ${key}`);

    const t = from_json(key);
    const string_id = t[0];
    const time = t[1];
    let number = 0;
    const id = [string_id, time, number];
    if (this.outputs_table.get(to_key(id)) != null) {
      dbg("already being handled");
      return;
    }
    dbg(`no outputs yet with key ${to_json(id)}`);
    const r = this.inputs_table.get(key);
    if (r == null) {
      dbg("deleted old input");
      // This happens when deleting from input table (if that is
      // ever supported, e.g., for maybe trimming old evals...).
      // Nothing we need to do here.
      return;
    }
    const input = r.get("input");
    if (input == null) {
      throw Error("input must be specified");
    }
    const x = input.toJS();
    dbg("x = ", x);
    if (x == null) {
      throw Error("BUG: can't happen");
    }

    // Write the next output message for this evaluation and persist it.
    const write_output = (output: object): void => {
      this.outputs_table.set({ string_id, time, number, output });
      this.outputs_table.save();
    };

    if (x.program == null || x.input == null) {
      write_output({
        error: "must specify both program and input",
        done: true,
      });
      return;
    }

    let f;
    switch (x.program) {
      case "sage":
        f = this.evaluate_using_sage;
        break;
      case "bash": // the name allowed by the Program type
      case "shell":
        f = this.evaluate_using_shell;
        break;
      default:
        write_output({
          error: `no program '${x.program}'`,
          done: true,
        });
        return;
    }
    f = f.bind(this);

    let hook: Function;
    if (
      x.program === "sage" &&
      x.input.event === "execute_code" &&
      x.input.output_uuid != null
    ) {
      hook = this.execute_sage_code_hook(x.input.output_uuid);
    } else {
      // no op
      hook = (_) => {};
    }

    f(x.input, (output) => {
      if (this.state === "closed") {
        return;
      }

      dbg(
        `got output='${to_json(output)}'; id=${to_json([string_id, time, number])}`,
      );
      hook(output);
      write_output(output);
      number += 1;
    });
  }

  // Runs only in the project: handle every existing and future input row.
  private async init_project_evaluator(): Promise<void> {
    this.assert_is_project();

    const dbg = this.dbg("init_project_evaluator");
    dbg("init");
    this.inputs_table.on("change", (keys: string[]) => {
      for (const key of keys) {
        this.handle_input_change_safely(key);
      }
    });
    /* CRITICAL: it's very important to handle all the inputs
       that may have happened just moments before
       this object got created.  Why?   The first input is
       the user trying to frickin' evaluate a cell
       in their worksheet to start things running... and they
       might somehow do that moments before the worksheet
       gets opened on the backend; if we don't do the
       following, then often this eval is missed, and
       confusion and frustration ensues. */
    const v = this.inputs_table.get();
    if (v != null) {
      dbg(`handle ${v.size} pending evaluations`);
      for (const key of v.keys()) {
        if (key != null) {
          this.handle_input_change_safely(key);
        }
      }
    }
  }

  // Like handle_input_change, but logs errors instead of throwing them, so
  // one malformed input neither aborts handling of the others nor surfaces
  // as an unhandled exception/rejection from the table's event listener.
  private handle_input_change_safely(key: string): void {
    if (this.state === "closed") {
      return;
    }
    try {
      this.handle_input_change(key);
    } catch (err) {
      this.dbg("handle_input_change")(`error handling input ${key} -- ${err}`);
    }
  }

  private ensure_sage_session_exists(): void {
    if (this.sage_session != null) return;
    this.dbg("ensure_sage_session_exists")();
    // This code only runs in the project, where client
    // has a sage_session method.
    this.sage_session = this.client.sage_session({
      path: this.syncdoc.get_path(),
    });
  }

  /**
   * Runs only in the project: send `input` to this document's sage session,
   * passing each output message to cb.  Never rejects: any failure is
   * reported to cb as a final `{ error, done: true }` message.
   */
  private async evaluate_using_sage(
    input: SageCallOpts["input"],
    cb: SageCallOpts["cb"],
  ): Promise<void> {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_sage");
    dbg();

    // TODO: input also may have -- uuid, output_uuid, timeout
    if (input.event === "execute_code") {
      input = copy_with(input, ["code", "data", "preparse", "event", "id"]);
      dbg(
        "ensure sage session is running, so we can actually execute the code",
      );
    }
    try {
      this.ensure_sage_session_exists();
      if (input.event === "execute_code") {
        // We only need to actually create the socket, which makes a running process,
        // if we are going to execute code.  The other events, e.g., 'status' don't
        // need a running sage session.
        if (!this.sage_session.is_running()) {
          dbg("sage session is not running, so init socket");
          await this.sage_session.init_socket();
        }
      }
      dbg("send call to backend sage session manager", to_json(input));
      // Inside the try: callers invoke this without awaiting it, so a
      // rejection here would otherwise be unhandled.
      await this.sage_session.call({ input, cb });
    } catch (error) {
      // An Error object JSON-serializes to {}, so send its text instead.
      cb({ error: `${error}`, done: true });
    }
  }

  /**
   * Runs only in the project: run `input` via client.shell and report the
   * result to cb as a single message with `done` set.
   */
  private evaluate_using_shell(input: Input, cb: Function): void {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_shell");
    dbg();

    // input is a fresh object (from toJS()), so attaching cb to it is safe.
    input.cb = (err, output) => {
      if (output == null) {
        output = {};
      }
      if (err) {
        // Store text: an Error object would JSON-serialize to {}.
        output.error = `${err}`;
      }
      output.done = true;
      cb(output);
    };
    this.client.shell(input);
  }
}