Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/execute-code.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020–2024 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
// Execute code in a subprocess.
7
8
import { callback, delay } from "awaiting";
9
import LRU from "lru-cache";
10
import {
11
ChildProcessWithoutNullStreams,
12
spawn,
13
SpawnOptionsWithoutStdio,
14
} from "node:child_process";
15
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
16
import { tmpdir } from "node:os";
17
import { join } from "node:path";
18
import { EventEmitter } from "node:stream";
19
import shellEscape from "shell-escape";
20
import getLogger from "@cocalc/backend/logger";
21
import { envToInt } from "@cocalc/backend/misc/env-to-number";
22
import { aggregate } from "@cocalc/util/aggregate";
23
import { callback_opts } from "@cocalc/util/async-utils";
24
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
25
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
26
import {
27
ExecuteCodeOutputAsync,
28
ExecuteCodeOutputBlocking,
29
isExecuteCodeOptionsAsyncGet,
30
type ExecuteCodeFunctionWithCallback,
31
type ExecuteCodeOptions,
32
type ExecuteCodeOptionsAsyncGet,
33
type ExecuteCodeOptionsWithCallback,
34
type ExecuteCodeOutput,
35
} from "@cocalc/util/types/execute-code";
36
import { Processes } from "@cocalc/util/types/project-info/types";
37
import { envForSpawn } from "./misc";
38
import { ProcessStats } from "./process-stats";
39
40
const log = getLogger("execute-code");

// Every tunable of the async execution mode can be overridden via an
// environment variable starting with this prefix.
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
// maximum number of async jobs kept in the LRU cache
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
// time-to-live of a cached async job, in seconds (default: one hour)
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 3600);
// upper bound (in seconds) between two checks of an async job's process tree;
// mutable, see setMonitorIntervalSeconds
let MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);
47
48
/**
 * Override the upper bound (in seconds) between two samples of an async job's
 * process tree. The new value is picked up by monitors on their next wait.
 *
 * @param n - interval in seconds
 */
export function setMonitorIntervalSeconds(n: number): void {
  MONITOR_INTERVAL_S = n;
}
51
52
// at most that many monitoring samples are kept per async job (oldest are dropped)
const MONITOR_STATS_LENGTH_MAX = envToInt(`${PREFIX}_MONITOR_STATS_LENGTH_MAX`, 100);

// report the effective configuration once, when this module is loaded
log.debug("configuration:", {
  ASYNC_CACHE_MAX: ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S: ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S: MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX: MONITOR_STATS_LENGTH_MAX,
});
63
64
// kinds of events emitted on `updates` (currently only "job is done")
type AsyncAwait = "finished";

// used to wake up callers that wait (async_await) for an async job to end
const updates = new EventEmitter();

// event name for a given event type and async job id
function eventKey(type: AsyncAwait, job_id: string): string {
  return `${type}-${job_id}`;
}

