Path: blob/master/src/packages/hub/servers/app/upload.ts
/*
Support user uploading files directly to CoCalc from their browsers.

- uploading to projects and compute servers, with full support for potentially
very LARGE file uploads that stream via NATS. This checks that the user is
authenticated and has write access.

- uploading blobs to our database.

Which of the above happens depends on query params.

NOTE: Code for downloading files from projects/compute servers
is in the middle of packages/hub/proxy/handle-request.ts


I'm sorry the code below is so insane. It was extremely hard to write
and involves tricky state in subtle ways all over the place, due to
how the uploads are chunked and sent in bits by Dropzone, which is absolutely
necessary due to how Cloudflare works.
*/

import { Router } from "express";
import { getLogger } from "@cocalc/hub/logger";
import getAccount from "@cocalc/server/auth/get-account";
import isCollaborator from "@cocalc/server/projects/is-collaborator";
import formidable from "formidable";
import { PassThrough } from "node:stream";
import { writeFile as writeFileToProject } from "@cocalc/conat/files/write";
import { join } from "path";
import { callback } from "awaiting";

// ridiculously long -- effectively no limit.
const MAX_UPLOAD_TIME_MS = 1000 * 60 * 60 * 24 * 7;

const logger = getLogger("hub:servers:app:upload");

export default function init(router: Router) {
  router.post("/upload", async (req, res) => {
    const account_id = await getAccount(req);
    if (!account_id) {
      res.status(500).send("user must be signed in to upload files");
      return;
    }
    const { project_id, compute_server_id, path = "", ttl, blob } = req.query;
    try {
      if (blob) {
        //await handleBlobUpload({ ttl, req, res });
        console.log(ttl);
        throw Error("not implemented");
      } else {
        await handleUploadToProject({
          account_id,
          project_id,
          compute_server_id,
          path,
          req,
          res,
        });
      }
    } catch (err) {
      logger.debug("upload failed ", err);
      res.status(500).send(`upload failed -- ${err}`);
    }
  });
}

// async function handleBlobUpload({ ttl, req, res }) {
//   throw Error("blob handling not implemented");
// }

const errors: { [key: string]: string[] } = {};
const finished: { [key: string]: { state: boolean; cb: () => void } } = {};

async function handleUploadToProject({
  account_id,
  project_id,
  compute_server_id: compute_server_id0,
  path,
  req,
  res,
}) {
  logger.debug({
    account_id,
    project_id,
    compute_server_id0,
    path,
  });

  if (
    typeof project_id != "string" ||
    !(await isCollaborator({ account_id, project_id }))
  ) {
    throw Error("user must be collaborator on project");
  }
  if (typeof compute_server_id0 != "string") {
    throw Error("compute_server_id must be given");
  }
  const compute_server_id = parseInt(compute_server_id0);
  if (typeof path != "string") {
    throw Error("path must be given");
  }
  const done = { state: false, cb: () => {} };
  let filename = "noname.txt";
  let stream: any | null = null;
  let chunkStream: any | null = null;
  const form = formidable({
    keepExtensions: true,
    hashAlgorithm: "sha1",
    // file = {"size":195,"newFilename":"649205cf239d49f350c645f00.py","originalFilename":"a (2).py","mimetype":"application/octet-stream","hash":"318c0246ae31424f9225b566e7e09bef6c8acc40"}
    fileWriteStreamHandler: (file) => {
      filename = file?.["originalFilename"] ?? "noname.txt";
      const { chunkStream: chunkStream0, totalStream } = getWriteStream({
        project_id,
        compute_server_id,
        path,
        filename,
      });
      chunkStream = chunkStream0;
      stream = totalStream;
      (async () => {
        for await (const data of chunkStream) {
          stream.write(data);
        }
        done.state = true;
        done.cb();
      })();
      return chunkStream;
    },
  });

  const [fields] = await form.parse(req);
  // console.log("form", { fields, files });
  // fields looks like this: {"dzuuid":["ce5fa828-5155-4fa0-b30a-869bd4c956a5"],"dzchunkindex":["1"],"dztotalfilesize":["10000000"],"dzchunksize":["8000000"],"dztotalchunkcount":["2"],"dzchunkbyteoffset":["8000000"]}

  // console.log({ filename, fields, path, files });

  const index = parseInt(fields.dzchunkindex?.[0] ?? "0");
  const count = parseInt(fields.dztotalchunkcount?.[0] ?? "1");
  const key = JSON.stringify({ path, filename, compute_server_id, project_id });
  if (index > 0 && errors?.[key]?.length > 0) {
    res.status(500).send(`upload failed -- ${errors[key].join(", ")}`);
    return;
  }
  if (index == 0) {
    // start brand new upload. this is the only time we clear the errors map.
    errors[key] = [];
    finished[key] = { state: false, cb: () => {} };
    // @ts-ignore
    (async () => {
      try {
        // console.log("NATS: started writing ", filename);
        await writeFileToProject({
          stream,
          project_id,
          compute_server_id,
          path: join(path, fields.fullPath?.[0] ?? filename),
          maxWait: MAX_UPLOAD_TIME_MS,
        });
        // console.log("NATS: finished writing ", filename);
      } catch (err) {
        // console.log("NATS: error ", err);
        errors[key].push(`${err}`);
      } finally {
        // console.log("NATS: freeing write stream");
        freeWriteStream({
          project_id,
          compute_server_id,
          path,
          filename,
        });
        finished[key].state = true;
        finished[key].cb();
      }
    })();
  }
  if (index == count - 1) {
    // console.log("finish");
    if (!done.state) {
      const f = (cb) => {
        done.cb = cb;
      };
      await callback(f);
    }
    stream.end();
    if (!finished[key].state) {
      const f = (cb) => {
        finished[key].cb = cb;
      };
      await callback(f);
    }
    delete finished[key];
  }
  if ((errors[key]?.length ?? 0) > 0) {
    // console.log("saying upload failed");
    let e = errors[key].join(", ");
    if (e.includes("Error: 503")) {
      e += ", Upload service not running.";
    }
    res.status(500).send(`Upload failed: ${e}`);
  } else {
    // console.log("saying upload worked");
    res.send({ status: "ok" });
  }
}

function getKey(opts) {
  return JSON.stringify(opts);
}

const streams: any = {};
export function getWriteStream(opts) {
  const key = getKey(opts);
  let totalStream = streams[key];
  if (totalStream == null) {
    totalStream = new PassThrough();
    streams[key] = totalStream;
  }
  const chunkStream = new PassThrough();
  return { chunkStream, totalStream };
}

function freeWriteStream(opts) {
  delete streams[getKey(opts)];
}