/*
Streaming write over Conat to a project or compute server.

This is a key component to support user uploads, while being memory efficient
by streaming the write.  Basically it uses conat to support efficiently doing
streaming writes of files to any compute server or project that is somehow
connected to conat.

INSTRUCTIONS:

Import writeFile:

    import { writeFile } from "@cocalc/conat/files/write";

Now you can write a given path to a project (or compute_server) as
simply as this:

    const stream = createReadStream('a file')
    await writeFile({stream, project_id, compute_server_id, path, maxWait})

- Here stream can be any readable stream, not necessarily a stream made using
  a file.  E.g., you could use PassThrough and explicitly write to it by
  write calls.

- maxWait is a time in ms after which if the file isn't fully written, everything
  is cleaned up and there is an error.


HOW THIS WORKS:

Here's how this works from the side of the compute server:

- We start a request/response conat server on the compute server:
- There's one message it accepts, which is:
     "Using streaming download to get {path} from [subject]."
  The sender of that message should set a long timeout (e.g., 10 minutes).
- It uses the streaming read functionality (in read.ts) to download and write
  to disk the file {path}.
- When done it responds {status:"success"} or {status:'error', error:'message...'}

Here's how it works from the side of whoever is sending the file:

- Start read server at [subject] that can send {path}.
- Send a request saying "we are making {path} available to you at [subject]."
- Get back "ok" or error. On error (or timeout), close the read server.
- Serve {path} exactly once using the server.  When finish sending {path},
  close it and clean up. We're done.



DEVELOPMENT:

See src/packages/backend/conat/test/files/write.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/write');

project_id = '00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'; compute_server_id = 0; await a.createServer({project_id,compute_server_id,createWriteStream:require('fs').createWriteStream});

stream=require('fs').createReadStream('env.ts');
await a.writeFile({stream, project_id, compute_server_id, path:'/tmp/a.ts'})

*/

import { conat } from "@cocalc/conat/client";
import { projectSubject, randomId } from "@cocalc/conat/names";
import {
  close as closeReadService,
  createServer as createReadServer,
  readFile,
} from "./read";
import { type Subscription } from "@cocalc/conat/core/client";
import { type Readable } from "node:stream";

// Conat subject on which the write service of the given project / compute
// server listens for "please download this file" requests.
function getWriteSubject({ project_id, compute_server_id }) {
  return projectSubject({
    project_id,
    compute_server_id,
    service: "files:write",
  });
}

// Active write-service subscriptions in this process, keyed by subject.
let subs: { [name: string]: Subscription } = {};

/**
 * Stop the write service for the given project / compute server, if one was
 * started in this process via createServer.  Does nothing otherwise.
 */
export async function close({ project_id, compute_server_id }) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  if (subs[subject] == null) {
    return;
  }
  const sub = subs[subject];
  // Unregister first, so a later createServer call makes a fresh subscription.
  delete subs[subject];
  await sub.drain();
}

/**
 * Start the write service for the given project / compute server.  If a
 * service for the same subject is already registered in this process, this
 * is a no-op.
 */
export async function createServer({
  project_id,
  compute_server_id,
  createWriteStream,
}: {
  project_id: string;
  compute_server_id: number;
  // createWriteStream returns a writable stream
  // for writing the specified path to disk.  It
  // can be an async function.
  createWriteStream: (path: string) => any;
}) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  let sub = subs[subject];
  if (sub != null) {
    return;
  }
  const cn = await conat();
  sub = await cn.subscribe(subject);
  subs[subject] = sub;
  // Deliberately not awaited: the listen loop runs for as long as the
  // subscription stays open.
  listen({ sub, createWriteStream, project_id, compute_server_id });
}

// Dispatch every request arriving on the subscription to handleMessage.
async function listen({
  sub,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads.  These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    handleMessage({ mesg, createWriteStream, project_id, compute_server_id });
  }
}

