// Path: blob/master/src/packages/backend/execute-code.ts
// (code-viewer residue: "1447 views")
/*
 *  This file is part of CoCalc: Copyright © 2020–2024 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

// Execute code in a subprocess.

import { callback, delay } from "awaiting";
import LRU from "lru-cache";
import {
  ChildProcessWithoutNullStreams,
  spawn,
  SpawnOptionsWithoutStdio,
} from "node:child_process";
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { EventEmitter } from "node:stream";
import shellEscape from "shell-escape";
import getLogger from "@cocalc/backend/logger";
import { envToInt } from "@cocalc/backend/misc/env-to-number";
import { aggregate } from "@cocalc/util/aggregate";
import { callback_opts } from "@cocalc/util/async-utils";
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
import {
  ExecuteCodeOutputAsync,
  ExecuteCodeOutputBlocking,
  isExecuteCodeOptionsAsyncGet,
  type ExecuteCodeFunctionWithCallback,
  type ExecuteCodeOptions,
  type ExecuteCodeOptionsAsyncGet,
  type ExecuteCodeOptionsWithCallback,
  type ExecuteCodeOutput,
} from "@cocalc/util/types/execute-code";
import { Processes } from "@cocalc/util/types/project-info/types";
import { envForSpawn } from "./misc";
import { ProcessStats } from "./process-stats";

const log = getLogger("execute-code");

// All tunables below are read from environment variables sharing this prefix.
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 60 * 60);
// for async execution, every that many secs check up on the child-tree
let MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);

/**
 * Override the upper bound (in seconds) of the delay between two samples
 * taken by the async job monitor.
 */
export function setMonitorIntervalSeconds(n: number): void {
  MONITOR_INTERVAL_S = n;
}

// maximum number of stats samples kept per async job
const MONITOR_STATS_LENGTH_MAX = envToInt(
  `${PREFIX}_MONITOR_STATS_LENGTH_MAX`,
  100,
);

log.debug("configuration:", {
  ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX,
});

type AsyncAwait = "finished";
// Emits `finished-<job_id>` once an async job is no longer "running".
const updates = new EventEmitter();
const eventKey = (type: AsyncAwait, job_id: string): string =>
  `${type}-${job_id}`;

// In-memory state of async jobs, keyed by job_id.
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: 1000 * ASYNC_CACHE_TTL_S,
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});

/**
 * Keep at most MONITOR_STATS_LENGTH_MAX of the most recent stats samples
 * of the given job object (modified in place). No-op if there are no stats.
 */
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (Array.isArray(obj?.stats)) {
    // truncate to $MONITOR_STATS_LENGTH_MAX, by discarding the initial entries
    obj.stats = obj.stats.slice(obj.stats.length - MONITOR_STATS_LENGTH_MAX);
  }
}

/**
 * Merge `upd` into the cached entry of `job_id`, store the result and return it.
 * If the resulting status is anything but "running", the "finished" event
 * for this job is emitted with the new entry.
 */
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const obj = asyncCache.get(job_id);
  let mergedStats: ExecuteCodeOutputAsync["stats"] = undefined;
  if (Array.isArray(obj?.stats) && Array.isArray(upd.stats)) {
    obj.stats.push(...upd.stats);
    truncStats(obj);
    mergedStats = obj.stats;
  }
  const next: ExecuteCodeOutputAsync = { ...obj, ...upd };
  if (mergedStats != null) {
    // The spread above takes `stats` from `upd` alone, which would drop the
    // history we just merged and truncated – restore it.
    next.stats = mergedStats;
  }
  asyncCache.set(job_id, next);
  if (next.status !== "running") {
    updates.emit(eventKey("finished", next.job_id), next);
  }
  return next;
}

// Async/await interface to executing code.
export async function executeCode(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  return await callback_opts(execute_code)(opts);
}

// Callback interface to executing code.
// This will get deprecated and is only used by some old coffeescript code.
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
  (opts: ExecuteCodeOptionsWithCallback): void => {
    (async () => {
      try {
        let data = await executeCodeNoAggregate(opts);
        if (isExecuteCodeOptionsAsyncGet(opts) && data.type === "async") {
          // stats could contain a lot of data. we only return it if requested.
          if (opts.async_stats !== true) {
            data = { ...data, stats: undefined };
          }
        }
        opts.cb?.(undefined, data);
      } catch (err) {
        opts.cb?.(err);
      }
    })();
  },
);

