import {
copyFile,
mkdir,
open,
rename,
rm,
stat,
writeFile,
} from "fs/promises";
import { basename, dirname, join } from "path";
import type { FilesystemState } from "./types";
import { exec, mtimeDirTree, parseCommonPrefixes, remove } from "./util";
import { toCompressedJSON } from "./compressed-json";
import SyncClient, { type Role } from "@cocalc/sync-client/lib/index";
import { encodeIntToUUID } from "@cocalc/util/compute/manager";
import getLogger from "@cocalc/backend/logger";
import { apiCall } from "@cocalc/api-client";
import mkdirp from "mkdirp";
import { throttle } from "lodash";
import { trunc_middle } from "@cocalc/util/misc";
import getListing from "@cocalc/backend/get-listing";
import { executeCode } from "@cocalc/backend/execute-code";
import { delete_files } from "@cocalc/backend/files/delete-files";
import { move_files } from "@cocalc/backend/files/move-files";
import { rename_file } from "@cocalc/backend/files/rename-file";
import { initConatClientService } from "./conat/syncfs-client";
import { initConatServerService } from "./conat/syncfs-server";
// Debug-level logger for this module.
const log = getLogger("sync-fs:index").debug;
// Hidden top-level directories of the mount that are always bind-mounted from
// `<data>/.explicit<name>` (see bindMountExcludes), so writes to them go to
// local storage instead of the union filesystem's upper layer.
const EXPLICIT_HIDDEN_EXCLUDES = [".cache", ".local"];
// How often, in milliseconds, the compute server re-registers with the project
// for sync requests.
const REGISTER_INTERVAL_MS = 30 * 1000;
/**
 * Create a {@link SyncFS} for the given options, logging them first.
 *
 * The returned instance is constructed but not started: call `init()` on it
 * to mount the union filesystem and begin syncing.
 */
export default function syncFS(opts: Options) {
  log("syncFS: ", opts);
  const instance = new SyncFS(opts);
  return instance;
}
/** Lifecycle state of a SyncFS instance. */
type State = "init" | "ready" | "sync" | "closed";

/** Options accepted by {@link SyncFS} (and the `syncFS` factory). */
interface Options {
  /** Read-only (RO) lower layer of the unionfs-fuse mount. */
  lower: string;
  /** Writable (RW) upper layer of the unionfs-fuse mount; local changes land here. */
  upper: string;
  /** Mount point where the union of `upper` and `lower` is exposed. */
  mount: string;
  project_id: string;
  compute_server_id: number;
  /** Shortest delay between sync passes, in seconds (default 10). */
  syncIntervalMin?: number;
  /** Longest delay between sync passes, in seconds (default 30). */
  syncIntervalMax?: number;
  /**
   * Paths excluded from sync. Simple top-level names are bind-mounted from
   * `data`; including "~" or "." disables syncing entirely.
   */
  exclude?: string[];
  /**
   * File listing paths to refresh from the project after each sync
   * (presumably null-separated, since it is passed to `tar --null`).
   */
  readTrackingFile?: string;
  /** Transport that runs tar between the compute server and the project. */
  tar: { send; get };
  /** Compression for tar transfers; "lz4" (the default) adds `-I lz4`. */
  compression?: "lz4";
  /** Local directory backing bind-mounted excludes (default "/data"). */
  data?: string;
  /** Which side this instance runs on ("compute_server" or "project"). */
  role: Role;
}

// Directory (under `upper`) where unionfs-fuse keeps its whiteout markers.
const UNIONFS = ".unionfs-fuse";
// Default bounds, in seconds, for the adaptive sync interval.
const DEFAULT_SYNC_INTERVAL_MIN_S = 10;
const DEFAULT_SYNC_INTERVAL_MAX_S = 30;
// Consecutive sync failures after which automatic syncing pauses until the
// reported error is cleared.
const MAX_FAILURES_IN_A_ROW = 3;
/**
 * Synchronizes a compute server's union filesystem with its project.
 *
 * Layout, as set up by `init()`:
 * - `upper` is the writable (RW) layer and `lower` the read-only (RO) layer of
 *   a unionfs-fuse mount exposed at `mount`.
 * - Entries under `upper/.unionfs-fuse` whose names end in `_HIDDEN~` are
 *   treated as deletions ("whiteouts").
 * - Per-compute-server scratch files live in
 *   `lower/.compute-servers/<compute_server_id>`. The project is handed paths
 *   relative to that layout, so `lower` is presumably a view of the project's
 *   home directory (NOTE(review): confirm).
 *
 * A sync pass sends the mtimes of everything in `upper` (deletions encoded as
 * negative mtimes) to the project. The project replies with the paths to
 * delete locally, the files to send to it, and a list of files to fetch from it.
 */