// Resolves once the stream can accept more data ("drain"), or once it has
// failed or closed, so that a caller waiting on backpressure never hangs on
// a stream that will not drain anymore.
function waitForDrain(writeStream: {
  on: (event: string, listener: () => void) => unknown;
  removeListener: (event: string, listener: () => void) => unknown;
}): Promise<void> {
  return new Promise<void>((resolve) => {
    const events = ["drain", "error", "close"];
    const done = () => {
      for (const event of events) {
        writeStream.removeListener(event, done);
      }
      resolve();
    };
    for (const event of events) {
      writeStream.on(event, done);
    }
  });
}

// Handle one write request: stream the file that the requester serves under
// `name` (via the read service in ./read) into a write stream for `path`,
// then respond with {status:"success", bytes, chunks} or
// {status:"error", error}.  The custom "rename" / "remove" events are emitted
// on the write stream on success / failure respectively; what they do is up
// to the stream returned by createWriteStream.
async function handleMessage({
  mesg,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  // Set by the write stream's "error" handler, which also sends the error
  // response; a non-empty value means the request was already answered.
  let error = "";
  let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
  try {
    const { path, name, maxWait } = mesg.data;
    writeStream = await createWriteStream(path);
    // console.log("created writeStream");
    writeStream.on("error", (err) => {
      error = `${err}`;
      mesg.respondSync({ error, status: "error" });
      console.warn(`error writing ${path}: ${error}`);
      writeStream.emit("remove");
    });
    let chunks = 0;
    let bytes = 0;
    for await (const chunk of await readFile({
      project_id,
      compute_server_id,
      name,
      path,
      maxWait,
    })) {
      if (error) {
        // console.log("error", error);
        writeStream.end();
        return;
      }
      const canAcceptMore = writeStream.write(chunk);
      chunks += 1;
      bytes += chunk.length;
      // console.log("wrote ", bytes);
      if (canAcceptMore === false && !error) {
        // Respect backpressure: without this, a destination slower than the
        // incoming data buffers the entire file in memory.
        await waitForDrain(writeStream);
      }
    }
    if (error) {
      // The write failed after the last chunk was read.  The error response
      // was already sent, so do not rename or report success.
      writeStream.end();
      return;
    }
    writeStream.end();
    writeStream.emit("rename");
    mesg.respondSync({ status: "success", bytes, chunks });
  } catch (err) {
    if (!error) {
      mesg.respondSync({ error: `${err}`, status: "error" });
      writeStream?.emit("remove");
    }
  }
}

export interface WriteFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  stream: Readable;
  maxWait?: number;
}

/**
 * Stream the contents of `stream` to `path` on the given project or compute
 * server.
 *
 * A temporary read service is started under a random name, the remote write
 * service is asked to download from it, and the read service is always closed
 * afterwards, whether the transfer succeeded or not.
 *
 * @param maxWait - time in ms to wait for the remote side to finish; it is
 *   used as the request timeout and forwarded in the request (default 10 minutes).
 * @returns the number of bytes and chunks reported by the remote side.
 * @throws Error if the remote side responds with an error; failures of the
 *   request itself (e.g. a timeout) propagate as thrown.
 */
export async function writeFile({
  project_id,
  compute_server_id = 0,
  path,
  stream,
  maxWait = 1000 * 60 * 10, // 10 minutes
}: WriteFileOptions): Promise<{ bytes: number; chunks: number }> {
  const name = randomId();
  try {
    // The read service serves exactly this stream.
    const createReadStream = () => stream;
    // start read server
    await createReadServer({
      createReadStream,
      project_id,
      compute_server_id,
      name,
    });
    // tell compute server to start reading our file.
    const cn = await conat();
    const resp = await cn.request(
      getWriteSubject({ project_id, compute_server_id }),
      { name, path, maxWait },
      { timeout: maxWait },
    );
    const { error, bytes, chunks } = resp.data;
    if (error) {
      throw Error(error);
    }
    return { bytes, chunks };
  } finally {
    await closeReadService({ project_id, compute_server_id, name });
  }
}