/**
 * Recursively remove `tempDir` if given. Failures are only logged as a
 * warning, never thrown.
 */
export async function cleanUpTempDir(tempDir: string | undefined) {
  if (tempDir) {
    try {
      await rm(tempDir, { force: true, recursive: true });
    } catch (err) {
      console.log("WARNING: issue cleaning up tempDir", err);
    }
  }
}

// actual implementation, without the aggregate wrapper
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    // Query for the state of a previously started async job.
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached != null) {
      const { async_await } = opts;
      if (cached.status === "running" && async_await === true) {
        // caller wants to wait: resolve once the job emits "finished"
        return new Promise((done) =>
          updates.once(eventKey("finished", key), (data) => done(data)),
        );
      } else {
        return cached;
      }
    } else {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
  }

  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= false;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // a command containing whitespace and no explicit args is run via bash
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    // relative paths are interpreted relative to the home directory
    opts.path = opts.home + "/" + opts.path;
  }
  if (opts.cwd) {
    opts.path = opts.cwd;
  }

  let tempDir: string | undefined = undefined;
  // Set to true right before the async spawn takes over responsibility for
  // removing tempDir (it does so in its completion callback). Until then,
  // the `finally` below must clean up – also in async mode, e.g. if writing
  // the bash script fails.
  let spawnOwnsTempDir = false;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS.  Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      // from here on, the callback below is in charge of removing tempDir
      spawnOwnsTempDir = true;
      const child = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        async (err, result) => {
          log.debug("async/doSpawn returned", { err, result });
          try {
            // status defaults to "error", only a proper result overrides it
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            if (err) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                ...{ status: "completed" },
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await cleanUpTempDir(tempDir);
          }
        },
      );
      const pid = child?.pid;

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // do not delete the tempDir once the async spawn has taken it over!
    if (!spawnOwnsTempDir) {
      await cleanUpTempDir(tempDir);
    }
  }
}

/**
 * Sum up memory (rss), cpu percentage and cpu seconds of process `pid`
 * and all its descendants, as given by the `children` map (pid -> child pids).
 *
 * Returns null as soon as `pid` or any descendant is not found in `procs`.
 */
function sumChildren(
  procs: Processes,
  children: { [pid: number]: number[] },
  pid: number,
): { rss: number; pct_cpu: number; cpu_secs: number } | null {
  const proc = procs[`${pid}`];
  if (proc == null) {
    log.debug(`sumChildren: no process ${pid} in proc`);
    return null;
  }
  let rss = proc.stat.mem.rss;
  let pct_cpu = proc.cpu.pct;
  let cpu_secs = proc.cpu.secs;
  for (const ch of children[pid] ?? []) {
    const sc = sumChildren(procs, children, ch);
    if (sc == null) return null;
    rss += sc.rss;
    pct_cpu += sc.pct_cpu;
    cpu_secs += sc.cpu_secs;
  }
  return { rss, pct_cpu, cpu_secs };
}

/**
 * Spawn `opts.command` with `opts.args`, collect its stdout/stderr (each capped
 * at `opts.max_output` characters, if set) and report the outcome via `cb`.
 *
 * If `opts.timeout` is set, the process group of the child is sent SIGKILL
 * after that many seconds. If `opts.job_id` is set (async mode), the cached
 * job entry is updated with pid/stdout/stderr as they arrive and a monitor
 * periodically records resource usage of the child's process tree.
 *
 * @returns the child process, or undefined if it could not be spawned
 *   (in which case `cb` has already been called with an error string).
 */
