GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/files/read.ts

/*
Read a file from a project/compute server via an async generator, so it is
memory efficient.

This is a conat service that uses requestMany: it takes a filename path as
input and streams all the binary data from that path.

We use headers to add sequence numbers to the response messages, so the
client can detect dropped or out-of-order chunks.

This is useful to implement:

- an http server for downloading any file, even large ones (see the sketch
  below).
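
For example, a download endpoint could pipe chunks straight into the HTTP
response.  This is only a minimal sketch, not code from this repo: it assumes
a plain Node http server and a hypothetical parseDownloadUrl helper that
extracts project_id and path from the URL.

    import { createServer as createHttpServer } from "http";
    import { readFile } from "@cocalc/conat/files/read";

    createHttpServer(async (req, res) => {
      // parseDownloadUrl is hypothetical -- any routing scheme works here
      const { project_id, path } = parseDownloadUrl(req.url ?? "");
      res.setHeader("content-type", "application/octet-stream");
      try {
        for await (const chunk of readFile({ project_id, path })) {
          res.write(chunk); // forward each chunk; nothing buffered in memory
        }
        res.end();
      } catch (err) {
        res.destroy(err as Error); // abort the download on stream error
      }
    }).listen(8080);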

IDEAS:

- we could also implement a version of this that takes a directory
  as input, runs compressed tar on it, and pipes the output into
  response messages.  We could then implement streaming download of
  a tarball of a directory tree, or also copy a directory tree from
  one place to another (without using rsync).  I've done this already
  over a websocket for compute servers, so it would just be a matter
  of copying that code.  (A rough sketch follows.)
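
A sketch of that tar idea, reusing the seq/done header protocol that
sendData uses below.  This is hypothetical -- the tar invocation, and the
omission of chunk splitting and error handling, are simplifications:

    import { spawn } from "child_process";

    async function sendTarball(mesg, path: string) {
      // stream "tar -cz <path>" and forward each stdout chunk
      const tar = spawn("tar", ["-cz", path]);
      let seq = 0;
      for await (const chunk of tar.stdout) {
        // a real version would split chunks larger than MAX_CHUNK_SIZE
        seq += 1;
        mesg.respondSync(chunk, { headers: { seq } });
      }
      await mesg.respond(null, { headers: { done: true } });
    }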

DEVELOPMENT:

See src/packages/backend/conat/test/files/read.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/read'); a.createServer({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,createReadStream:require('fs').createReadStream})

for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/tmp/a'})) { console.log({chunk}); }

for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/projects/6b851643-360e-435e-b87e-f9a6ab64a8b1/cocalc/.git/objects/pack/pack-771f7fe4ee855601463be070cf9fb9afc91f84ac.pack'})) { console.log({chunk}); }
*/

import { conat } from "@cocalc/conat/client";
import { projectSubject } from "@cocalc/conat/names";
import { type Subscription } from "@cocalc/conat/core/client";

// map from subject to the subscription currently serving it
let subs: { [name: string]: Subscription } = {};

export async function close({ project_id, compute_server_id, name = "" }) {
  const subject = getSubject({ project_id, compute_server_id, name });
  if (subs[subject] == null) {
    return;
  }
  const sub = subs[subject];
  delete subs[subject];
  await sub.drain();
}

function getSubject({ project_id, compute_server_id, name = "" }) {
  return projectSubject({
    project_id,
    compute_server_id,
    service: `files:read${name ?? ""}`,
  });
}

export async function createServer({
  createReadStream,
  project_id,
  compute_server_id,
  name = "",
}) {
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  const cn = await conat();
  const sub = await cn.subscribe(subject);
  subs[subject] = sub;
  listen({ sub, createReadStream });
}

async function listen({ sub, createReadStream }) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads.  These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    handleMessage(mesg, createReadStream);
  }
}

async function handleMessage(mesg, createReadStream) {
  try {
    await sendData(mesg, createReadStream);
    await mesg.respond(null, { headers: { done: true } });
  } catch (err) {
    // tell the client the read failed, rather than silently truncating
    mesg.respondSync(null, { headers: { error: `${err}` } });
  }
}

// max payload per response message (16384 * 16 * 3 = 786432 bytes = 768 KiB);
// larger chunks get split in sendData below.
const MAX_CHUNK_SIZE = 16384 * 16 * 3;

function getSeqHeader(seq) {
  return { headers: { seq } };
}

async function sendData(mesg, createReadStream) {
  const { path } = mesg.data;
  let seq = 0;
  for await (let chunk of createReadStream(path, {
    highWaterMark: MAX_CHUNK_SIZE,
  })) {
    // We must break the chunk into smaller messages or it will
    // get bounced by conat.
    while (chunk.length > 0) {
      seq += 1;
      mesg.respondSync(chunk.slice(0, MAX_CHUNK_SIZE), getSeqHeader(seq));
      chunk = chunk.slice(MAX_CHUNK_SIZE);
    }
  }
}

export interface ReadFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  name?: string;
  maxWait?: number; // milliseconds
}

export async function* readFile({
  project_id,
  compute_server_id = 0,
  path,
  name = "",
  maxWait = 1000 * 60 * 10, // 10 minutes
}: ReadFileOptions) {
  const cn = await conat();
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  let seq = 0;
  let bytes = 0;
  for await (const resp of await cn.requestMany(
    subject,
    { path },
    { maxWait },
  )) {
    if (resp.headers == null) {
      continue;
    }
    if (resp.headers.error) {
      throw Error(`${resp.headers.error}`);
    }
    if (resp.headers.done) {
      return;
    }
    if (resp.headers.seq) {
      const next = resp.headers.seq as number;
      bytes = resp.data.length;
      // each chunk must arrive exactly once and in order
      if (next != seq + 1) {
        throw Error(`lost data: seq=${seq}, next=${next}`);
      }
      seq = next;
    }
    yield resp.data;
  }
  // If we received data but the loop ended without a done header (e.g.,
  // requestMany hit maxWait), the stream ended early.
  if (bytes != 0) {
    throw Error("truncated");
  }
}
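
/*
Example consumer (a sketch, not part of this module's API): collect an
entire remote file into a single Buffer.  This gives up the memory
efficiency that readFile exists to provide, so it is only sensible for
small files.

    import { readFile, type ReadFileOptions } from "@cocalc/conat/files/read";

    async function readFileToBuffer(opts: ReadFileOptions): Promise<Buffer> {
      const chunks: Buffer[] = [];
      for await (const chunk of readFile(opts)) {
        chunks.push(chunk);
      }
      return Buffer.concat(chunks);
    }
*/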