/*
Read a file from a project/compute server via an async generator, so it is
memory efficient.

This is a conat service that uses requestMany, takes as input a filename path,
and streams all the binary data from that path.

We use headers to add sequence numbers into the response messages.
The server answers one request with a series of responses:

- { headers: { seq } }   -- a chunk of file data; seq starts at 1 and
                            increases by exactly 1 for each chunk.
- { headers: { done } }  -- the whole file has been sent.
- { headers: { error } } -- reading failed; the header holds the error text.

This is useful to implement:

- an http server for downloading any file, even large ones.


IDEAS:

- we could also implement a version of this that takes a directory
  as input, runs compressed tar on it, and pipes the output into
  response messages. We could then implement streaming download of
  a tarball of a directory tree, or also copying a directory tree from
  one place to another (without using rsync). I've done this already
  over a websocket for compute servers, so would just copy that code.


DEVELOPMENT:

See src/packages/backend/conat/test/files/read.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/read'); a.createServer({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,createReadStream:require('fs').createReadStream})

for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/tmp/a'})) { console.log({chunk}); }


for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/projects/6b851643-360e-435e-b87e-f9a6ab64a8b1/cocalc/.git/objects/pack/pack-771f7fe4ee855601463be070cf9fb9afc91f84ac.pack'})) { console.log({chunk}); }

*/

import { conat } from "@cocalc/conat/client";
import { projectSubject } from "@cocalc/conat/names";
import { type Subscription } from "@cocalc/conat/core/client";

// Subscriptions of the read servers started by createServer in this process,
// keyed by subject, so that close() can find and drain them.
const subs: { [subject: string]: Subscription } = {};

/**
 * Stop the read server for the given project / compute server / name, if one
 * was started here. Waits for its subscription to drain. Does nothing if no
 * such server is registered.
 */
export async function close({ project_id, compute_server_id, name = "" }) {
  const subject = getSubject({ project_id, compute_server_id, name });
  const sub = subs[subject];
  if (sub == null) {
    return;
  }
  delete subs[subject];
  await sub.drain();
}

/**
 * Subject of the read service. `name` is appended to the service name, so
 * servers created with different names listen on different subjects.
 */
function getSubject({ project_id, compute_server_id, name = "" }) {
  return projectSubject({
    project_id,
    compute_server_id,
    // `?? ""` also covers an explicit `name: null`, which the default does not.
    service: `files:read${name ?? ""}`,
  });
}

/**
 * Start a read server: subscribe to the service subject and answer each
 * request by streaming the requested path through `createReadStream`
 * (e.g. `fs.createReadStream`).
 */
export async function createServer({
  createReadStream,
  project_id,
  compute_server_id,
  name = "",
}) {
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  // If a server for this subject was already started, stop it first.
  // Otherwise its subscription would be orphaned (close() could no longer
  // reach it) and could keep answering requests alongside the new one.
  await close({ project_id, compute_server_id, name });
  const cn = await conat();
  const sub = await cn.subscribe(subject);
  subs[subject] = sub;
  // Runs for the lifetime of the subscription; intentionally not awaited.
  void listen({ sub, createReadStream });
}

async function listen({ sub, createReadStream }) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads. These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    // handleMessage never rejects, so it is safe not to await it here.
    void handleMessage(mesg, createReadStream);
  }
}

/**
 * Answer one read request: stream the data, then send `done`. On any failure,
 * send an `error` header instead. Never rejects.
 */
async function handleMessage(mesg, createReadStream) {
  try {
    await sendData(mesg, createReadStream);
    await mesg.respond(null, { headers: { done: true } });
  } catch (err) {
    try {
      mesg.respondSync(null, { headers: { error: `${err}` } });
    } catch {
      // We could not even report the error (e.g., the connection is gone).
      // Nothing more can be done here; the requester presumably times out
      // after its maxWait. Swallowing this keeps the un-awaited promise from
      // becoming an unhandled rejection, which under Node's default mode
      // would terminate the process.
    }
  }
}

// Maximum number of bytes sent in a single response message; also used as
// the read stream's highWaterMark so most chunks need no further splitting.
const MAX_CHUNK_SIZE = 16384 * 16 * 3;

function getSeqHeader(seq: number) {
  return { headers: { seq } };
}

/**
 * Stream the file at `mesg.data.path` to the requester as a sequence of
 * responses, each carrying a 1-based, consecutive `seq` header.
 */
async function sendData(mesg, createReadStream) {
  const { path } = mesg.data;
  let seq = 0;
  for await (let chunk of createReadStream(path, {
    highWaterMark: MAX_CHUNK_SIZE,
  })) {
    // We must break the chunk into smaller messages or it will
    // get bounced by conat...
    while (chunk.length > 0) {
      seq += 1;
      mesg.respondSync(chunk.slice(0, MAX_CHUNK_SIZE), getSeqHeader(seq));
      chunk = chunk.slice(MAX_CHUNK_SIZE);
    }
  }
}

export interface ReadFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  name?: string;
  maxWait?: number;
}

/**
 * Read `path` from the read service of the given project / compute server,
 * yielding the data of each response chunk in order.
 *
 * @param maxWait passed to `requestMany`; defaults to 10 minutes.
 * @throws if the server reports an error, if a chunk arrives out of sequence
 *   ("lost data"), or if the response stream ends after data was received but
 *   without the final `done` message ("truncated").
 */
export async function* readFile({
  project_id,
  compute_server_id = 0,
  path,
  name = "",
  maxWait = 1000 * 60 * 10, // 10 minutes
}: ReadFileOptions) {
  const cn = await conat();
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  let seq = 0;
  // Size of the most recent data chunk; used for the truncation check below.
  let bytes = 0;
  for await (const resp of await cn.requestMany(
    subject,
    { path },
    {
      maxWait,
    },
  )) {
    if (resp.headers == null) {
      continue;
    }
    if (resp.headers.error) {
      throw Error(`${resp.headers.error}`);
    }
    if (resp.headers.done) {
      return;
    }
    if (resp.headers.seq) {
      // Coerce explicitly so a strictly-compared number is used whatever the
      // header encoding.
      const next = Number(resp.headers.seq);
      bytes = resp.data.length;
      if (next !== seq + 1) {
        throw Error(`lost data: seq=${seq}, next=${next}`);
      }
      seq = next;
    }
    yield resp.data;
  }
  // The stream ended without a `done` message.
  // NOTE(review): if no data was received at all, this returns silently;
  // confirm whether that case should also be treated as an error.
  if (bytes !== 0) {
    throw Error("truncated");
  }
}