export class SyncFS {
  private state: State = "init";
  private lower: string;
  private upper: string;
  private mount: string;
  // Local directory backing bind-mounted excludes.
  private data: string;
  private project_id: string;
  private compute_server_id: number;
  // Current delay between sync passes, in seconds. It adapts between
  // syncIntervalMin and syncIntervalMax depending on activity and failures.
  private syncInterval: number;
  private registerToSyncInterval?;
  private syncIntervalMin: number;
  private syncIntervalMax: number;
  private exclude: string[];
  private readTrackingFile?: string;
  // lower/.compute-servers/<compute_server_id>
  private scratch: string;
  // Where the most recent sync error message is written.
  private error_txt: string;
  private tar: { send; get };
  // Number of consecutive failed sync passes.
  private numFails: number = 0;
  private conatService;
  private client: SyncClient;
  // Pending setTimeout handle for the next syncLoop pass.
  private timeout;
  private websocket?;
  private role: Role;

  /**
   * Store the configuration, create the SyncClient and wrap `tar` so that
   * every transfer uses the requested compression. Nothing is mounted here;
   * call `init()` for that.
   *
   * @throws if `compression` is neither falsy nor "lz4".
   */
  constructor({
    lower,
    upper,
    mount,
    project_id,
    compute_server_id,
    syncIntervalMin = DEFAULT_SYNC_INTERVAL_MIN_S,
    syncIntervalMax = DEFAULT_SYNC_INTERVAL_MAX_S,
    exclude = [],
    readTrackingFile,
    tar,
    compression = "lz4",
    data = "/data",
    role,
  }: Options) {
    this.role = role;
    this.lower = lower;
    this.upper = upper;
    this.mount = mount;
    this.data = data;
    this.project_id = project_id;
    this.compute_server_id = compute_server_id;
    this.exclude = exclude;
    this.syncInterval = syncIntervalMin;
    this.syncIntervalMin = syncIntervalMin;
    this.syncIntervalMax = syncIntervalMax;
    this.readTrackingFile = readTrackingFile;
    this.scratch = join(
      this.lower,
      ".compute-servers",
      `${this.compute_server_id}`,
    );
    this.client = new SyncClient({
      project_id: this.project_id,
      client_id: encodeIntToUUID(this.compute_server_id),
      role,
    });
    this.state = "ready";
    this.error_txt = join(this.scratch, "error.txt");
    if (!compression) {
      this.tar = tar;
    } else if (compression == "lz4") {
      // Prepend `-I lz4` so both the creating and extracting tar use lz4.
      const alter = (v) => ["-I", "lz4"].concat(v);
      this.tar = {
        send: async ({ createArgs, extractArgs, HOME }) => {
          createArgs = alter(createArgs);
          extractArgs = alter(extractArgs);
          await tar.send({ createArgs, extractArgs, HOME });
        },
        get: async ({ createArgs, extractArgs, HOME }) => {
          createArgs = alter(createArgs);
          extractArgs = alter(extractArgs);
          await tar.get({ createArgs, extractArgs, HOME });
        },
      };
    } else {
      throw Error(`invalid compression: '${compression}'`);
    }
  }

  /** True once close() has been called. */
  private isClosed = (): boolean => this.state == "closed";

  /**
   * Start everything: the conat service, the union mount, the bind-mounted
   * excludes, and the scratch directory. Then clear any stale error file,
   * install the project request handler and start the periodic sync loop.
   */
  init = async () => {
    await this.initConatService();
    await this.mountUnionFS();
    await this.bindMountExcludes();
    await this.makeScratchDir();
    try {
      await rm(this.error_txt);
    } catch (_) {
      // No previous error file -- nothing to clear.
    }
    await this.initSyncRequestHandler();
    await this.syncLoop();
  };

