Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/editor/generic/evaluator.ts
1450 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
//##############################################################################
7
//
8
// CoCalc: Collaborative Calculation
9
// Copyright (C) 2016, Sagemath Inc., MS-RSL.
10
//
11
//##############################################################################
12
13
/*
14
Evaluation of code with streaming output built on both the clients and
15
server (local hub) using a sync_table. This evaluator is associated
16
to a syncdoc editing session, and provides code evaluation that
17
may be used to enhance the experience of document editing.
18
*/
19
20
const stringify = require("json-stable-stringify");
21
22
import { SyncTable } from "@cocalc/sync/table/synctable";
23
import { to_key } from "@cocalc/sync/table/util";
24
import {
25
close,
26
copy_with,
27
copy_without,
28
from_json,
29
to_json,
30
} from "@cocalc/util/misc";
31
import { FLAGS, MARKERS, sagews } from "@cocalc/util/sagews";
32
import { ISageSession, SageCallOpts } from "@cocalc/util/types/sage";
33
import { SyncDoc } from "./sync-doc";
34
import { Client } from "./types";
35
36
// Lifecycle of an Evaluator: it starts in "init", becomes "ready" once
// init() has finished, and ends in "closed" after close().
type State = "init" | "ready" | "closed";

// What's supported so far.
// "shell" is the name that the project-side dispatch in
// Evaluator.handle_input_change matches for shell evaluation, so it has to be
// expressible here; "bash" is kept so that existing callers still type-check.
type Program = "sage" | "bash" | "shell";

// Object whose meaning depends on the program
type Input = any;
43
44
/**
 * Code evaluation with streaming output, attached to one syncdoc editing
 * session.
 *
 * Requests are rows written to an `eval_inputs` synctable and results are
 * rows written to an `eval_outputs` synctable.  The same class runs on both
 * sides: a browser client submits a request with `call`, while the project
 * watches the inputs table, runs the requested program and writes numbered
 * output rows back.
 */
export class Evaluator {
  private syncdoc: SyncDoc;
  private client: Client;
  private inputs_table: SyncTable;
  private outputs_table: SyncTable;
  private sage_session: ISageSession;
  private state: State = "init";
  private table_options: any[] = [];
  private create_synctable: Function;

  // Time of the most recent call() made by this client; used to keep the
  // timestamps of successive calls strictly increasing.
  private last_call_time: Date = new Date(0);

  /**
   * @param syncdoc - document this evaluator belongs to
   * @param client - client object; `is_project()` decides which side we are on
   * @param create_synctable - async factory called as
   *   `create_synctable(query, options, throttle)` to create the two tables
   */
  constructor(syncdoc: SyncDoc, client: Client, create_synctable: Function) {
    this.syncdoc = syncdoc;
    this.client = client;
    this.create_synctable = create_synctable;
    this.table_options = [{ ephemeral: true, persistent: true }];
  }

  /**
   * Create the input and output tables and, when running in the project,
   * start handling evaluation requests.  Sets the state to "ready".
   */
  public async init(): Promise<void> {
    // Initialize the inputs and outputs tables in parallel:
    const i = this.init_eval_inputs();
    const o = this.init_eval_outputs();
    await Promise.all([i, o]);

    if (this.client.is_project()) {
      await this.init_project_evaluator();
    }
    this.set_state("ready");
  }

  /**
   * Close both tables and the sage session (whichever exist), release this
   * object's fields via the imported `close` helper, and mark it "closed".
   */
  public async close(): Promise<void> {
    if (this.inputs_table != null) {
      await this.inputs_table.close();
    }
    if (this.outputs_table != null) {
      await this.outputs_table.close();
    }
    if (this.sage_session != null) {
      this.sage_session.close();
    }
    // NOTE: inside this method `close` is the helper imported from
    // @cocalc/util/misc, not this method.
    close(this);
    // Must come after close(this) so the state survives the cleanup.
    this.set_state("closed");
  }

  /**
   * Return a debug logger for the method named `_f`.  Logging only happens
   * in the project; elsewhere a no-op function is returned.
   */
  private dbg(_f: string): Function {
    if (this.client.is_project()) {
      return this.client.dbg(`Evaluator.${_f}`);
    }
    return (..._: any[]) => {};
  }

