Path: blob/master/src/packages/jupyter/stateless-api/kernel.ts
1447 views
import { kernel as createKernel } from "@cocalc/jupyter/kernel";
import type { JupyterKernelInterface } from "@cocalc/jupyter/types/project-interface";
import { run_cell } from "@cocalc/jupyter/nbgrader/jupyter-run";
import { mkdtemp, rm } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import getLogger from "@cocalc/backend/logger";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { type Limits } from "@cocalc/util/jupyter/nbgrader-types";

const log = getLogger("jupyter:stateless-api:kernel");

export const DEFAULT_POOL_SIZE = 2;
const DEFAULT_POOL_TIMEOUT_S = 3600;

// When we idle timeout we always keep at least this many kernels around. We don't go to 0.
const MIN_POOL_SIZE = 1;

// Default ulimit for newly created kernels (override per kernel name via setUlimit):
// -n = max open files
// -f = max size of files the kernel may write. NOTE(review): ulimit -f is
//      normally counted in blocks (1024 bytes in bash), not bytes, so this
//      value may allow ~10 GiB rather than 10 MB -- confirm the intent.
// -t = max cputime is 30 seconds
// -v = max virtual memory usage to 3GB
const DEFAULT_ULIMIT = "-n 1000 -f 10485760 -t 30 -v 3000000";

/**
 * A Jupyter kernel used for stateless code execution, plus static helpers
 * that keep a per-kernel-name pool of kernels started in the background.
 *
 * Each instance owns at most one underlying kernel and one temporary
 * directory (which holds the notebook path the kernel is created with).
 * close() is permanent: a closed instance never starts a kernel again, so a
 * background start that fires after close() cannot leave a process behind.
 */
export default class Kernel {
  private static pools: { [kernelName: string]: Kernel[] } = {};
  private static last_active: { [kernelName: string]: number } = {};
  private static ulimit: { [kernelName: string]: string } = {};

  private kernel?: JupyterKernelInterface;
  private tempDir?: string;
  // Set by close(); once true, init() refuses to start a kernel.
  private closed = false;

  constructor(private readonly kernelName: string) {}

  // Return the pool array for kernelName, creating an empty one if needed.
  private static getPool(kernelName: string): Kernel[] {
    let pool = Kernel.pools[kernelName];
    if (pool == null) {
      pool = Kernel.pools[kernelName] = [];
    }
    return pool;
  }

  // changing ulimit only impacts NEWLY **created** kernels.
  static setUlimit(kernelName: string, ulimit: string) {
    Kernel.ulimit[kernelName] = ulimit;
  }

  /**
   * Record a request for `kernelName` and schedule a check `timeout_s`
   * seconds from now. If no later request was recorded by then, the pool for
   * that kernel name is shrunk to MIN_POOL_SIZE by closing its oldest kernels.
   * A falsy timeout_s (e.g. 0) disables this.
   */
  private static setIdleTimeout(kernelName: string, timeout_s: number) {
    if (!timeout_s) {
      // 0 = no timeout
      return;
    }
    const requestedAt = Date.now();
    Kernel.last_active[kernelName] = requestedAt;
    setTimeout(() => {
      if ((Kernel.last_active[kernelName] ?? 0) > requestedAt) {
        // kernel was requested again later; that request scheduled its own check.
        return;
      }
      // No recent request: shrink (not empty) the pool down to MIN_POOL_SIZE.
      const pool = Kernel.pools[kernelName] ?? [];
      const numToClose = pool.length - MIN_POOL_SIZE;
      if (numToClose <= 0) {
        return;
      }
      // getFromPool pushes new kernels at the end, so the front holds the oldest.
      for (const kernel of pool.slice(0, numToClose)) {
        void kernel.close(); // close() logs its own errors
      }
      // keep only the most recent kernels
      Kernel.pools[kernelName] = pool.slice(numToClose);
    }, timeout_s * 1000);
  }

  /**
   * Return an initialized kernel for `kernelName`.
   *
   * With size <= 0 a fresh kernel is started and returned without pooling.
   * Otherwise the pool is topped up to size + 1 kernels, because one is
   * removed and returned. The new kernels are started in the background at
   * staggered times, and the returned one is started right away if it isn't
   * ready yet. The returned kernel is no longer in the pool; closing it (or
   * handing it back via returnToPool) is up to the caller.
   *
   * @param size - number of spare kernels to keep ready (default DEFAULT_POOL_SIZE)
   * @param timeout_s - idle time before the pool shrinks; 0 disables (default 1 hour)
   */
  static async getFromPool(
    kernelName: string,
    {
      size = DEFAULT_POOL_SIZE,
      timeout_s = DEFAULT_POOL_TIMEOUT_S,
    }: { size?: number; timeout_s?: number } = {},
  ): Promise<Kernel> {
    if (size <= 0) {
      // not using a pool -- just create and return kernel
      const k = new Kernel(kernelName);
      await k.init();
      return k;
    }
    Kernel.setIdleTimeout(kernelName, timeout_s);
    const pool = Kernel.getPool(kernelName);
    // <= since going to remove one below
    for (let i = 1; pool.length <= size; i++) {
      const k = new Kernel(kernelName);
      // we cause this kernel to get init'd soon, but NOT immediately, since starting
      // several at once just makes them all take much longer exactly when the user
      // most wants to use their new kernel
      setTimeout(
        async () => {
          if (k.closed) {
            // closed (e.g. by closeAll or its user) before its turn came;
            // starting it now would leak a kernel nobody will close.
            return;
          }
          try {
            await k.init();
          } catch (err) {
            log.debug("Failed to pre-init Jupyter kernel -- ", kernelName, err);
          }
        },
        // stagger startup by a few seconds, though kernels that are needed will start ASAP.
        Math.random() * 3000 * i,
      );
      pool.push(k);
    }
    const k = pool.shift();
    if (k == null) {
      // unreachable for a numeric size > 0, since the loop above fills the pool
      throw Error(`no kernel available in pool for "${kernelName}"`);
    }
    // it's ok to call again due to reuseInFlight and that no-op after init.
    await k.init();
    return k;
  }