  /**
   * Stop syncing and tear everything down. In order, this stops listening
   * for project requests, closes the conat service, cancels the sync loop and
   * periodic registration, then unmounts the union filesystem and the
   * bind-mounted excludes. Unmount failures are logged, not thrown. Calling
   * close() again is a no-op.
   */
  close = async () => {
    log("close");
    if (this.state == "closed") {
      return;
    }
    this.state = "closed";
    // Detach from the websocket first so no new project requests are handled
    // while the mounts they would operate on are being torn down.
    this.websocket?.removeListener("data", this.handleApiRequest);
    this.websocket?.removeListener("state", this.registerToSync);
    if (this.conatService != null) {
      this.conatService.close();
      delete this.conatService;
    }
    if (this.timeout != null) {
      clearTimeout(this.timeout);
      delete this.timeout;
    }
    if (this.registerToSyncInterval) {
      clearInterval(this.registerToSyncInterval);
      delete this.registerToSyncInterval;
    }
    const args = ["-uz", this.mount];
    log("fusermount", args.join(" "));
    try {
      await exec("fusermount", args);
    } catch (err) {
      log("fusermount fail -- ", err);
    }
    try {
      await this.unmountExcludes();
    } catch (err) {
      log("unmountExcludes fail -- ", err);
    }
  };

  /**
   * Open the project websocket and handle incoming API requests on it. Also
   * register for sync immediately, every REGISTER_INTERVAL_MS, and whenever
   * the websocket reports a state change.
   */
  private initSyncRequestHandler = async () => {
    log("initSyncRequestHandler: installing sync request handler");
    this.websocket = await this.client.project_client.websocket(
      this.project_id,
    );
    if (this.isClosed()) {
      // close() ran while we were waiting for the websocket. Installing the
      // handler and interval now would leak them, since close() won't run again.
      return;
    }
    this.websocket.on("data", this.handleApiRequest);
    log("initSyncRequestHandler: installed handler");
    this.registerToSync();
    this.registerToSyncInterval = setInterval(
      this.registerToSync,
      REGISTER_INTERVAL_MS,
    );
    this.websocket.on("state", this.registerToSync);
  };

  /**
   * Tell the project this compute server accepts sync requests, and mark the
   * "filesystem" detailed state as ready. The state's timeout is slightly
   * longer than the re-registration interval, so it presumably lapses if
   * registration stops. This is a no-op unless `state` is "online". Errors
   * are logged, not thrown.
   */
  private registerToSync = async (state = "online") => {
    if (state != "online") return;
    try {
      log("registerToSync: registering");
      const api = await this.client.project_client.api(this.project_id);
      await api.computeServerSyncRegister(this.compute_server_id);
      await apiCall("v2/compute/set-detailed-state", {
        id: this.compute_server_id,
        state: "ready",
        progress: 100,
        name: "filesystem",
        timeout: Math.round(REGISTER_INTERVAL_MS / 1000) + 15,
      });
      log("registerToSync: registered");
    } catch (err) {
      log("registerToSync: ERROR -- ", err);
    }
  };

  /**
   * Websocket "data" handler. It runs the request and, if the message has an
   * `id`, writes back `{ id, resp }` on success or `{ id, error }` on failure.
   * Errors for messages without an id are only logged.
   */
  private handleApiRequest = async (data) => {
    try {
      log("handleApiRequest:", { data });
      const resp = await this.doApiRequest(data);
      log("handleApiRequest: ", { resp });
      if (data.id && this.websocket != null) {
        this.websocket.write({
          id: data.id,
          resp,
        });
      }
    } catch (err) {
      const error = `${err}`;
      if (data.id && this.websocket != null) {
        log("handleApiRequest: returning error", { event: data?.event, error });
        this.websocket.write({
          id: data.id,
          error,
        });
      } else {
        log("handleApiRequest: ignoring error", { event: data?.event, error });
      }
    }
  };

