Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/stateless-api/kernel.ts
1447 views
1
import { kernel as createKernel } from "@cocalc/jupyter/kernel";
2
import type { JupyterKernelInterface } from "@cocalc/jupyter/types/project-interface";
3
import { run_cell } from "@cocalc/jupyter/nbgrader/jupyter-run";
4
import { mkdtemp, rm } from "fs/promises";
5
import { tmpdir } from "os";
6
import { join } from "path";
7
import getLogger from "@cocalc/backend/logger";
8
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
9
import { type Limits } from "@cocalc/util/jupyter/nbgrader-types";
10
11
const log = getLogger("jupyter:stateless-api:kernel");

// Default number of pre-started kernels kept per kernel name by Kernel.getFromPool.
export const DEFAULT_POOL_SIZE = 2;
// Default idle period (seconds) after which a kernel pool is shrunk to MIN_POOL_SIZE.
const DEFAULT_POOL_TIMEOUT_S = 3600;

// When we idle timeout we always keep at least this many kernels around. We don't go to 0.
const MIN_POOL_SIZE = 1;

// Resource limits for newly created kernels, written as `ulimit` flags:
// -n = max number of open files
// -f = max size of files the kernel may *write* to disk.
//      NOTE(review): if this string is applied via bash's `ulimit` builtin, -f is
//      counted in 1024-byte blocks (not bytes), so 10485760 is ~10 GiB rather than
//      ~10 MB -- confirm the intended limit.
// -t = max CPU time of 30 seconds
// -v = max virtual memory; 3000000 (KiB under bash's `ulimit`) is about 3 GB
const DEFAULT_ULIMIT = "-n 1000 -f 10485760 -t 30 -v 3000000";
24
25
/**
 * A Jupyter kernel used by the stateless execution API.
 *
 * Kernels are kept in per-kernel-name pools (see `getFromPool`) so that a request
 * can usually be served by a kernel that was started ahead of time. Each kernel
 * gets its own temporary directory (holding its notebook path), which `close`
 * removes again.
 */
export default class Kernel {
  // Pools of not-yet-handed-out kernels, keyed by kernel name. New kernels are
  // pushed onto the end, so the front of each array holds the oldest ones.
  private static pools: { [kernelName: string]: Kernel[] } = {};
  // Time (Date.now()) of the most recent pooled request for each kernel name.
  private static last_active: { [kernelName: string]: number } = {};
  // Per-kernel-name ulimit overrides; DEFAULT_ULIMIT is used when none is set.
  private static ulimit: { [kernelName: string]: string } = {};

  private kernel?: JupyterKernelInterface;
  // Set by init() once the temporary directory exists; cleared by close().
  private tempDir?: string;

  constructor(private readonly kernelName: string) {}

  // Return the pool for kernelName, creating an empty one if needed.
  private static getPool(kernelName: string): Kernel[] {
    let pool = Kernel.pools[kernelName];
    if (pool == null) {
      pool = Kernel.pools[kernelName] = [];
    }
    return pool;
  }

  // changing ulimit only impacts NEWLY **created** kernels.
  static setUlimit(kernelName: string, ulimit: string) {
    Kernel.ulimit[kernelName] = ulimit;
  }

  /**
   * Record a pool request for `kernelName` and arm an idle timer. If no newer
   * request arrives within `timeout_s` seconds, the pool is shrunk to
   * MIN_POOL_SIZE kernels (closing the oldest first); it is never emptied.
   *
   * @param timeout_s idle timeout in seconds; 0 disables the timer.
   */
  private static setIdleTimeout(kernelName: string, timeout_s: number) {
    if (!timeout_s) {
      // 0 = no timeout
      return;
    }
    const now = Date.now();
    Kernel.last_active[kernelName] = now;
    setTimeout(() => {
      if ((Kernel.last_active[kernelName] ?? 0) > now) {
        // A newer request re-armed the timer, so the pool is not idle yet.
        return;
      }
      const pool = Kernel.pools[kernelName] ?? [];
      const numToClose = pool.length - MIN_POOL_SIZE;
      if (numToClose <= 0) {
        return;
      }
      // Close the oldest kernels (front of the array) and keep the newest ones.
      for (const k of pool.slice(0, numToClose)) {
        // close() logs its own errors, so there is nothing to await or catch here.
        void k.close();
      }
      Kernel.pools[kernelName] = pool.slice(numToClose);
    }, timeout_s * 1000);
  }

