GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/files/write.ts
/*
Streaming write over Conat to a project or compute server.

This is a key component supporting user uploads, and it is memory efficient
because the write is streamed.  It uses Conat to stream file writes to any
compute server or project that is connected to Conat.

INSTRUCTIONS:

Import writeFile:

import { writeFile } from "@cocalc/conat/files/write";

Now you can write a given path to a project (or compute_server) as
simply as this:

const stream = createReadStream('a file')
await writeFile({stream, project_id, compute_server_id, path, maxWait})

- Here stream can be any readable stream, not necessarily one created from a
  file.  E.g., you could use a PassThrough stream and write to it explicitly
  with write calls; see the sketch below.

- maxWait is a timeout in milliseconds; if the file isn't fully written within
  that time, everything is cleaned up and an error is raised.
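
For example, here is a minimal sketch (project_id and the path are placeholders)
that uploads data produced in memory via a PassThrough stream:

  import { PassThrough } from "node:stream";
  import { writeFile } from "@cocalc/conat/files/write";

  const stream = new PassThrough();
  // start the upload; writeFile consumes the stream as data arrives
  const done = writeFile({ stream, project_id, path: "/tmp/hello.txt" });
  stream.write("hello ");
  stream.write("world\n");
  stream.end();
  await done;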

HOW THIS WORKS:

Here's how this works from the side of the compute server:

- We start a request/response conat server on the compute server:
  - There's one message it accepts, which is:
    "Using streaming download to get {path} from [subject]."
    The sender of that message should set a long timeout (e.g., 10 minutes).
  - It uses the streaming read functionality (in read.ts) to download the
    file and write it to disk at {path}.
  - When done, it responds {status:"success"} or {status:"error", error:"message..."}.
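
For example (a sketch only), the compute server starts this service by calling
createServer (defined below) with a function that creates a write stream for a
given path, e.g., Node's fs.createWriteStream:

  import { createWriteStream } from "node:fs";
  import { createServer } from "@cocalc/conat/files/write";
  await createServer({ project_id, compute_server_id, createWriteStream });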

Here's how it works from the side of whoever is sending the file:

- Start a read server at [subject] that can send {path}.
- Send a request saying "we are making {path} available to you at [subject]."
- Get back "ok" or an error. On error (or timeout), close the read server.
- Serve {path} exactly once using the server. When finished sending {path},
  close it and clean up. We're done.
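
In terms of message payloads, the exchange looks roughly like this (a sketch of
the shapes used in the code below, not a formal schema):

  // request sent to the "files:write" subject of the target project/compute server
  { name: string; path: string; maxWait?: number }

  // response sent back when the download finishes (or fails)
  { status: "success"; bytes: number; chunks: number } | { status: "error"; error: string }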

DEVELOPMENT:

See src/packages/backend/conat/test/files/write.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/write');

project_id = '00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'; compute_server_id = 0; await a.createServer({project_id,compute_server_id,createWriteStream:require('fs').createWriteStream});

stream=require('fs').createReadStream('env.ts');
await a.writeFile({stream, project_id, compute_server_id, path:'/tmp/a.ts'})
*/

import { conat } from "@cocalc/conat/client";
import { projectSubject, randomId } from "@cocalc/conat/names";
import {
  close as closeReadService,
  createServer as createReadServer,
  readFile,
} from "./read";
import { type Subscription } from "@cocalc/conat/core/client";
import { type Readable } from "node:stream";

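// Subject on which the project/compute server listens for write requests.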
function getWriteSubject({ project_id, compute_server_id }) {
  return projectSubject({
    project_id,
    compute_server_id,
    service: "files:write",
  });
}

let subs: { [name: string]: Subscription } = {};

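// Stop the write server for the given project/compute server, draining any
// in-flight messages first.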
export async function close({ project_id, compute_server_id }) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  if (subs[subject] == null) {
    return;
  }
  const sub = subs[subject];
  delete subs[subject];
  await sub.drain();
}

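// Start the write server for the given project/compute server.  Each incoming
// request is handled by streaming the file from the sender (see read.ts) and
// writing it to the stream returned by createWriteStream.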
export async function createServer({
  project_id,
  compute_server_id,
  createWriteStream,
}: {
  project_id: string;
  compute_server_id: number;
  // createWriteStream returns a writeable stream for writing the
  // specified path to disk.  It can be an async function.
  createWriteStream: (path: string) => any;
}) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  let sub = subs[subject];
  if (sub != null) {
    return;
  }
  const cn = await conat();
  sub = await cn.subscribe(subject);
  subs[subject] = sub;
  listen({ sub, createWriteStream, project_id, compute_server_id });
}

async function listen({
  sub,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads.  These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    handleMessage({ mesg, createWriteStream, project_id, compute_server_id });
  }
}

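// Handle a single write request: stream the file from the sender, write it to
// the stream returned by createWriteStream, and report success or an error
// back to the sender.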
async function handleMessage({
  mesg,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  let error = "";
  let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
  try {
    const { path, name, maxWait } = mesg.data;
    writeStream = await createWriteStream(path);
    // console.log("created writeStream");
    writeStream.on("error", (err) => {
      error = `${err}`;
      mesg.respondSync({ error, status: "error" });
      console.warn(`error writing ${path}: ${error}`);
      writeStream.emit("remove");
    });
    let chunks = 0;
    let bytes = 0;
    for await (const chunk of await readFile({
      project_id,
      compute_server_id,
      name,
      path,
      maxWait,
    })) {
      if (error) {
        // console.log("error", error);
        writeStream.end();
        return;
      }
      writeStream.write(chunk);
      chunks += 1;
      bytes += chunk.length;
      // console.log("wrote ", bytes);
    }
    writeStream.end();
    writeStream.emit("rename");
    mesg.respondSync({ status: "success", bytes, chunks });
  } catch (err) {
    if (!error) {
      mesg.respondSync({ error: `${err}`, status: "error" });
      writeStream?.emit("remove");
    }
  }
}

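// Options for writeFile; compute_server_id defaults to 0 and maxWait to
// 10 minutes (see below).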
export interface WriteFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  stream: Readable;
  maxWait?: number;
}

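// Stream the given readable stream to path in the target project or compute
// server.  Returns the number of bytes and chunks written; throws on error or
// if maxWait elapses before the write completes.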
export async function writeFile({
  project_id,
  compute_server_id = 0,
  path,
  stream,
  maxWait = 1000 * 60 * 10, // 10 minutes
}: WriteFileOptions): Promise<{ bytes: number; chunks: number }> {
  const name = randomId();
  try {
    function createReadStream() {
      return stream;
    }
    // start read server
    await createReadServer({
      createReadStream,
      project_id,
      compute_server_id,
      name,
    });
    // tell compute server to start reading our file.
    const cn = await conat();
    const resp = await cn.request(
      getWriteSubject({ project_id, compute_server_id }),
      { name, path, maxWait },
      { timeout: maxWait },
    );
    const { error, bytes, chunks } = resp.data;
    if (error) {
      throw Error(error);
    }
    return { bytes, chunks };
  } finally {
    await closeReadService({ project_id, compute_server_id, name });
  }
}