  /**
   * Dispatch one project API request by `data.event`. File operations run
   * relative to the union mount.
   *
   * @throws on an unknown event, or if the underlying operation fails (except
   *   sync requests, whose failures are only logged).
   */
  doApiRequest = async (data) => {
    log("doApiRequest", { data });
    switch (data?.event) {
      case "compute_server_sync_request":
        try {
          if (this.state == "sync") {
            // A sync is already running; it will pick up the changes.
            return;
          }
          if (!this.syncIsDisabled()) {
            await this.sync();
          }
          log("doApiRequest: sync worked");
        } catch (err) {
          log("doApiRequest: sync failed", err);
        }
        return;

      case "copy_from_project_to_compute_server":
      case "copy_from_compute_server_to_project": {
        const extractArgs = ["-x"];
        extractArgs.push("-C");
        extractArgs.push(data.dest ? data.dest : ".");
        const HOME = this.mount;
        // One tar transfer per common directory prefix of the requested paths.
        for (const { prefix, paths } of parseCommonPrefixes(data.paths)) {
          const createArgs = ["-c", "-C", prefix, ...paths];
          log({ extractArgs, createArgs });
          if (data.event == "copy_from_project_to_compute_server") {
            await this.tar.get({
              createArgs,
              extractArgs,
              HOME,
            });
          } else if (data.event == "copy_from_compute_server_to_project") {
            await this.tar.send({
              createArgs,
              extractArgs,
              HOME,
            });
          } else {
            throw Error(`bug -- invalid event ${data.event}`);
          }
        }
        return;
      }

      case "listing":
        return await getListing(data.path, data.hidden, { HOME: this.mount });

      case "exec":
        return await executeCode({ ...data.opts, home: this.mount });

      case "delete_files":
        return await delete_files(data.paths, this.mount);

      case "move_files":
        return await move_files(
          data.paths,
          data.dest,
          (path) => this.client.set_deleted(path),
          this.mount,
        );

      case "rename_file":
        return await rename_file(
          data.src,
          data.dest,
          (path) => this.client.set_deleted(path),
          this.mount,
        );

      default:
        throw Error(`unknown event '${data?.event}'`);
    }
  };

  /** Mount `upper` (RW) over `lower` (RO) at `mount` using unionfs-fuse in cow mode. */
  private mountUnionFS = async () => {
    await exec("unionfs-fuse", [
      "-o",
      "allow_other,auto_unmount,nonempty,large_read,cow,max_files=32768",
      `${this.upper}=RW:${this.lower}=RO`,
      this.mount,
    ]);
  };

  /**
   * Only non-empty, non-hidden, single-component relative paths (and not
   * "~") are bind-mounted as excludes.
   */
  private shouldMountExclude = (path) => {
    return (
      path &&
      !path.startsWith(".") &&
      !path.startsWith("/") &&
      path != "~" &&
      !path.includes("/")
    );
  };

  /** Undo bindMountExcludes for the user excludes; failures are logged only. */
  private unmountExcludes = async () => {
    for (const path of this.exclude) {
      if (this.shouldMountExclude(path)) {
        try {
          const target = join(this.mount, path);
          log("unmountExcludes -- unmounting", { target });
          await exec("sudo", ["umount", target]);
        } catch (err) {
          log("unmountExcludes -- warning ", err);
        }
      }
    }
  };

  /**
   * Bind-mount `<data>/<path>` onto `<mount>/<path>` for each mountable
   * exclude, and `<data>/.explicit<path>` for each EXPLICIT_HIDDEN_EXCLUDES
   * entry. The matching directory in `upper` is created first, presumably so
   * the mount point exists in the union view.
   */
  private bindMountExcludes = async () => {
    for (const path of this.exclude) {
      if (this.shouldMountExclude(path)) {
        log("bindMountExcludes -- mounting", { path });
        const source = join(this.data, path);
        const target = join(this.mount, path);
        const upper = join(this.upper, path);
        log("bindMountExcludes -- mounting", { source, target });
        await mkdirp(source);
        await mkdirp(upper);
        await exec("sudo", ["mount", "--bind", source, target]);
      } else {
        log("bindMountExcludes -- skipping", { path });
      }
    }
    for (const path of EXPLICIT_HIDDEN_EXCLUDES) {
      log("bindMountExcludes -- explicit hidden path ", { path });
      const source = join(this.data, `.explicit${path}`);
      const target = join(this.mount, path);
      const upper = join(this.upper, path);
      log("bindMountExcludes -- explicit hidden path", { source, target });
      await mkdirp(source);
      await mkdirp(upper);
      await exec("sudo", ["mount", "--bind", source, target]);
    }
  };