  /**
   * Get an initialized kernel named `kernelName`, taken from its pool when
   * pooling is enabled.
   *
   * The pool is first topped up to `size + 1` kernels, with the new kernels'
   * startup staggered over a few seconds; then the oldest kernel is removed from
   * the pool, initialized (if it is not already), and returned.
   *
   * @param size number of kernels to keep in the pool; <= 0 disables pooling
   *   and simply creates a fresh kernel.
   * @param timeout_s idle timeout in seconds before the pool is shrunk; 0 = never.
   */
  static async getFromPool(
    kernelName: string,
    {
      size = DEFAULT_POOL_SIZE,
      timeout_s = DEFAULT_POOL_TIMEOUT_S,
    }: { size?: number; timeout_s?: number } = {},
  ): Promise<Kernel> {
    if (size <= 0) {
      // not using a pool -- just create and return kernel
      const k = new Kernel(kernelName);
      await k.init();
      return k;
    }
    Kernel.setIdleTimeout(kernelName, timeout_s);
    const pool = Kernel.getPool(kernelName);
    let i = 1;
    while (pool.length <= size) {
      // <= since going to remove one below
      const k = new Kernel(kernelName);
      // We cause this kernel to get init'd soon, but NOT immediately, since starting
      // several at once just makes them all take much longer exactly when the user
      // most wants to use their new kernel.
      setTimeout(
        async () => {
          if (!Kernel.pools[kernelName]?.includes(k)) {
            // k was closed (idle shrink / closeAll) or already handed out -- and
            // thus initialized by getFromPool -- before this timer fired. Starting
            // it now would spawn a kernel process that nobody owns.
            return;
          }
          try {
            await k.init();
          } catch (err) {
            log.debug("Failed to pre-init Jupyter kernel -- ", kernelName, err);
          }
        },
        // stagger startup by a few seconds, though kernels that are needed will start ASAP.
        Math.random() * 3000 * i,
      );
      i += 1;
      pool.push(k);
    }
    const k = pool.shift() as Kernel;
    // it's ok to call again due to reuseInFlight and that no-op after init.
    await k.init();
    return k;
  }

  /**
   * Start the underlying Jupyter kernel. This is a no-op once the kernel is
   * running, and concurrent calls share a single start (reuseInFlight).
   *
   * Creates a fresh temporary directory for the kernel's notebook path, creates
   * the kernel with the configured ulimit, waits for it to be running and runs
   * an empty cell. If any step after the directory is created fails, the
   * half-started kernel and the directory are cleaned up before the error is
   * rethrown, so a later call retries instead of treating the broken kernel as
   * initialized.
   */
  private init = reuseInFlight(async () => {
    if (this.kernel != null) {
      // already initialized
      return;
    }
    const tempDir = await mkdtemp(join(tmpdir(), "cocalc"));
    this.tempDir = tempDir;
    const path = join(tempDir, "execute.ipynb");
    try {
      const kernel = createKernel({
        name: this.kernelName,
        path,
        ulimit: Kernel.ulimit[this.kernelName] ?? DEFAULT_ULIMIT,
      });
      // Assigned right away so that a close() during startup shuts it down.
      this.kernel = kernel;
      await kernel.ensure_running();
      await kernel.execute_code_now({ code: "" });
    } catch (err) {
      await this.close();
      throw err;
    }
  });

  // Close every pooled kernel and empty all pools; pools are only refilled by
  // later getFromPool calls.
  static closeAll() {
    for (const kernelName in Kernel.pools) {
      for (const kernel of Kernel.pools[kernelName]) {
        // close() logs its own errors, so there is nothing to await or catch here.
        void kernel.close();
      }
    }
    Kernel.pools = {};
    Kernel.last_active = {};
  }

  /**
   * Run `code` as a single code cell and return the cell's outputs.
   *
   * @param limits limits passed to run_cell; when `limits.total_output` is
   *   missing it is set to 0 on the given object.
   * @throws Error("kernel already closed") if the kernel was never initialized
   *   or has been closed.
   */
  execute = async (
    code: string,
    limits: Partial<Limits> = {
      timeout_ms: 30000,
      timeout_ms_per_cell: 30000,
      max_output: 5000000,
      max_output_per_cell: 1000000,
      start_time: Date.now(),
      total_output: 0,
    },
  ) => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }

    if (limits.total_output == null) {
      limits.total_output = 0;
    }
    const cell = { cell_type: "code", source: [code], outputs: [] };
    await run_cell(this.kernel, limits, cell);
    return cell.outputs;
  };

  // Forward `path` to the kernel's chdir(); does nothing if there is no kernel.
  chdir = async (path: string) => {
    if (this.kernel == null) return;
    await this.kernel.chdir(path);
  };

  // Put this kernel back into its pool so a later getFromPool can hand it out.
  // this is not used anywhere
  returnToPool = async (): Promise<void> => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }
    const pool = Kernel.getPool(this.kernelName);
    pool.push(this);
  };

  /**
   * Shut down the kernel and delete its temporary directory. Errors are logged
   * rather than thrown; calling it again, or on a kernel that never started,
   * does nothing.
   */
  close = async () => {
    // Detach the resources before awaiting, so concurrent close() calls and
    // execute() immediately see this kernel as closed and nothing is released twice.
    const kernel = this.kernel;
    const tempDir = this.tempDir;
    delete this.kernel;
    delete this.tempDir;
    if (kernel != null) {
      try {
        await kernel.close();
      } catch (err) {
        log.warn("Error closing kernel", err);
      }
    }
    if (tempDir != null) {
      try {
        await rm(tempDir, { force: true, recursive: true });
      } catch (err) {
        log.warn("Error cleaning up temporary directory", err);
      }
    }
  };
}
208
209