/*
Support users uploading files directly to CoCalc from their browsers.

- uploading to projects and compute servers, with full support for potentially
  very LARGE file uploads that stream via NATS. This checks that the user is
  authenticated with write access.

- uploading blobs to our database.

Which of the above happens depends on query params.

NOTE: Code for downloading files from projects/compute servers
is in the middle of packages/hub/proxy/handle-request.ts

I'm sorry the code below is so insane. It was extremely hard to write
and involves tricky state in subtle ways all over the place, due to
how the uploads are chunked and sent in pieces by Dropzone, which is
absolutely necessary due to how Cloudflare works.
*/
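
// For orientation (illustrative only -- values are hypothetical, inferred from
// the query params and Dropzone fields handled below): a chunked upload
// arrives as a sequence of POSTs like
//
//   POST /upload?project_id=...&compute_server_id=0&path=...
//
// each carrying multipart form data with one chunk of the file plus Dropzone's
// bookkeeping fields, e.g.
//
//   dzuuid, dzchunkindex, dztotalchunkcount, dzchunksize,
//   dztotalfilesize, dzchunkbyteoffset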

import { Router } from "express";
import { getLogger } from "@cocalc/hub/logger";
import getAccount from "@cocalc/server/auth/get-account";
import isCollaborator from "@cocalc/server/projects/is-collaborator";
import formidable from "formidable";
import { PassThrough } from "node:stream";
import { writeFile as writeFileToProject } from "@cocalc/conat/files/write";
import { join } from "path";
import { callback } from "awaiting";

// ridiculously long -- effectively no limit.
const MAX_UPLOAD_TIME_MS = 1000 * 60 * 60 * 24 * 7;

const logger = getLogger("hub:servers:app:upload");

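// POST /upload: authenticate the user, then either store a blob in our
// database (the ?blob=... branch, not implemented yet) or stream the file
// into a project or compute server, depending on the query params.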
export default function init(router: Router) {
  router.post("/upload", async (req, res) => {
    const account_id = await getAccount(req);
    if (!account_id) {
      res.status(500).send("user must be signed in to upload files");
      return;
    }
    const { project_id, compute_server_id, path = "", ttl, blob } = req.query;
    try {
      if (blob) {
        //await handleBlobUpload({ ttl, req, res });
        console.log(ttl);
        throw Error("not implemented");
      } else {
        await handleUploadToProject({
          account_id,
          project_id,
          compute_server_id,
          path,
          req,
          res,
        });
      }
    } catch (err) {
      logger.debug("upload failed ", err);
      res.status(500).send(`upload failed -- ${err}`);
    }
  });
}

// async function handleBlobUpload({ ttl, req, res }) {
//   throw Error("blob handling not implemented");
// }
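
// Per-upload state shared across the separate HTTP requests that carry the
// chunks of one file: accumulated error messages, and a flag/callback pair
// used to wait until the streaming write has fully finished.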
const errors: { [key: string]: string[] } = {};
const finished: { [key: string]: { state: boolean; cb: () => void } } = {};
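
// Handle one chunk of an upload to a project or compute server. On the first
// chunk (dzchunkindex == 0) this also kicks off the single streaming write via
// writeFileToProject; on the last chunk it waits for that write to complete
// before responding.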
async function handleUploadToProject({
  account_id,
  project_id,
  compute_server_id: compute_server_id0,
  path,
  req,
  res,
}) {
  logger.debug({
    account_id,
    project_id,
    compute_server_id0,
    path,
  });

  if (
    typeof project_id != "string" ||
    !(await isCollaborator({ account_id, project_id }))
  ) {
    throw Error("user must be collaborator on project");
  }
  if (typeof compute_server_id0 != "string") {
    throw Error("compute_server_id must be given");
  }
  const compute_server_id = parseInt(compute_server_id0);
  if (typeof path != "string") {
    throw Error("path must be given");
  }
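  // done flips to true (and fires done.cb) once this request's chunk has been
  // fully copied into the upload's total stream below.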
  const done = { state: false, cb: () => {} };
  let filename = "noname.txt";
  let stream: any | null = null;
  let chunkStream: any | null = null;
  const form = formidable({
    keepExtensions: true,
    hashAlgorithm: "sha1",
    // file = {"size":195,"newFilename":"649205cf239d49f350c645f00.py","originalFilename":"a (2).py","mimetype":"application/octet-stream","hash":"318c0246ae31424f9225b566e7e09bef6c8acc40"}
    fileWriteStreamHandler: (file) => {
      filename = file?.["originalFilename"] ?? "noname.txt";
      const { chunkStream: chunkStream0, totalStream } = getWriteStream({
        project_id,
        compute_server_id,
        path,
        filename,
      });
      chunkStream = chunkStream0;
      stream = totalStream;
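      // Pump this chunk's bytes into the upload-wide total stream; when the
      // chunk stream ends, mark this chunk done so the last-chunk logic below
      // can safely end the total stream.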
      (async () => {
        for await (const data of chunkStream) {
          stream.write(data);
        }
        done.state = true;
        done.cb();
      })();
      return chunkStream;
    },
  });

  const [fields] = await form.parse(req);
  // console.log("form", { fields, files });
  // fields looks like this: {"dzuuid":["ce5fa828-5155-4fa0-b30a-869bd4c956a5"],"dzchunkindex":["1"],"dztotalfilesize":["10000000"],"dzchunksize":["8000000"],"dztotalchunkcount":["2"],"dzchunkbyteoffset":["8000000"]}

  // console.log({ filename, fields, path, files });

  const index = parseInt(fields.dzchunkindex?.[0] ?? "0");
  const count = parseInt(fields.dztotalchunkcount?.[0] ?? "1");
  const key = JSON.stringify({ path, filename, compute_server_id, project_id });
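  // If an earlier chunk of this same upload already failed, bail out
  // immediately rather than streaming more data.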
  if (index > 0 && errors?.[key]?.length > 0) {
    res.status(500).send(`upload failed -- ${errors[key].join(", ")}`);
    return;
  }
if (index == 0) {
145
// start brand new upload. this is the only time we clear the errors map.
146
errors[key] = [];
147
finished[key] = { state: false, cb: () => {} };
148
// @ts-ignore
149
(async () => {
150
try {
151
// console.log("NATS: started writing ", filename);
152
await writeFileToProject({
153
stream,
154
project_id,
155
compute_server_id,
156
path: join(path, fields.fullPath?.[0] ?? filename),
157
maxWait: MAX_UPLOAD_TIME_MS,
158
});
159
// console.log("NATS: finished writing ", filename);
160
} catch (err) {
161
// console.log("NATS: error ", err);
162
errors[key].push(`${err}`);
163
} finally {
164
// console.log("NATS: freeing write stream");
165
freeWriteStream({
166
project_id,
167
compute_server_id,
168
path,
169
filename,
170
});
171
finished[key].state = true;
172
finished[key].cb();
173
}
174
})();
175
}
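  // On the last chunk: wait for this chunk to drain into the total stream,
  // end the total stream, then wait for writeFileToProject to finish before
  // reporting success or failure.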
  if (index == count - 1) {
    // console.log("finish");
    if (!done.state) {
      const f = (cb) => {
        done.cb = cb;
      };
      await callback(f);
    }
    stream.end();
    if (!finished[key].state) {
      const f = (cb) => {
        finished[key].cb = cb;
      };
      await callback(f);
    }
    delete finished[key];
  }
  if ((errors[key]?.length ?? 0) > 0) {
    // console.log("saying upload failed");
    let e = errors[key].join(", ");
    if (e.includes("Error: 503")) {
      e += ", Upload service not running.";
    }
    res.status(500).send(`Upload failed: ${e}`);
  } else {
    // console.log("saying upload worked");
    res.send({ status: "ok" });
  }
}

function getKey(opts) {
  return JSON.stringify(opts);
}
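
// Registry of in-progress uploads, keyed by
// {path, filename, compute_server_id, project_id}: every chunk request for
// the same file shares one long-lived totalStream, while each request gets a
// fresh per-chunk PassThrough that is piped into it.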
const streams: any = {};
export function getWriteStream(opts) {
  const key = getKey(opts);
  let totalStream = streams[key];
  if (totalStream == null) {
    totalStream = new PassThrough();
    streams[key] = totalStream;
  }
  const chunkStream = new PassThrough();
  return { chunkStream, totalStream };
}

function freeWriteStream(opts) {
  delete streams[getKey(opts)];
}