  /**
   * Run one sync pass and report the outcome via the "filesystem-sync"
   * detailed state. On failure, numFails is incremented, the error is reported
   * and written to error.txt, and then rethrown.
   *
   * @throws if a sync is already in progress, if the state isn't "ready", or
   *   if the pass fails.
   */
  public sync = async () => {
    if (this.state == "sync") {
      throw Error("sync currently in progress");
    }
    if (this.state != "ready") {
      throw Error(
        `can only sync when state is ready but state is "${this.state}"`,
      );
    }
    log("sync: doing a sync");
    const t0 = Date.now();
    try {
      this.state = "sync";
      await this.__doSync();
      this.numFails = 0;
      this.reportState({
        state: "ready",
        progress: 100,
        timeout: 3 + this.syncInterval,
      });
    } catch (err) {
      this.numFails += 1;
      // Tolerate non-Error throws (e.g. strings), which have no .message.
      const message = trunc_middle(`${err?.message ?? err}`, 500);
      const extra =
        this.numFails >= MAX_FAILURES_IN_A_ROW
          ? `XXX Sync failed ${MAX_FAILURES_IN_A_ROW} times in a row. FIX THE PROBLEM, THEN CLEAR THIS ERROR TO RESUME SYNC. -- ${message}`
          : `XXX Sync failed ${this.numFails} times in a row with -- ${message}`;
      this.reportState({ state: "error", extra, timeout: 60, progress: 0 });
      await this.logSyncError(extra);
      throw Error(extra);
    } finally {
      // close() may have run while we were syncing; don't resurrect the instance.
      if (!this.isClosed()) {
        this.state = "ready";
      }
      log("sync - done, time=", (Date.now() - t0) / 1000);
    }
  };

  /** Sync is disabled entirely when "~" or "." is among the excludes. */
  private syncIsDisabled = () => {
    if (this.exclude.includes("~") || this.exclude.includes(".")) {
      log("syncLoop: '~' or '.' is included in excludes, so we never sync");
      return true;
    }
    return false;
  };

  /**
   * Periodic sync driver, which reschedules itself with setTimeout:
   * - While sync is disabled, it just rechecks every minute.
   * - After MAX_FAILURES_IN_A_ROW failures, it only resumes once the reported
   *   error is cleared.
   * - Each failure stretches the interval by 1.5x, capped at syncIntervalMax.
   * The loop stops once close() has been called.
   */
  private syncLoop = async () => {
    if (this.isClosed()) {
      return;
    }
    if (this.syncIsDisabled()) {
      const wait = 1000 * 60;
      log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
      this.timeout = setTimeout(this.syncLoop, wait);
      return;
    }
    const t0 = Date.now();
    if (this.state == "ready") {
      log("syncLoop: ready");
      try {
        if (this.numFails >= MAX_FAILURES_IN_A_ROW) {
          const detailedState = await this.getDetailedState();
          // NOTE(review): if clearing the error deletes the detailed state
          // entirely (detailedState null/undefined), sync never resumes --
          // confirm against the get-detailed-state API.
          if (
            detailedState &&
            (!detailedState.extra || detailedState.state != "error")
          ) {
            log("syncLoop: resuming sync since error was cleared");
            this.numFails = 0;
            await this.sync();
          } else {
            log(
              `syncLoop: not syncing due to failing ${this.numFails} times in a row. Will restart when error message is cleared.`,
            );
          }
        } else {
          await this.sync();
        }
      } catch (err) {
        log(err.message);
        this.syncInterval = Math.min(
          this.syncIntervalMax,
          1.5 * this.syncInterval,
        );
      }
    } else {
      log("sync: skipping since state = ", this.state);
    }
    if (this.isClosed()) {
      // close() ran while we were syncing. It already cleared this.timeout,
      // so scheduling another pass would keep the loop (and process) alive forever.
      return;
    }
    const wait = Math.min(
      this.syncIntervalMax * 1000,
      this.syncInterval * 1000 + (Date.now() - t0),
    );
    log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
    this.timeout = setTimeout(this.syncLoop, wait);
  };

  private makeScratchDir = async () => {
    await mkdir(this.scratch, { recursive: true });
  };

  /** Best-effort: write the sync error message to error.txt. */
  private logSyncError = async (mesg: string) => {
    try {
      await writeFile(this.error_txt, mesg);
    } catch (err) {
      log(`UNABLE to log sync err -- ${err}`);
    }
  };

