Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync-fs/lib/index.ts
1496 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import {
7
copyFile,
8
mkdir,
9
open,
10
rename,
11
rm,
12
stat,
13
writeFile,
14
} from "fs/promises";
15
import { basename, dirname, join } from "path";
16
import type { FilesystemState /*FilesystemStatePatch*/ } from "./types";
17
import { exec, mtimeDirTree, parseCommonPrefixes, remove } from "./util";
18
import { toCompressedJSON } from "./compressed-json";
19
import SyncClient, { type Role } from "@cocalc/sync-client/lib/index";
20
import { encodeIntToUUID } from "@cocalc/util/compute/manager";
21
import getLogger from "@cocalc/backend/logger";
22
import { apiCall } from "@cocalc/api-client";
23
import mkdirp from "mkdirp";
24
import { throttle } from "lodash";
25
import { trunc_middle } from "@cocalc/util/misc";
26
import getListing from "@cocalc/backend/get-listing";
27
import { executeCode } from "@cocalc/backend/execute-code";
28
import { delete_files } from "@cocalc/backend/files/delete-files";
29
import { move_files } from "@cocalc/backend/files/move-files";
30
import { rename_file } from "@cocalc/backend/files/rename-file";
31
import { initConatClientService } from "./conat/syncfs-client";
32
import { initConatServerService } from "./conat/syncfs-server";
33
34
// Hidden top-level directories that are always excluded from sync and get a
// bind-mounted local source named ".explicit<path>" (see bindMountExcludes).
const EXPLICIT_HIDDEN_EXCLUDES = [".cache", ".local"];

const log = getLogger("sync-fs:index").debug;
// How often we re-register with the project as a sync target (see
// initSyncRequestHandler / registerToSync).
const REGISTER_INTERVAL_MS = 30000;
39
export default function syncFS(opts: Options) {
40
log("syncFS: ", opts);
41
return new SyncFS(opts);
42
}
43
44
// Lifecycle states of a SyncFS instance.
type State = "init" | "ready" | "sync" | "closed";

interface Options {
  // read-only lower layer of the unionfs mount (presumably the project's
  // files, e.g., via a network filesystem -- confirm with caller)
  lower: string;
  // read-write upper layer of the unionfs mount, local to the compute server
  upper: string;
  // mount point of the merged unionfs filesystem (used as HOME)
  mount: string;
  project_id: string;
  compute_server_id: number;
  // sync at most every this many seconds
  syncIntervalMin?: number;
  // but up to this long if there is no activity (exponential backoff)
  syncIntervalMax?: number;
  // list of top-level directory names that are excluded from sync.
  // do not use wildcards.
  // RECOMMEND: hidden files in HOME should be excluded, which you can do by including "./*"
  // ALSO: if you have "~" or "." in the exclude array, then sync is completely disabled.
  exclude?: string[];
  // optional path to a file recording reads; when set, listed files are
  // fetched from the project after each sync (see updateReadTracking)
  readTrackingFile?: string;
  // tar send/get implementations used to move files between the compute
  // server and the project
  tar: { send; get };
  compression?: "lz4"; // default 'lz4'
  data?: string; // absolute path to data directory (default: /data)
  role: Role;
}
67
68
// Directory in the upper layer where unionfs-fuse keeps its metadata,
// including whiteout markers (paths ending in "_HIDDEN~" -- see getComputeState).
const UNIONFS = ".unionfs-fuse";
// Do not make this too short, since every time it happens, the project has to
// do a find scan, which can take some resources!
const DEFAULT_SYNC_INTERVAL_MIN_S = 10;
// no idea what this *should* be.
const DEFAULT_SYNC_INTERVAL_MAX_S = 30;