function doSpawn(
  opts: ExecuteCodeOptions & {
    origCommand: string;
    job_id?: string;
    job_config?: ExecuteCodeOutputAsync;
  },
  cb?: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
) {
  const start_time = walltime();

  if (opts.verbose) {
    log.debug(
      "spawning",
      opts.command,
      "with args",
      opts.args,
      "and timeout",
      opts.timeout,
      "seconds",
    );
  }

  const spawnOptions: SpawnOptionsWithoutStdio = {
    detached: true, // so we can kill the entire process group if it times out
    cwd: opts.path,
    ...(opts.uid ? { uid: opts.uid } : undefined),
    // NOTE: this must set `gid`; setting `uid` here would clobber the user id
    // with the group id and leave the group unchanged.
    ...(opts.gid ? { gid: opts.gid } : undefined),
    env: {
      ...envForSpawn(),
      ...opts.env,
      ...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
    },
  };

  // This is the state, which will be captured in closures
  let child: ChildProcessWithoutNullStreams;
  let ran_code = false;
  let stdout = "";
  let stderr = "";
  let exit_code: undefined | number = undefined;
  let stderr_is_done = false;
  let stdout_is_done = false;
  let killed = false;
  let callback_done = false; // set in "finish", which is also called in a timeout
  let timer: NodeJS.Timeout | undefined = undefined;

  // periodically check up on the child process tree and record stats
  // this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
  async function startMonitor() {
    const pid = child.pid;
    const { job_id, job_config } = opts;
    if (job_id == null || pid == null || job_config == null) return;
    const monitor = new ProcessStats();
    await monitor.init();
    await delay(1000);
    if (callback_done) return;

    while (true) {
      if (callback_done) return;
      const { procs } = await monitor.processes(Date.now());
      // The job might have finished while we were collecting process info;
      // don't touch the cache any more in that case.
      if (callback_done) return;
      // reconstruct process tree
      const children: { [pid: number]: number[] } = {};
      for (const p of Object.values(procs)) {
        const { pid, ppid } = p;
        children[ppid] ??= [];
        children[ppid].push(pid);
      }
      // we only consider those, which are the pid itself or one of its children
      const sc = sumChildren(procs, children, pid);
      if (sc == null) {
        // If the process by PID is no longer known, either the process was killed or there are too many running.
        // in any case, stop monitoring and do not update any data.
        return;
      }
      const { rss, pct_cpu, cpu_secs } = sc;
      // ?? fallback, in case the cache "forgot" about it
      const obj = asyncCache.get(job_id) ?? job_config;
      obj.pid = pid;
      obj.stats ??= [];
      obj.stats.push({
        timestamp: Date.now(),
        mem_rss: rss,
        cpu_pct: pct_cpu,
        cpu_secs,
      });
      truncStats(obj);
      asyncCache.set(job_id, obj);

      // initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
      const elapsed_s = (Date.now() - job_config.start) / 1000;
      // i.e. after 6 minutes, we check every minute
      const next_s = Math.max(1, Math.floor(elapsed_s / 6));
      const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
      await delay(wait_s * 1000);
    }
  }

  try {
    child = spawn(opts.command, opts.args, spawnOptions);
    if (child.stdout == null || child.stderr == null) {
      // The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
      // suggest that r.stdout and r.stderr are always defined.  However, this is
      // definitely NOT the case in edge cases, as we have observed.
      cb?.("error creating child process -- couldn't spawn child process");
      return;
    }
  } catch (error) {
    // Yes, spawn can cause this error if there is no memory, and there's no
    // event! --  Error: spawn ENOMEM
    ran_code = false;
    cb?.(`error ${error}`);
    return;
  }

  ran_code = true;

  if (opts.verbose) {
    log.debug("listening for stdout, stderr and exit_code...");
  }

  // Store the given aspect of an async job in its cache entry.
  // No-op for blocking calls, i.e. if there is no job_id.
  function update_async(
    job_id: string | undefined,
    aspect: "stdout" | "stderr" | "pid",
    data: string | number,
  ): ExecuteCodeOutputAsync | undefined {
    if (!job_id) return;
    // job_config fallback, in case the cache forgot about it
    const obj = asyncCache.get(job_id) ?? opts.job_config;
    if (obj != null) {
      if (aspect === "pid") {
        if (typeof data === "number") {
          obj.pid = data;
        }
      } else if (typeof data === "string") {
        obj[aspect] = data;
      }
      asyncCache.set(job_id, obj);
    }
    return obj;
  }

  child.stdout.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      // only append as much as still fits below the limit
      if (stdout.length < opts.max_output) {
        stdout += data.slice(0, opts.max_output - stdout.length);
      }
    } else {
      stdout += data;
    }
    update_async(opts.job_id, "stdout", stdout);
  });

  child.stderr.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      // only append as much as still fits below the limit
      if (stderr.length < opts.max_output) {
        stderr += data.slice(0, opts.max_output - stderr.length);
      }
    } else {
      stderr += data;
    }
    update_async(opts.job_id, "stderr", stderr);
  });

  child.stderr.on("end", () => {
    stderr_is_done = true;
    finish();
  });

  child.stdout.on("end", () => {
    stdout_is_done = true;
    finish();
  });

  // Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
  // TODO: This is not 100% correct, because in case the process is killed (signal TERM),
  //       the $code is "null" and a second argument gives the signal (as a string). Hence, after a kill,
  //       this code below changes the exit code to 0. This could be a special case, though.
  //       It cannot be null, though, because the "finish" callback assumes that stdout, err and exit are set.
  //       The local $killed var is only true, if the process has been killed by the timeout – not by another kill.
  child.on("exit", (code) => {
    exit_code = code ?? 0;
    finish();
  });

  // This can happen, e.g., "Error: spawn ENOMEM" if there is no memory.  Without this handler,
  // an unhandled exception gets raised, which is nasty.
  // From docs: "Note that the exit-event may or may not fire after an error has occurred. "
  child.on("error", (err) => {
    if (exit_code == null) {
      exit_code = 1;
    }
    stderr += to_json(err);
    // a fundamental issue, we were not running some code
    ran_code = false;
    finish();
  });

  if (opts.job_id && child.pid) {
    update_async(opts.job_id, "pid", child.pid);
    // we don't await it, it runs until $callback_done is true.
    // A failure of the monitor must not surface as an unhandled rejection:
    // it only means that no (further) stats are recorded for this job.
    startMonitor().catch((err) => {
      log.debug("monitoring the process tree failed", err);
    });
  }

  // Called from all event handlers above and from the timeout.
  // Reports the outcome via `cb` – at most once.
  const finish = (err?: string) => {
    if (!killed && (!stdout_is_done || !stderr_is_done || exit_code == null)) {
      // it wasn't killed and at least one of stdout, stderr, and exit_code hasn't been set yet.
      // so we let the rest of them get set before actually finishing up.
      return;
    }
    if (callback_done) {
      // we already finished up.
      return;
    }
    // finally finish up – this will also terminate the monitor
    callback_done = true;

    if (timer != null) {
      clearTimeout(timer);
      timer = undefined;
    }

    if (opts.verbose && log.isEnabled("debug")) {
      log.debug(
        "finished exec of",
        opts.command,
        "took",
        walltime(start_time),
        "seconds",
      );
      log.debug({
        stdout: trunc(stdout, 512),
        stderr: trunc(stderr, 512),
        exit_code,
      });
    }

    if (err) {
      cb?.(err);
    } else if (opts.err_on_exit && exit_code != 0) {
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      if (opts.job_id) {
        cb?.(stderr);
      } else {
        // sync behavior, like it was before
        cb?.(
          `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
            stderr,
            1024,
          )}'`,
        );
      }
    } else if (!ran_code) {
      // regardless of opts.err_on_exit !
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      cb?.(
        `command '${x}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
      );
    } else {
      if (opts.max_output != null) {
        if (stdout.length >= opts.max_output) {
          stdout += ` (truncated at ${opts.max_output} characters)`;
        }
        if (stderr.length >= opts.max_output) {
          stderr += ` (truncated at ${opts.max_output} characters)`;
        }
      }
      if (exit_code == null) {
        // if exit-code not set, may have been SIGKILL so we set it to 1
        exit_code = 1;
      }
      cb?.(undefined, { type: "blocking", stdout, stderr, exit_code });
    }
  };

  if (opts.timeout) {
    // setup a timer that will kill the command after a certain amount of time.
    const f = () => {
      if (child.exitCode != null) {
        // command already exited.
        return;
      }
      if (opts.verbose) {
        log.debug(
          "subprocess did not exit after",
          opts.timeout,
          "seconds, so killing with SIGKILL",
        );
      }
      try {
        killed = true; // we set the kill flag in any case – i.e. process will no longer exist
        if (child.pid != null) {
          process.kill(-child.pid, "SIGKILL"); // this should kill process group
        }
      } catch (err) {
        // Exceptions can happen, which left uncaught messes up calling code big time.
        if (opts.verbose) {
          log.debug("process.kill raised an exception", err);
        }
      }
      finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
    };
    timer = setTimeout(f, opts.timeout * 1000);
  }

  return child;
}