  /**
   * Report progress as the "filesystem-sync" detailed state. Calls are
   * throttled to one per 1.5s (leading and trailing); failures are logged only.
   */
  private reportState = throttle(
    async (opts: { state; extra?; timeout?; progress? }) => {
      log("reportState", opts);
      try {
        await apiCall("v2/compute/set-detailed-state", {
          id: this.compute_server_id,
          name: "filesystem-sync",
          ...opts,
        });
      } catch (err) {
        log("reportState: WARNING -- ", err);
      }
    },
    1500,
    { leading: true, trailing: true },
  );

  private getDetailedState = async () => {
    return await apiCall("v2/compute/get-detailed-state", {
      id: this.compute_server_id,
      name: "filesystem-sync",
    });
  };

  /**
   * The body of one sync pass:
   * 1. Write the compressed compute-state (mtimes, with negative values for
   *    deletions) into the scratch dir and ask the project to reconcile it.
   * 2. Remove processed whiteouts and the paths the project says to delete.
   * 3. Send the requested files to the project, then fetch files from it.
   * 4. Reset the interval to the minimum if anything happened, otherwise
   *    stretch it by 1.3x (capped at syncIntervalMax).
   * 5. Refresh read-tracked files.
   */
  private __doSync = async () => {
    log("doSync");
    this.reportState({ state: "get-compute-state", progress: 0, timeout: 10 });
    await this.makeScratchDir();
    const api = await this.client.project_client.api(this.project_id);
    const { computeState, whiteouts } = await this.getComputeState();
    // Path relative to `lower`; the project is given this same relative path.
    const computeStateJson = join(
      ".compute-servers",
      `${this.compute_server_id}`,
      "compute-state.json.lz4",
    );
    await writeFile(
      join(this.lower, computeStateJson),
      await toCompressedJSON(computeState),
    );
    this.reportState({
      state: "send-state-to-project",
      progress: 20,
      timeout: 10,
    });
    const { removeFromCompute, copyFromCompute, copyFromProjectTar } =
      await api.syncFS({
        computeStateJson,
        exclude: this.exclude,
        compute_server_id: this.compute_server_id,
        now: Date.now(),
      });

    let isActive = false;
    if (whiteouts.length > 0) {
      isActive = true;
      await remove(whiteouts, join(this.upper, UNIONFS));
    }
    // Parenthesized explicitly: `??` binds looser than `>`.
    if ((removeFromCompute?.length ?? 0) > 0) {
      isActive = true;
      await remove(removeFromCompute, this.upper);
    }
    if ((copyFromCompute?.length ?? 0) > 0) {
      isActive = true;
      this.reportState({
        state: `send-${copyFromCompute?.length ?? 0}-files-to-project`,
        progress: 50,
      });
      await this.sendFiles(copyFromCompute);
    }
    if (copyFromProjectTar) {
      isActive = true;
      this.reportState({
        state: "receive-files-from-project",
        progress: 70,
      });
      await this.receiveFiles(copyFromProjectTar);
    }
    log("DONE receiving files from project as part of sync");

    if (isActive) {
      this.syncInterval = this.syncIntervalMin;
    } else {
      this.syncInterval = Math.min(
        this.syncIntervalMax,
        1.3 * this.syncInterval,
      );
    }
    await this.updateReadTracking();
  };

  /**
   * Build the mtime map of `upper` (minus excludes). Every `_HIDDEN~`
   * whiteout under `upper/.unionfs-fuse` is added as a deletion (negative
   * mtime) for the path it hides. Also returns those whiteout paths, relative
   * to `upper/.unionfs-fuse`, so they can be removed once the sync is done.
   * For directory whiteouts, the hidden directory path itself is included too.
   */
  private getComputeState = async (): Promise<{
    computeState: FilesystemState;
    whiteouts: string[];
  }> => {
    const whiteLen = "_HIDDEN~".length;
    const computeState = await mtimeDirTree({
      path: this.upper,
      exclude: this.exclude,
    });

    const whiteouts: string[] = [];
    const unionfs = join(this.upper, UNIONFS);
    const mtimes = await mtimeDirTree({
      path: unionfs,
      exclude: [],
    });
    for (const path in mtimes) {
      const mtime = mtimes[path];
      if (path.endsWith("_HIDDEN~")) {
        const p = path.slice(0, -whiteLen);
        whiteouts.push(path);
        if ((await stat(join(unionfs, path))).isDirectory()) {
          whiteouts.push(p);
        }
        computeState[p] = -mtime;
      }
    }

    return { computeState, whiteouts };
  };