// if sync fails this many times in a row, then we pause syncing until the user
// explicitly re-enables it. We have to do this, since the failure mode could
// result in massive bandwidth usage.
const MAX_FAILURES_IN_A_ROW = 3;
79
80
export class SyncFS {
81
private state: State = "init";
82
private lower: string;
83
private upper: string;
84
private mount: string;
85
private data: string;
86
private project_id: string;
87
private compute_server_id: number;
88
private syncInterval: number;
89
private registerToSyncInterval?;
90
private syncIntervalMin: number;
91
private syncIntervalMax: number;
92
private exclude: string[];
93
private readTrackingFile?: string;
94
private scratch: string;
95
private error_txt: string;
96
private tar: { send; get };
97
// number of failures in a row to sync.
98
private numFails: number = 0;
99
private conatService;
100
101
private client: SyncClient;
102
103
private timeout;
104
private websocket?;
105
106
private role: Role;
107
108
constructor({
109
lower,
110
upper,
111
mount,
112
project_id,
113
compute_server_id,
114
syncIntervalMin = DEFAULT_SYNC_INTERVAL_MIN_S,
115
syncIntervalMax = DEFAULT_SYNC_INTERVAL_MAX_S,
116
exclude = [],
117
readTrackingFile,
118
tar,
119
compression = "lz4",
120
data = "/data",
121
role,
122
}: Options) {
123
this.role = role;
124
this.lower = lower;
125
this.upper = upper;
126
this.mount = mount;
127
this.data = data;
128
this.project_id = project_id;
129
this.compute_server_id = compute_server_id;
130
this.exclude = exclude;
131
this.syncInterval = syncIntervalMin;
132
this.syncIntervalMin = syncIntervalMin;
133
this.syncIntervalMax = syncIntervalMax;
134
this.readTrackingFile = readTrackingFile;
135
this.scratch = join(
136
this.lower,
137
".compute-servers",
138
`${this.compute_server_id}`,
139
);
140
this.client = new SyncClient({
141
project_id: this.project_id,
142
client_id: encodeIntToUUID(this.compute_server_id),
143
role,
144
});
145
this.state = "ready";
146
this.error_txt = join(this.scratch, "error.txt");
147
if (!compression) {
148
this.tar = tar;
149
} else if (compression == "lz4") {
150
const alter = (v) => ["-I", "lz4"].concat(v);
151
this.tar = {
152
send: async ({ createArgs, extractArgs, HOME }) => {
153
createArgs = alter(createArgs);
154
extractArgs = alter(extractArgs);
155
await tar.send({ createArgs, extractArgs, HOME });
156
},
157
get: async ({ createArgs, extractArgs, HOME }) => {
158
createArgs = alter(createArgs);
159
extractArgs = alter(extractArgs);
160
await tar.get({ createArgs, extractArgs, HOME });
161
},
162
};
163
} else {
164
throw Error(`invalid compression: '${compression}'`);
165
}
166
}
167
168
init = async () => {
169
await this.initConatService();
170
await this.mountUnionFS();
171
await this.bindMountExcludes();
172
await this.makeScratchDir();
173
try {
174
await rm(this.error_txt);
175
} catch (_) {}
176
await this.initSyncRequestHandler();
177
await this.syncLoop();
178
};
179
180
close = async () => {
181
log("close");
182
if (this.state == "closed") {
183
return;
184
}
185
this.state = "closed";
186
if (this.conatService != null) {
187
this.conatService.close();
188
delete this.conatService;
189
}
190
if (this.timeout != null) {
191
clearTimeout(this.timeout);
192
delete this.timeout;
193
}
194
if (this.registerToSyncInterval) {
195
clearInterval(this.registerToSyncInterval);
196
delete this.registerToSyncInterval;
197
}
198
const args = ["-uz", this.mount];
199
log("fusermount", args.join(" "));
200
try {
201
await exec("fusermount", args);
202
} catch (err) {
203
log("fusermount fail -- ", err);
204
}
205
try {
206
await this.unmountExcludes();
207
} catch (err) {
208
log("unmountExcludes fail -- ", err);
209
}
210
this.websocket?.removeListener("data", this.handleApiRequest);
211
this.websocket?.removeListener("state", this.registerToSync);
212
};
213
214
// The sync api listens on the project websocket for requests
215
// to do a sync. There's no response (for now).
216
// Project --> ComputeServer: "heh, please do a sync now"
217
private initSyncRequestHandler = async () => {
218
log("initSyncRequestHandler: installing sync request handler");
219
this.websocket = await this.client.project_client.websocket(
220
this.project_id,
221
);
222
this.websocket.on("data", this.handleApiRequest);
223
log("initSyncRequestHandler: installed handler");
224
this.registerToSync();
225
// We use *both* a period interval and websocket state change,
226
// since we can't depend on just the state change to always
227
// be enough, unfortunately... :-(
228
this.registerToSyncInterval = setInterval(
229
this.registerToSync,
230
REGISTER_INTERVAL_MS,
231
);
232
this.websocket.on("state", this.registerToSync);
233
};
234
235
private registerToSync = async (state = "online") => {
236
if (state != "online") return;
237
try {
238
log("registerToSync: registering");
239
const api = await this.client.project_client.api(this.project_id);
240
await api.computeServerSyncRegister(this.compute_server_id);
241
await apiCall("v2/compute/set-detailed-state", {
242
id: this.compute_server_id,
243
state: "ready",
244
progress: 100,
245
name: "filesystem",
246
timeout: Math.round(REGISTER_INTERVAL_MS / 1000) + 15,
247
});
248
log("registerToSync: registered");
249
} catch (err) {
250
log("registerToSync: ERROR -- ", err);
251
}
252
};
253
254
private handleApiRequest = async (data) => {
255
try {
256
log("handleApiRequest:", { data });
257
const resp = await this.doApiRequest(data);
258
log("handleApiRequest: ", { resp });
259
if (data.id && this.websocket != null) {
260
this.websocket.write({
261
id: data.id,
262
resp,
263
});
264
}
265
} catch (err) {
266
// console.trace(err);
267
const error = `${err}`;
268
if (data.id && this.websocket != null) {
269
log("handleApiRequest: returning error", { event: data?.event, error });
270
this.websocket.write({
271
id: data.id,
272
error,
273
});
274
} else {
275
log("handleApiRequest: ignoring error", { event: data?.event, error });
276
}
277
}
278
};
279
280
doApiRequest = async (data) => {
281
log("doApiRequest", { data });
282
switch (data?.event) {
283
case "compute_server_sync_request":
284
try {
285
if (this.state == "sync") {
286
// already in progress
287
return;
288
}
289
if (!this.syncIsDisabled()) {
290
await this.sync();
291
}
292
log("doApiRequest: sync worked");
293
} catch (err) {
294
log("doApiRequest: sync failed", err);
295
}
296
return;
297
298
case "copy_from_project_to_compute_server":
299
case "copy_from_compute_server_to_project": {
300
const extractArgs = ["-x"];
301
extractArgs.push("-C");
302
extractArgs.push(data.dest ? data.dest : ".");
303
const HOME = this.mount;
304
for (const { prefix, paths } of parseCommonPrefixes(data.paths)) {
305
const createArgs = ["-c", "-C", prefix, ...paths];
306
log({ extractArgs, createArgs });
307
if (data.event == "copy_from_project_to_compute_server") {
308
await this.tar.get({
309
createArgs,
310
extractArgs,
311
HOME,
312
});
313
} else if (data.event == "copy_from_compute_server_to_project") {
314
await this.tar.send({
315
createArgs,
316
extractArgs,
317
HOME,
318
});
319
} else {
320
// impossible
321
throw Error(`bug -- invalid event ${data.event}`);
322
}
323
}
324
return;
325
}
326
327
case "listing":
328
return await getListing(data.path, data.hidden, { HOME: this.mount });
329
330
case "exec":
331
return await executeCode({ ...data.opts, home: this.mount });
332
333
case "delete_files":
334
return await delete_files(data.paths, this.mount);
335
336
case "move_files":
337
return await move_files(
338
data.paths,
339
data.dest,
340
(path) => this.client.set_deleted(path),
341
this.mount,
342
);
343
case "rename_file":
344
return await rename_file(
345
data.src,
346
data.dest,
347
(path) => this.client.set_deleted(path),
348
this.mount,
349
);
350
351
default:
352
throw Error(`unknown event '${data?.event}'`);
353
}
354
};
355
356
private mountUnionFS = async () => {
357
// NOTE: allow_other is essential to allow bind mounted as root
358
// of fast scratch directories into HOME!
359
// unionfs-fuse -o allow_other,auto_unmount,nonempty,large_read,cow,max_files=32768 /upper=RW:/home/user=RO /merged
360
await exec("unionfs-fuse", [
361
"-o",
362
"allow_other,auto_unmount,nonempty,large_read,cow,max_files=32768",
363
`${this.upper}=RW:${this.lower}=RO`,
364
this.mount,
365
]);
366
};
367
368
private shouldMountExclude = (path) => {
369
return (
370
path &&
371
!path.startsWith(".") &&
372
!path.startsWith("/") &&
373
path != "~" &&
374
!path.includes("/")
375
);
376
};
377
378
private unmountExcludes = async () => {
379
for (const path of this.exclude) {
380
if (this.shouldMountExclude(path)) {
381
try {
382
const target = join(this.mount, path);
383
log("unmountExcludes -- unmounting", { target });
384
await exec("sudo", ["umount", target]);
385
} catch (err) {
386
log("unmountExcludes -- warning ", err);
387
}
388
}
389
}
390
};
391
392
private bindMountExcludes = async () => {
393
// Setup bind mounds for each excluded directory, e.g.,
394
// mount --bind /data/scratch /home/user/scratch
395
for (const path of this.exclude) {
396
if (this.shouldMountExclude(path)) {
397
log("bindMountExcludes -- mounting", { path });
398
const source = join(this.data, path);
399
const target = join(this.mount, path);
400
const upper = join(this.upper, path);
401
log("bindMountExcludes -- mounting", { source, target });
402
await mkdirp(source);
403
// Yes, we have to mkdir in the upper level of the unionfs, because
404
// we excluded this path from the websocketfs metadataFile caching.
405
await mkdirp(upper);
406
await exec("sudo", ["mount", "--bind", source, target]);
407
} else {
408
log("bindMountExcludes -- skipping", { path });
409
}
410
}
411
// The following are (1) not mounted above due to shouldMountExclude,
412
// and (2) always get exclued, and (3) start with . so could conflict
413
// with the unionfs upper layer, so we change them:
414
for (const path of EXPLICIT_HIDDEN_EXCLUDES) {
415
log("bindMountExcludes -- explicit hidden path ", { path });
416
const source = join(this.data, `.explicit${path}`);
417
const target = join(this.mount, path);
418
const upper = join(this.upper, path);
419
log("bindMountExcludes -- explicit hidden path", { source, target });
420
await mkdirp(source);
421
await mkdirp(upper);
422
await exec("sudo", ["mount", "--bind", source, target]);
423
}
424
};
425
426
public sync = async () => {
427
if (this.state == "sync") {
428
throw Error("sync currently in progress");
429
}
430
if (this.state != "ready") {
431
throw Error(
432
`can only sync when state is ready but state is "${this.state}"`,
433
);
434
}
435
log("sync: doing a sync");
436
const t0 = Date.now();
437
try {
438
this.state = "sync";
439
await this.__doSync();
440
this.numFails = 0; // it worked
441
this.reportState({
442
state: "ready",
443
progress: 100,
444
timeout: 3 + this.syncInterval,
445
});
446
} catch (err) {
447
this.numFails += 1;
448
let extra;
449
let message = trunc_middle(`${err.message}`, 500);
450
if (this.numFails >= MAX_FAILURES_IN_A_ROW) {
451
extra = `XXX Sync failed ${MAX_FAILURES_IN_A_ROW} in a row. FIX THE PROBLEM, THEN CLEAR THIS ERROR TO RESUME SYNC. -- ${message}`;
452
} else {
453
extra = `XXX Sync failed ${this.numFails} times in a row with -- ${message}`;
454
}
455
// extra here sets visible error state that the user sees.
456
this.reportState({ state: "error", extra, timeout: 60, progress: 0 });
457
await this.logSyncError(extra);
458
throw Error(extra);
459
} finally {
460
if (this.state != ("closed" as State)) {
461
this.state = "ready";
462
}
463
log("sync - done, time=", (Date.now() - t0) / 1000);
464
}
465
};
466
467
private syncIsDisabled = () => {
468
if (this.exclude.includes("~") || this.exclude.includes(".")) {
469
log("syncLoop: '~' or '.' is included in excludes, so we never sync");
470
return true;
471
}
472
return false;
473
};
474
475
private syncLoop = async () => {
476
if (this.syncIsDisabled()) {
477
const wait = 1000 * 60;
478
log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
479
this.timeout = setTimeout(this.syncLoop, wait);
480
return;
481
}
482
const t0 = Date.now();
483
if (this.state == "ready") {
484
log("syncLoop: ready");
485
try {
486
if (this.numFails >= MAX_FAILURES_IN_A_ROW) {
487
// TODO: get the current error message and if cleared do sync. Otherwise:
488
const detailedState = await this.getDetailedState();
489
if (
490
detailedState &&
491
(!detailedState.extra || detailedState.state != "error")
492
) {
493
log("syncLoop: resuming sync since error was cleared");
494
this.numFails = 0;
495
await this.sync();
496
} else {
497
log(
498
`syncLoop: not syncing due to failing ${this.numFails} times in a row. Will restart when error message is cleared.`,
499
);
500
}
501
} else {
502
await this.sync();
503
}
504
} catch (err) {
505
// This might happen if there is a lot of filesystem activity,
506
// which changes things during the sync.
507
// NOTE: the error message can be VERY long, including
508
// all the output filenames.
509
log(err.message);
510
// In case of error, we aggressively back off to reduce impact.
511
this.syncInterval = Math.min(
512
this.syncIntervalMax,
513
1.5 * this.syncInterval,
514
);
515
}
516
} else {
517
log("sync: skipping since state = ", this.state);
518
}
519
// We always wait as long as the last sync took plus the
520
// next interval. This way if sync is taking a long time
521
// due to huge files or load, we spread it out, up to a point,
522
// which is maybe a good idea. If sync is fast, it's fine
523
// to do it frequently.
524
const wait = Math.min(
525
this.syncIntervalMax * 1000,
526
this.syncInterval * 1000 + (Date.now() - t0),
527
);
528
log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
529
this.timeout = setTimeout(this.syncLoop, wait);
530
};
531
532
private makeScratchDir = async () => {
533
await mkdir(this.scratch, { recursive: true });
534
};
535
536
private logSyncError = async (mesg: string) => {
537
try {
538
await writeFile(this.error_txt, mesg);
539
} catch (err) {
540
log(`UNABLE to log sync err -- ${err}`);
541
}
542
};
543
544
// Save current state to database; useful to inform user as to what is going on.
545
// We throttle this, because if you call it, then immediately call it again,
546
// two different hub servers basically gets two different stats at the same time,
547
// and which state is saved to the database is pretty random! By spacing this out
548
// by 2s, such a problem is vastly less likely.
549
private reportState = throttle(
550
async (opts: { state; extra?; timeout?; progress? }) => {
551
log("reportState", opts);
552
try {
553
await apiCall("v2/compute/set-detailed-state", {
554
id: this.compute_server_id,
555
name: "filesystem-sync",
556
...opts,
557
});
558
} catch (err) {
559
log("reportState: WARNING -- ", err);
560
}
561
},
562
1500,
563
{ leading: true, trailing: true },
564
);
565
566
private getDetailedState = async () => {
567
return await apiCall("v2/compute/get-detailed-state", {
568
id: this.compute_server_id,
569
name: "filesystem-sync",
570
});
571
};
572
573
// ONLY call this from this.sync!
574
private __doSync = async () => {
575
log("doSync");
576
this.reportState({ state: "get-compute-state", progress: 0, timeout: 10 });
577
await this.makeScratchDir();
578
const api = await this.client.project_client.api(this.project_id);
579
const { computeState, whiteouts } = await this.getComputeState();
580
// log("doSync", computeState, whiteouts);
581
const computeStateJson = join(
582
".compute-servers",
583
`${this.compute_server_id}`,
584
"compute-state.json.lz4",
585
);
586
await writeFile(
587
join(this.lower, computeStateJson),
588
await toCompressedJSON(computeState),
589
);
590
this.reportState({
591
state: "send-state-to-project",
592
progress: 20,
593
timeout: 10,
594
});
595
const { removeFromCompute, copyFromCompute, copyFromProjectTar } =
596
await api.syncFS({
597
computeStateJson,
598
exclude: this.exclude,
599
compute_server_id: this.compute_server_id,
600
now: Date.now(),
601
});
602
603
// log("doSync", { removeFromCompute, copyFromCompute, copyFromProjectTar });
604
let isActive = false;
605
if (whiteouts.length > 0) {
606
isActive = true;
607
await remove(whiteouts, join(this.upper, UNIONFS));
608
}
609
if (removeFromCompute?.length ?? 0 > 0) {
610
isActive = true;
611
await remove(removeFromCompute, this.upper);
612
}
613
if (copyFromCompute?.length ?? 0 > 0) {
614
isActive = true;
615
this.reportState({
616
state: `send-${copyFromCompute?.length ?? 0}-files-to-project`,
617
progress: 50,
618
});
619
await this.sendFiles(copyFromCompute);
620
}
621
if (copyFromProjectTar) {
622
isActive = true;
623
this.reportState({
624
state: "receive-files-from-project",
625
progress: 70,
626
});
627
await this.receiveFiles(copyFromProjectTar);
628
}
629
log("DONE receiving files from project as part of sync");
630
631
if (isActive) {
632
this.syncInterval = this.syncIntervalMin;
633
} else {
634
// exponential backoff when not active
635
this.syncInterval = Math.min(
636
this.syncIntervalMax,
637
1.3 * this.syncInterval,
638
);
639
}
640
await this.updateReadTracking();
641
};
642
643
// private getComputeStatePatch = async (
644
// lastState: FilesystemState,
645
// ): Promise<FilesystemStatePatch> => {
646
// // todo -- whiteouts?
647
// const { computeState: newState } = await this.getComputeState();
648
// return makePatch(lastState, newState);
649
// };
650
651
private getComputeState = async (): Promise<{
652
computeState: FilesystemState;
653
whiteouts: string[];
654
}> => {
655
// Create the map from all paths in upper (both directories and files and whiteouts),
656
// except ones excluded from sync, to the ctime for the path (or negative mtime
657
// for deleted paths): {[path:string]:mtime of last change to file metadata}
658
const whiteLen = "_HIDDEN~".length;
659
const computeState = await mtimeDirTree({
660
path: this.upper,
661
exclude: this.exclude,
662
});
663
const whiteouts: string[] = [];
664
const unionfs = join(this.upper, UNIONFS);
665
const mtimes = await mtimeDirTree({
666
path: unionfs,
667
exclude: [],
668
});
669
for (const path in mtimes) {
670
const mtime = mtimes[path];
671
if (path.endsWith("_HIDDEN~")) {
672
const p = path.slice(0, -whiteLen);
673
whiteouts.push(path);
674
if ((await stat(join(unionfs, path))).isDirectory()) {
675
whiteouts.push(p);
676
}
677
computeState[p] = -mtime;
678
}
679
}
680
681
return { computeState, whiteouts };
682
};
683
684
private sendFiles = async (files: string[]) => {
685
const target = join(this.scratch, "copy-to-project");
686
log("sendFiles: sending ", files.length, "files listed in ", target);
687
const file = await open(target, "w");
688
await file.write(files.join("\0"));
689
await file.close();
690
const createArgs = [
691
"-c",
692
"--null",
693
"--no-recursion",
694
"--verbatim-files-from",
695
"--files-from",
696
target,
697
];
698
const extractArgs = ["--delay-directory-restore", "-x"];
699
await this.tar.send({ createArgs, extractArgs });
700
log("sendFiles: ", files.length, "sent");
701
};
702
703
// pathToFileList is the path to a file in the file system on
704
// in the project that has the names of the files to copy to
705
// the compute server.
706
private receiveFiles = async (pathToFileList: string) => {
707
log("receiveFiles: getting files in from project -- ", pathToFileList);
708
// this runs in the project
709
const createArgs = [
710
"-c",
711
"--null",
712
"--no-recursion",
713
"--verbatim-files-from",
714
"--files-from",
715
pathToFileList,
716
];
717
// this runs here
718
const extractArgs = ["--delay-directory-restore", "-x"];
719
await this.tar.get({
720
createArgs,
721
extractArgs,
722
});
723
log("receiveFiles: files in ", pathToFileList, "received from project");
724
};
725
726
private updateReadTracking = async () => {
727
if (!this.readTrackingFile) {
728
return;
729
}
730
// 1. Move the read tracking file to the project. We do a move, so atomic
731
// and new writes go to a new file and nothing is missed.
732
// 2. Call tar.get to grab the files.
733
// NOTE: read tracking isn't triggered on any files that were copied over,
734
// since unionfs reads those from the local cache (stat doesn't count), so
735
// we don't have to filter those out.
736
737
// We make any errors below WARNINGS that do not throw an exception, because
738
// this is an optimization, not critical for sync, and each time we do it,
739
// things are reset.
740
const readTrackingOnProject = join(
741
".compute-servers",
742
`${this.compute_server_id}`,
743
"read-tracking",
744
);
745
this.reportState({
746
state: "cache-files-from-project",
747
progress: 80,
748
});
749
try {
750
try {
751
// move the file; first locally, then copy across devices, then delete.
752
// This is to make the initial mv atomic so we don't miss anything.
753
const tmp = join(
754
dirname(this.readTrackingFile),
755
`.${basename(this.readTrackingFile)}.tmp`,
756
);
757
await rename(this.readTrackingFile, tmp); // should be atomic
758
await copyFile(tmp, join(this.lower, readTrackingOnProject));
759
await rm(tmp);
760
} catch (err) {
761
if (err.code == "ENOENT") {
762
log(
763
`updateReadTracking -- no read tracking file '${this.readTrackingFile}'`,
764
);
765
return;
766
}
767
// this could be harmless, e.g., the file doesn't exist yet
768
log(
769
`updateReadTracking -- issue moving tracking file '${this.readTrackingFile}'`,
770
err,
771
);
772
return;
773
}
774
const createArgs = [
775
"-c",
776
"--null",
777
"--no-recursion",
778
"--verbatim-files-from",
779
"--files-from",
780
readTrackingOnProject,
781
];
782
const extractArgs = ["--keep-newer-files", "-x"];
783
log("updateReadTracking:", "tar", createArgs.join(" "));
784
try {
785
await this.tar.get({ createArgs, extractArgs });
786
} catch (err) {
787
log(
788
`updateReadTracking -- issue extracting tracking file '${this.readTrackingFile}'`,
789
err,
790
);
791
return;
792
}
793
} finally {
794
this.reportState({
795
state: "cache-files-from-project",
796
progress: 85,
797
});
798
}
799
};
800
801
initConatService = async () => {
802
if (this.role == "compute_server") {
803
this.conatService = await initConatClientService({
804
syncfs: this,
805
project_id: this.project_id,
806
compute_server_id: this.compute_server_id,
807
});
808
} else if (this.role == "project") {
809
this.conatService = await initConatServerService({
810
syncfs: this,
811
project_id: this.project_id,
812
});
813
} else {
814
throw Error("only compute_server and project roles are supported");
815
}
816
};
817
}
818
819