// state of all async jobs, keyed by job id; reading or checking an entry
// refreshes its age, so actively polled jobs do not expire
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: ASYNC_CACHE_TTL_S * 1000,
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});
75
76
// Keep only the newest MONITOR_STATS_LENGTH_MAX entries of obj.stats
// (mutates obj in place; no-op if there are no stats).
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (obj == null || !Array.isArray(obj.stats)) {
    return;
  }
  const samples = obj.stats;
  // a negative start index is clamped to 0 by slice, i.e. short arrays are kept as is
  const firstKept = samples.length - MONITOR_STATS_LENGTH_MAX;
  obj.stats = samples.slice(firstKept);
}
82
83
// Merge `upd` into the cached state of async job `job_id` and store the result.
// If both the cached entry and `upd` carry a `stats` array, the samples are
// concatenated (and truncated to MONITOR_STATS_LENGTH_MAX) instead of replaced.
// Once the merged status is no longer "running", listeners waiting for this
// job (see async_await in executeCodeNoAggregate) are notified.
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const obj = asyncCache.get(job_id);
  const next: ExecuteCodeOutputAsync = { ...obj, ...upd };
  if (Array.isArray(obj?.stats) && Array.isArray(upd?.stats)) {
    // must happen after the spread above, otherwise upd.stats would overwrite
    // the merged list again
    next.stats = [...obj.stats, ...upd.stats];
    truncStats(next);
  }
  asyncCache.set(job_id, next);
  if (next.status !== "running") {
    // use the argument: `next.job_id` is only set if the cache entry or upd had it
    updates.emit(eventKey("finished", job_id), next);
  }
  return next;
}
96
97
// Async/await interface to executing code: a promisified wrapper around
// the callback based `execute_code`.
export async function executeCode(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  const run = callback_opts(execute_code);
  const output = await run(opts);
  return output;
}
103
104
// Callback interface to executing code (wrapped by `aggregate`).
// This will get deprecated and is only used by some old coffeescript code.
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
  (opts: ExecuteCodeOptionsWithCallback): void => {
    const run = async () => {
      try {
        let data = await executeCodeNoAggregate(opts);
        // stats could contain a lot of data. we only return it if requested.
        if (
          isExecuteCodeOptionsAsyncGet(opts) &&
          data.type === "async" &&
          opts.async_stats !== true
        ) {
          data = { ...data, stats: undefined };
        }
        opts.cb?.(undefined, data);
      } catch (err) {
        opts.cb?.(err);
      }
    };
    void run();
  },
);
124
125
// Best-effort recursive removal of a temporary directory.
// Does nothing for an empty/undefined path; failures are only logged.
export async function cleanUpTempDir(tempDir: string | undefined) {
  if (!tempDir) return;
  try {
    await rm(tempDir, { recursive: true, force: true });
  } catch (err) {
    console.log("WARNING: issue cleaning up tempDir", err);
  }
}
134
135
// actual implementation, without the aggregate wrapper
//
// Three modes:
// - opts.async_get: look up a previously started async job in asyncCache and
//   return its state. With async_await === true and a still running job, the
//   returned promise resolves once the job is no longer running.
// - opts.async_call: start the command in the background and immediately
//   return the job descriptor (pid is undefined if spawning failed).
// - otherwise: run the command and resolve with its output once it finished.
//
// NOTE: opts is normalized in place (defaults, bash rewriting, path).
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached == null) {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
    if (cached.status === "running" && opts.async_await === true) {
      // asyncCacheUpdate emits this event once the job is no longer running
      return new Promise((done) =>
        updates.once(eventKey("finished", key), (data) => done(data)),
      );
    }
    return cached;
  }

  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= false;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // no args, but the command contains whitespace: let bash interpret it
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    opts.path = opts.home + "/" + opts.path;
  }
  if (opts.cwd) {
    opts.path = opts.cwd;
  }

  let tempDir: string | undefined = undefined;
  // Set once an async job has been handed to doSpawn: from then on, the job's
  // completion callback owns tempDir and removes it.
  let asyncJobStarted = false;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS. Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      // (opts.timeout already got its default above)
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      const child = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        async (err, result) => {
          log.debug("async/doSpawn returned", { err, result });
          try {
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            if (err) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                ...{ status: "completed" },
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await cleanUpTempDir(tempDir);
          }
        },
      );
      // doSpawn returned, so the callback above is (or will be) responsible
      // for removing tempDir
      asyncJobStarted = true;
      const pid = child?.pid;

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // Blocking mode always cleans up here. In async mode the job's callback
    // does it – unless setup failed before the job was started (e.g. writing
    // the bash script threw), in which case nobody else would remove tempDir.
    if (!asyncJobStarted) {
      await cleanUpTempDir(tempDir);
    }
  }
}
295
296
// Aggregate rss, cpu percentage and cpu seconds of process `pid` and all of
// its descendants, according to the parent → children map `children`.
// Returns null as soon as any process of that tree is missing in `procs`.
function sumChildren(
  procs: Processes,
  children: { [pid: number]: number[] },
  pid: number,
): { rss: number; pct_cpu: number; cpu_secs: number } | null {
  const proc = procs[`${pid}`];
  if (proc == null) {
    log.debug(`sumChildren: no process ${pid} in proc`);
    return null;
  }
  const total = {
    rss: proc.stat.mem.rss,
    pct_cpu: proc.cpu.pct,
    cpu_secs: proc.cpu.secs,
  };
  const kids = children[pid] ?? [];
  for (let i = 0; i < kids.length; i++) {
    const subtree = sumChildren(procs, children, kids[i]);
    if (subtree == null) {
      return null;
    }
    total.rss += subtree.rss;
    total.pct_cpu += subtree.pct_cpu;
    total.cpu_secs += subtree.cpu_secs;
  }
  return total;
}
318
319
// Spawn opts.command with opts.args in its own process group and collect
// stdout/stderr (each capped at opts.max_output characters, if set).
// `cb` is called exactly once: with an error string, or with the blocking
// result { type: "blocking", stdout, stderr, exit_code }.
// If opts.job_id is set (async mode), the output and pid are additionally
// mirrored into asyncCache while the process runs, and its process tree is
// sampled periodically (see startMonitor).
// Returns the child process, or undefined if spawning failed.
function doSpawn(
  opts: ExecuteCodeOptions & {
    origCommand: string;
    job_id?: string;
    job_config?: ExecuteCodeOutputAsync;
  },
  cb?: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
) {
  const start_time = walltime();

  if (opts.verbose) {
    log.debug(
      "spawning",
      opts.command,
      "with args",
      opts.args,
      "and timeout",
      opts.timeout,
      "seconds",
    );
  }

  const spawnOptions: SpawnOptionsWithoutStdio = {
    detached: true, // so we can kill the entire process group if it times out
    cwd: opts.path,
    ...(opts.uid ? { uid: opts.uid } : undefined),
    // the group id goes into `gid` (it was previously written to `uid` by mistake)
    ...(opts.gid ? { gid: opts.gid } : undefined),
    env: {
      ...envForSpawn(),
      ...opts.env,
      ...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
    },
  };

  // This is the state, which will be captured in closures
  let child: ChildProcessWithoutNullStreams;
  let ran_code = false;
  let stdout = "";
  let stderr = "";
  let exit_code: undefined | number = undefined;
  let stderr_is_done = false;
  let stdout_is_done = false;
  let killed = false;
  let callback_done = false; // set in "finish", which is also called in a timeout
  let timer: NodeJS.Timeout | undefined = undefined;

  // periodically check up on the child process tree and record stats
  // this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
  async function startMonitor() {
    const pid = child.pid;
    const { job_id, job_config } = opts;
    if (job_id == null || pid == null || job_config == null) return;
    const monitor = new ProcessStats();
    await monitor.init();
    await delay(1000);
    if (callback_done) return;

    while (true) {
      if (callback_done) return;
      const { procs } = await monitor.processes(Date.now());
      // reconstruct process tree
      const children: { [pid: number]: number[] } = {};
      for (const p of Object.values(procs)) {
        const { pid, ppid } = p;
        children[ppid] ??= [];
        children[ppid].push(pid);
      }
      // we only consider those, which are the pid itself or one of its children
      const sc = sumChildren(procs, children, pid);
      if (sc == null) {
        // If the process by PID is no longer known, either the process was killed or there are too many running.
        // in any case, stop monitoring and do not update any data.
        return;
      }
      const { rss, pct_cpu, cpu_secs } = sc;
      // ?? fallback, in case the cache "forgot" about it
      const obj = asyncCache.get(job_id) ?? job_config;
      obj.pid = pid;
      obj.stats ??= [];
      obj.stats.push({
        timestamp: Date.now(),
        mem_rss: rss,
        cpu_pct: pct_cpu,
        cpu_secs,
      });
      truncStats(obj);
      asyncCache.set(job_id, obj);

      // initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
      const elapsed_s = (Date.now() - job_config.start) / 1000;
      // i.e. after 6 minutes, we check every minute
      const next_s = Math.max(1, Math.floor(elapsed_s / 6));
      const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
      await delay(wait_s * 1000);
    }
  }

  try {
    child = spawn(opts.command, opts.args, spawnOptions);
    if (child.stdout == null || child.stderr == null) {
      // The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
      // suggest that r.stdout and r.stderr are always defined. However, this is
      // definitely NOT the case in edge cases, as we have observed.
      cb?.("error creating child process -- couldn't spawn child process");
      return;
    }
  } catch (error) {
    // Yes, spawn can cause this error if there is no memory, and there's no
    // event! -- Error: spawn ENOMEM
    ran_code = false;
    cb?.(`error ${error}`);
    return;
  }

  ran_code = true;

  if (opts.verbose) {
    log.debug("listening for stdout, stderr and exit_code...");
  }

  // mirror the current stdout/stderr/pid into the async job's cache entry
  function update_async(
    job_id: string | undefined,
    aspect: "stdout" | "stderr" | "pid",
    data: string | number,
  ): ExecuteCodeOutputAsync | undefined {
    if (!job_id) return;
    // job_config fallback, in case the cache forgot about it
    const obj = asyncCache.get(job_id) ?? opts.job_config;
    if (obj != null) {
      if (aspect === "pid") {
        if (typeof data === "number") {
          obj.pid = data;
        }
      } else if (typeof data === "string") {
        obj[aspect] = data;
      }
      asyncCache.set(job_id, obj);
    }
    return obj;
  }

  child.stdout.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      if (stdout.length < opts.max_output) {
        stdout += data.slice(0, opts.max_output - stdout.length);
      }
    } else {
      stdout += data;
    }
    update_async(opts.job_id, "stdout", stdout);
  });

  child.stderr.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      if (stderr.length < opts.max_output) {
        stderr += data.slice(0, opts.max_output - stderr.length);
      }
    } else {
      stderr += data;
    }
    update_async(opts.job_id, "stderr", stderr);
  });

  child.stderr.on("end", () => {
    stderr_is_done = true;
    finish();
  });

  child.stdout.on("end", () => {
    stdout_is_done = true;
    finish();
  });

  // Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
  // TODO: This is not 100% correct, because in case the process is killed by a signal,
  // $code is null and the second argument gives the signal (as a string). Hence, after a kill,
  // the code below changes the exit code to 0. This could be treated as a special case.
  // It cannot be left null, because the "finish" callback waits for stdout, stderr and exit to be set.
  // The local $killed var is only true if the process has been killed by the timeout – not by another kill.
  child.on("exit", (code) => {
    exit_code = code ?? 0;
    finish();
  });

  // This can happen, e.g., "Error: spawn ENOMEM" if there is no memory. Without this handler,
  // an unhandled exception gets raised, which is nasty.
  // From docs: "Note that the exit-event may or may not fire after an error has occurred. "
  child.on("error", (err) => {
    if (exit_code == null) {
      exit_code = 1;
    }
    stderr += to_json(err);
    // a fundamental issue, we were not running some code
    ran_code = false;
    finish();
  });

  if (opts.job_id && child.pid) {
    // we don't await it, it runs until $callback_done is true
    update_async(opts.job_id, "pid", child.pid);
    startMonitor().catch((err) => {
      // monitoring is best-effort: a failure must not surface as an unhandled rejection
      log.debug("startMonitor failed", err);
    });
  }

  const finish = (err?: string) => {
    if (!killed && (!stdout_is_done || !stderr_is_done || exit_code == null)) {
      // it wasn't killed and not all of stdout, stderr and exit_code are done yet,
      // so we let the rest of them get set before actually finishing up.
      return;
    }
    if (callback_done) {
      // we already finished up.
      return;
    }
    // finally finish up – this will also terminate the monitor
    callback_done = true;

    if (timer != null) {
      clearTimeout(timer);
      timer = undefined;
    }

    if (opts.verbose && log.isEnabled("debug")) {
      log.debug(
        "finished exec of",
        opts.command,
        "took",
        walltime(start_time),
        "seconds",
      );
      log.debug({
        stdout: trunc(stdout, 512),
        stderr: trunc(stderr, 512),
        exit_code,
      });
    }

    if (err) {
      cb?.(err);
    } else if (opts.err_on_exit && exit_code != 0) {
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      if (opts.job_id) {
        cb?.(stderr);
      } else {
        // sync behavior, like it was before
        cb?.(
          `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
            stderr,
            1024,
          )}'`,
        );
      }
    } else if (!ran_code) {
      // regardless of opts.err_on_exit !
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      cb?.(
        `command '${x}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
      );
    } else {
      if (opts.max_output != null) {
        if (stdout.length >= opts.max_output) {
          stdout += ` (truncated at ${opts.max_output} characters)`;
        }
        if (stderr.length >= opts.max_output) {
          stderr += ` (truncated at ${opts.max_output} characters)`;
        }
      }
      if (exit_code == null) {
        // if exit-code not set, may have been SIGKILL so we set it to 1
        exit_code = 1;
      }
      cb?.(undefined, { type: "blocking", stdout, stderr, exit_code });
    }
  };

  if (opts.timeout) {
    // setup a timer that will kill the command after a certain amount of time.
    const f = () => {
      if (child.exitCode != null) {
        // command already exited.
        return;
      }
      if (opts.verbose) {
        log.debug(
          "subprocess did not exit after",
          opts.timeout,
          "seconds, so killing with SIGKILL",
        );
      }
      try {
        killed = true; // we set the kill flag in any case – i.e. process will no longer exist
        if (child.pid != null) {
          process.kill(-child.pid, "SIGKILL"); // this should kill process group
        }
      } catch (err) {
        // Exceptions can happen, which left uncaught messes up calling code big time.
        if (opts.verbose) {
          log.debug("process.kill raised an exception", err);
        }
      }
      finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
    };
    timer = setTimeout(f, opts.timeout * 1000);
  }

  return child;
}
631
632