  /**
   * Send `files` (paths relative to the tar transport's default directory) to
   * the project. The null-separated list is written to the scratch dir and
   * passed to tar via --files-from.
   */
  private sendFiles = async (files: string[]) => {
    const target = join(this.scratch, "copy-to-project");
    log("sendFiles: sending ", files.length, "files listed in ", target);
    // writeFile always closes the file, even if the write fails, so no file
    // descriptor leaks (unlike a manual open/write/close sequence).
    await writeFile(target, files.join("\0"));
    const createArgs = [
      "-c",
      "--null",
      "--no-recursion",
      "--verbatim-files-from",
      "--files-from",
      target,
    ];
    const extractArgs = ["--delay-directory-restore", "-x"];
    await this.tar.send({ createArgs, extractArgs });
    log("sendFiles: ", files.length, "sent");
  };

  /** Fetch from the project the files named in the null-separated list `pathToFileList`. */
  private receiveFiles = async (pathToFileList: string) => {
    log("receiveFiles: getting files in from project -- ", pathToFileList);
    const createArgs = [
      "-c",
      "--null",
      "--no-recursion",
      "--verbatim-files-from",
      "--files-from",
      pathToFileList,
    ];
    const extractArgs = ["--delay-directory-restore", "-x"];
    await this.tar.get({
      createArgs,
      extractArgs,
    });
    log("receiveFiles: files in ", pathToFileList, "received from project");
  };

  /**
   * If a read-tracking file is configured, atomically move it aside and copy
   * it into `lower/.compute-servers/<id>/read-tracking`. Then fetch the files
   * it lists from the project with --keep-newer-files, so newer local copies
   * win. Every failure is logged and swallowed.
   */
  private updateReadTracking = async () => {
    if (!this.readTrackingFile) {
      return;
    }
    const readTrackingOnProject = join(
      ".compute-servers",
      `${this.compute_server_id}`,
      "read-tracking",
    );
    this.reportState({
      state: "cache-files-from-project",
      progress: 80,
    });
    try {
      try {
        // Rename first so entries appended during the copy go to a fresh file
        // and aren't lost.
        const tmp = join(
          dirname(this.readTrackingFile),
          `.${basename(this.readTrackingFile)}.tmp`,
        );
        await rename(this.readTrackingFile, tmp);
        await copyFile(tmp, join(this.lower, readTrackingOnProject));
        await rm(tmp);
      } catch (err) {
        if (err.code == "ENOENT") {
          log(
            `updateReadTracking -- no read tracking file '${this.readTrackingFile}'`,
          );
          return;
        }
        log(
          `updateReadTracking -- issue moving tracking file '${this.readTrackingFile}'`,
          err,
        );
        return;
      }
      const createArgs = [
        "-c",
        "--null",
        "--no-recursion",
        "--verbatim-files-from",
        "--files-from",
        readTrackingOnProject,
      ];
      const extractArgs = ["--keep-newer-files", "-x"];
      log("updateReadTracking:", "tar", createArgs.join(" "));
      try {
        await this.tar.get({ createArgs, extractArgs });
      } catch (err) {
        log(
          `updateReadTracking -- issue extracting tracking file '${this.readTrackingFile}'`,
          err,
        );
        return;
      }
    } finally {
      this.reportState({
        state: "cache-files-from-project",
        progress: 85,
      });
    }
  };

  /**
   * Start the conat syncfs service for this role: a client service on a
   * compute server, a server service in the project.
   *
   * @throws for any other role.
   */
  initConatService = async () => {
    if (this.role == "compute_server") {
      this.conatService = await initConatClientService({
        syncfs: this,
        project_id: this.project_id,
        compute_server_id: this.compute_server_id,
      });
    } else if (this.role == "project") {
      this.conatService = await initConatServerService({
        syncfs: this,
        project_id: this.project_id,
      });
    } else {
      throw Error("only compute_server and project roles are supported");
    }
  };
}