  /**
   * Start the underlying kernel if it isn't running yet. reuseInFlight lets
   * concurrent calls share one attempt, and later calls return immediately
   * once a kernel exists.
   *
   * Startup steps: create a fresh temp dir, create the kernel (using this
   * kernel name's ulimit or DEFAULT_ULIMIT), wait for it to be running, and
   * execute an empty cell.
   *
   * If startup fails, the half-started kernel and its temp dir are released
   * and the error is rethrown. A later init() therefore starts over instead
   * of returning a dead kernel.
   *
   * @throws Error("kernel already closed") if close() was called before or
   *   during startup.
   */
  private init = reuseInFlight(async () => {
    if (this.kernel != null) {
      // already initialized
      return;
    }
    if (this.closed) {
      throw Error("kernel already closed");
    }
    const tempDir = await mkdtemp(join(tmpdir(), "cocalc"));
    if (this.closed) {
      // close() was called while the temp dir was being created.
      await Kernel.removeTempDir(tempDir);
      throw Error("kernel already closed");
    }
    this.tempDir = tempDir;
    const kernel = createKernel({
      name: this.kernelName,
      path: `${tempDir}/execute.ipynb`,
      ulimit: Kernel.ulimit[this.kernelName] ?? DEFAULT_ULIMIT,
    });
    // Stored before awaiting startup so that close() during startup can reach it.
    this.kernel = kernel;
    try {
      await kernel.ensure_running();
      await kernel.execute_code_now({ code: "" });
    } catch (err) {
      // Don't leave a half-started kernel that a later init() would treat as ready.
      await this.releaseResources();
      throw err;
    }
    if (this.closed) {
      // close() ran during startup and already released the kernel.
      throw Error("kernel already closed");
    }
  });

  // empty all pools and do not refill. Only kernels still in a pool are
  // closed; kernels already handed out by getFromPool are not tracked here.
  static closeAll() {
    for (const pool of Object.values(Kernel.pools)) {
      for (const kernel of pool) {
        void kernel.close(); // close() logs its own errors
      }
    }
    Kernel.pools = {};
    Kernel.last_active = {};
  }

  /**
   * Run `code` as a single code cell via run_cell and return that cell's
   * outputs. `limits` is passed straight to run_cell. If total_output is
   * missing it is set to 0 on the caller's object. The default limits object
   * is built per call, so its start_time is the time of this call.
   *
   * @throws Error("kernel already closed") if the kernel isn't initialized or was closed.
   */
  execute = async (
    code: string,
    limits: Partial<Limits> = {
      timeout_ms: 30000,
      timeout_ms_per_cell: 30000,
      max_output: 5000000,
      max_output_per_cell: 1000000,
      start_time: Date.now(),
      total_output: 0,
    },
  ) => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }

    if (limits.total_output == null) {
      limits.total_output = 0;
    }
    const cell = { cell_type: "code", source: [code], outputs: [] };
    await run_cell(this.kernel, limits, cell);
    return cell.outputs;
  };

  // Change the kernel's working directory; silently does nothing if there is no kernel.
  chdir = async (path: string) => {
    if (this.kernel == null) return;
    await this.kernel.chdir(path);
  };

  // Put this kernel back into its pool for reuse. (Original note: this is not used anywhere.)
  returnToPool = async (): Promise<void> => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }
    const pool = Kernel.getPool(this.kernelName);
    pool.push(this);
  };

  /**
   * Shut down the underlying kernel and remove its temp dir. Errors are
   * logged, not thrown. This is permanent: afterwards init() and execute()
   * throw, and any background start scheduled by getFromPool is skipped.
   */
  close = async (): Promise<void> => {
    this.closed = true;
    await this.releaseResources();
  };

  // Best-effort release of the kernel and its temp dir. Fields are cleared
  // first so that overlapping callers don't close the same kernel twice.
  private releaseResources = async (): Promise<void> => {
    const kernel = this.kernel;
    const tempDir = this.tempDir;
    delete this.kernel;
    delete this.tempDir;
    if (kernel != null) {
      try {
        await kernel.close();
      } catch (err) {
        log.warn("Error closing kernel", err);
      }
    }
    if (tempDir != null) {
      await Kernel.removeTempDir(tempDir);
    }
  };

  // Best-effort recursive removal of a temp dir; failures are logged, not thrown.
  private static async removeTempDir(dir: string): Promise<void> {
    try {
      await rm(dir, { force: true, recursive: true });
    } catch (err) {
      log.warn("Error cleaning up temporary directory", err);
    }
  }
}