  /** Create the synctable of evaluation requests for this document. */
  private async init_eval_inputs(): Promise<void> {
    const query = {
      eval_inputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          input: null,
          time: null,
          user_id: null,
        },
      ],
    };
    this.inputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
  }

  /** Create the synctable of evaluation results for this document. */
  private async init_eval_outputs(): Promise<void> {
    const query = {
      eval_outputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          output: null,
          time: null,
          number: null,
        },
      ],
    };
    this.outputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
    this.outputs_table.setMaxListeners(200); // in case of many evaluations at once.
  }

  private set_state(state: State): void {
    this.state = state;
  }

  /** @throws Error if this evaluator has been closed. */
  private assert_not_closed(): void {
    if (this.state === "closed") {
      throw Error("closed -- sync evaluator");
    }
  }

  /** @throws Error if not running in the project. */
  private assert_is_project(): void {
    if (!this.client.is_project()) {
      throw Error("BUG -- this code should only run in the project.");
    }
  }

  /** @throws Error if running in the project. */
  private assert_is_browser(): void {
    if (this.client.is_project()) {
      throw Error("BUG -- this code should only run in the web browser.");
    }
  }

  /**
   * Submit an evaluation request (browser only).
   *
   * The request is written to the inputs table.  If `opts.cb` is given it is
   * called repeatedly with output messages, in message-number order, until a
   * message with `done` set has been delivered; otherwise the request is
   * fire and forget.
   *
   * @throws Error if the evaluator is closed or this is the project.
   */
  public call(opts: { program: Program; input: Input; cb?: Function }): void {
    this.assert_not_closed();
    this.assert_is_browser();
    const dbg = this.dbg("call");
    dbg(opts.program, opts.input, opts.cb != undefined);

    let time = this.client.server_time();
    // Perturb time if it is <= last time when this client did an evaluation.
    // We do this so that the time below is different than anything else.
    if (time <= this.last_call_time) {
      // slightly later
      time = new Date(this.last_call_time.valueOf() + 1);
    }
    // Remember it, so the next call is guaranteed a strictly later time.
    this.last_call_time = time;

    const user_id: number = this.syncdoc.get_my_user_id();
    const obj = {
      string_id: this.syncdoc.get_string_id(),
      time,
      user_id,
      input: copy_without(opts, "cb"),
    };
    dbg(JSON.stringify(obj));
    this.inputs_table.set(obj);
    // root cause of https://github.com/sagemathinc/cocalc/issues/1589
    this.inputs_table.save();

    if (opts.cb == null) {
      // Fire and forget -- no need to listen for responses.
      dbg("no cb defined, so fire and forget");
      return;
    }

    // Listen for output until we receive a message with mesg.done true.
    // Messages that arrived before their turn, indexed by message number.
    const messages: { [number: number]: any } = {};

    // output may appear in random order, so we use mesg_number
    // to sort it out.  This is the number of the next message to deliver.
    let mesg_number = 0;

    const send = (mesg: any): void => {
      dbg("send", mesg);
      if (mesg.done) {
        this.outputs_table.removeListener("change", handle_output);
      }
      if (opts.cb != null) {
        opts.cb(mesg);
      }
    };

    const handle_output = (keys: string[]): void => {
      dbg("handle_output", keys);
      this.assert_not_closed();
      for (const key of keys) {
        // Key layout, as used below: [string_id, time, message number].
        const t = from_json(key);
        if (t[1].valueOf() != time.valueOf()) {
          dbg("not our eval", t[1].valueOf(), time.valueOf());
          continue;
        }
        const x = this.outputs_table.get(key);
        if (x == null) {
          dbg("x is null");
          continue;
        }
        const y = x.get("output");
        if (y == null) {
          dbg("y is null");
          continue;
        }
        const mesg = y.toJS();
        dbg("y = ", JSON.stringify(mesg));
        if (mesg == null) {
          dbg("probably never happens, but makes typescript happy.");
          continue;
        }
        // OK, this is an output mesg for our timestamp...
        delete mesg.id; // waste of space

        // Messages may arrive in somewhat random order.  This *DOES HAPPEN*,
        // since changes are output from the project by computing a diff of
        // a synctable, and then an array of objects sent out... and
        // the order in that diff is random.
        // E.g. this in a Sage worksheet would break:
        //    for i in range(20): print i; sys.stdout.flush()
        if (t[2] !== mesg_number) {
          // Not the next message, so put message in the
          // set of messages that arrived too early.
          dbg("put message in holding", t[2], mesg_number);
          messages[t[2]] = mesg;
          continue;
        }

        // Finally, the right message to handle next.
        // Inform caller of result
        send(mesg);
        mesg_number += 1;

        // Then, push out any messages that arrived earlier
        // that are ready to send.
        while (messages[mesg_number] != null) {
          send(messages[mesg_number]);
          delete messages[mesg_number];
          mesg_number += 1;
        }
      }
    };

    this.outputs_table.on("change", handle_output);
  }

  /**
   * Build the per-evaluation hook used for sage `execute_code` requests
   * (project only).
   *
   * The returned function takes each output message and, 5 seconds later,
   * checks whether the output line of the cell `output_uuid` in the document
   * is at least as long as the output accumulated here.  If it is shorter,
   * the project writes the accumulated output into the document itself.
   */
  private execute_sage_code_hook(output_uuid: string): Function {
    this.assert_is_project();
    const dbg = this.dbg(`execute_sage_code_hook('${output_uuid}')`);
    dbg();
    this.assert_not_closed();

    // We track the output_line from within this project, and compare
    // to what is set in the document (by the user).  If they go out
    // of sync for a while, we fill in the result.
    // TODO: since it's now possible to know whether or not users are
    // connected... maybe we could use that instead?
    let output_line = MARKERS.output;

    const hook = (mesg: any): void => {
      if (this.state === "closed") {
        // This runs from a timer, so the evaluator may have been closed in
        // the meantime.  close() releases our fields (presumably including
        // this.syncdoc), so touching the document now would throw inside the
        // timer callback -- and a closed evaluator must not edit it anyway.
        return;
      }
      dbg(`processing mesg '${to_json(mesg)}'`);
      let content = this.syncdoc.to_str();
      let i = content.indexOf(MARKERS.output + output_uuid);
      if (i === -1) {
        // no cell anymore, so do nothing further right now.
        return;
      }
      // Skip past the marker and the uuid (presumably 1 + 36 characters).
      i += 37;
      const n = content.indexOf("\n", i);
      if (n === -1) {
        // corrupted? -- don't try further right now.
        return;
      }
      // This is what the frontend also does:
      output_line +=
        stringify(copy_without(mesg, ["id", "event"])) + MARKERS.output;

      if (output_line.length - 1 <= n - i) {
        // Things are looking fine (at least, the line is long enough).
        // TODO: try instead comparing actual content, not just length?
        // Or maybe don't... since this stupid code will all get deleted anyways
        // when we rewrite sagews handling.
        return;
      }

      dbg("browser client didn't maintain sync promptly. fixing");
      dbg(
        `sage_execute_code: i=${i}, n=${n}, output_line.length=${output_line.length}`,
      );
      dbg(`output_line='${output_line}', sync_line='${content.slice(i, n)}'`);
      const x = content.slice(0, i);
      content = x + output_line + content.slice(n);
      if (mesg.done) {
        // Evaluation finished: locate the enclosing cell and update its flags.
        let j = x.lastIndexOf(MARKERS.cell);
        if (j !== -1) {
          j = x.lastIndexOf("\n", j);
          // If no newline was found, j is -1 and the slice starts at index 1.
          const cell_id = x.slice(j + 2, j + 38);
          const S = sagews(content);
          S.remove_cell_flag(cell_id, FLAGS.running);
          S.set_cell_flag(cell_id, FLAGS.this_session);
          content = S.content;
        }
      }
      this.syncdoc.from_str(content);
      this.syncdoc.commit();
    };

    return (mesg: any) => {
      // Give the browser client 5 seconds to record the output itself.
      setTimeout(() => hook(mesg), 5000);
    };
  }

  /**
   * Handle one (possibly new) row of the inputs table (project only).
   *
   * Does nothing if output number 0 for this request already exists or the
   * input row is gone.  Otherwise it runs the requested program and writes
   * each output to the outputs table with consecutive message numbers.
   *
   * @param key - inputs-table key; parsed below as JSON `[string_id, time, ...]`
   * @throws Error if closed, if not in the project, or if the row has no input
   */
  private handle_input_change(key: string): void {
    this.assert_not_closed();
    this.assert_is_project();

    const dbg = this.dbg("handle_input_change");
    dbg(`change: ${key}`);

    const t = from_json(key);
    const string_id = t[0];
    const time = t[1];
    // Number of the next output message for this evaluation.
    let number = 0;
    // Key of the first output; if it exists, somebody is already on it.
    const id = [string_id, time, 0];
    if (this.outputs_table.get(to_key(id)) != null) {
      dbg("already being handled");
      return;
    }
    dbg(`no outputs yet with key ${to_json(id)}`);
    const r = this.inputs_table.get(key);
    if (r == null) {
      dbg("deleted old input");
      // This happens when deleting from input table (if that is
      // ever supported, e.g., for maybe trimming old evals...).
      // Nothing we need to do here.
      return;
    }
    const input = r.get("input");
    if (input == null) {
      throw Error("input must be specified");
    }
    const x = input.toJS();
    dbg("x = ", x);
    if (x == null) {
      throw Error("BUG: can't happen");
    }

    // Write `output` using the current message number.
    const write_output = (output: any): void => {
      this.outputs_table.set({ string_id, time, number, output });
      this.outputs_table.save();
    };

    if (x.program == null || x.input == null) {
      write_output({
        error: "must specify both program and input",
        done: true,
      });
      return;
    }

    let evaluate: (input: any, cb: any) => void | Promise<void>;
    switch (x.program) {
      case "sage":
        evaluate = this.evaluate_using_sage.bind(this);
        break;
      case "shell":
      case "bash":
        // The Program type accepted by call() lists "bash", while this
        // dispatch historically matched only "shell"; treat them as the
        // same thing so that a type-correct request is not rejected.
        evaluate = this.evaluate_using_shell.bind(this);
        break;
      default:
        write_output({
          error: `no program '${x.program}'`,
          done: true,
        });
        return;
    }

    let hook: Function;
    if (
      x.program === "sage" &&
      x.input.event === "execute_code" &&
      x.input.output_uuid != null
    ) {
      hook = this.execute_sage_code_hook(x.input.output_uuid);
    } else {
      // no op
      hook = (_: any) => {};
    }

    const on_output = (output: any): void => {
      if (this.state == "closed") {
        return;
      }

      dbg(`got output='${to_json(output)}'; id=${to_json(id)}`);
      hook(output);
      write_output(output);
      number += 1;
    };

    // evaluate_using_sage is async.  If its promise rejects, report that to
    // the caller as a final error message instead of leaving an unhandled
    // rejection and a client that waits forever for `done`.
    void Promise.resolve(evaluate(x.input, on_output)).catch(
      (err: unknown) => {
        dbg(`evaluation failed: ${err}`);
        on_output({ error: `${err}`, done: true });
      },
    );
  }

  /**
   * Start watching the inputs table and process the inputs that are already
   * in it.  Runs only in the project.
   */
  private async init_project_evaluator(): Promise<void> {
    this.assert_is_project();

    const dbg = this.dbg("init_project_evaluator");
    dbg("init");
    this.inputs_table.on("change", (keys: string[]) => {
      for (const key of keys) {
        try {
          this.handle_input_change(key);
        } catch (err) {
          // Nobody awaits an event listener: log the failure and carry on
          // with the remaining keys rather than letting it escape.
          dbg(`error handling input change '${key}': ${err}`);
        }
      }
    });
    /* CRITICAL: it's very important to handle all the inputs
       that may have happened just moments before
       this object got created.  Why? The first input is
       the user trying to frickin' evaluate a cell
       in their worksheet to start things running... and they
       might somehow do that moments before the worksheet
       gets opened on the backend; if we don't do the
       following, then often this eval is missed, and
       confusion and frustration ensues. */
    const v = this.inputs_table.get();
    if (v != null) {
      dbg(`handle ${v.size} pending evaluations`);
      for (const key of v.keys()) {
        if (key != null) {
          this.handle_input_change(key);
        }
      }
    }
  }

  /** Create the sage session for this document's path if there is none yet. */
  private ensure_sage_session_exists(): void {
    if (this.sage_session != null) return;
    this.dbg("ensure_sage_session_exists")();
    // This code only runs in the project, where client
    // has a sage_session method.
    this.sage_session = this.client.sage_session({
      path: this.syncdoc.get_path(),
    });
  }

  /**
   * Send `input` to the sage session, starting it first when code is to be
   * executed.  Runs only in the project.
   *
   * @param input - sage request; for `execute_code` only the fields
   *   code, data, preparse, event and id are forwarded
   * @param cb - called with each output message; if the session cannot be
   *   set up it is called once with `{ error, done: true }`
   */
  private async evaluate_using_sage(
    input: SageCallOpts["input"],
    cb: SageCallOpts["cb"],
  ): Promise<void> {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_sage");
    dbg();

    // TODO: input also may have -- uuid, output_uuid, timeout
    if (input.event === "execute_code") {
      input = copy_with(input, ["code", "data", "preparse", "event", "id"]);
      dbg(
        "ensure sage session is running, so we can actually execute the code",
      );
    }
    try {
      this.ensure_sage_session_exists();
      // We only need to actually create the socket, which makes a running
      // process, if we are going to execute code.  The other events, e.g.,
      // 'status' don't need a running sage session.
      if (input.event === "execute_code" && !this.sage_session.is_running()) {
        dbg("sage session is not running, so init socket");
        await this.sage_session.init_socket();
      }
    } catch (error) {
      // Send text rather than the raw value: an Error object has no
      // enumerable fields, so it would serialize to "{}".
      cb({ error: `${error}`, done: true });
      return;
    }
    dbg("send call to backend sage session manager", to_json(input));
    await this.sage_session.call({ input, cb });
  }

  /**
   * Run `input` through the client's shell and report the outcome as one
   * final output message.  Runs only in the project.
   *
   * @param input - options passed on to `client.shell`
   * @param cb - called once with the output, with `done: true` and, on
   *   failure, `error` set
   */
  private evaluate_using_shell(input: Input, cb: Function): void {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_shell");
    dbg();

    const on_finished = (err: any, output: any): void => {
      if (output == null) {
        output = {};
      }
      if (err) {
        output.error = err;
      }
      output.done = true;
      cb(output);
    };
    // Pass a copy so that the caller's input object is left untouched.
    this.client.shell({ ...input, cb: on_finished });
  }
}
528
529