/*
Use zfs replication over ssh to pull recent filesystems from
one file-server to another.

This will be used for:

- backup
- moving a filesystem from one region/cluster to another
*/
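
// A minimal usage sketch (the remote and prefix values here are hypothetical):
//
//   // Preview what pulling everything edited in the last week would do,
//   // without changing anything locally:
//   const { toUpdate, toDelete } = await pull({
//     remote: "root@backup.example.com",
//     prefix: "data/zfs",
//     dryRun: true,
//   });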

import {
  type Filesystem,
  type RawFilesystem,
  primaryKey,
  PrimaryKey,
} from "./types";
import { exec } from "./util";
import {
  databaseFilename,
  filesystemDataset,
  filesystemMountpoint,
} from "./names";
import { filesystemExists, getRecent, get, set } from "./db";
import getLogger from "@cocalc/backend/logger";
import { getSnapshots, deleteSnapshot } from "./snapshots";
import { createFilesystem, deleteFilesystem } from "./create";
import { context } from "./config";
import { archiveFilesystem, dearchiveFilesystem } from "./archive";
import { isEqual } from "lodash";
import { join } from "path";
import { readdir, unlink } from "fs/promises";

const logger = getLogger("file-server:zfs:pull");

// Number of remote backups of the db sqlite file to keep.
const NUM_DB_TO_KEEP = 10;

// This is used for unit testing. It's what fields should match
// after doing a sync, except snapshots, where local is a superset,
// unless you pull with deleteSnapshots set to true.
export const SYNCED_FIELDS = [
  // these four fields identify the filesystem, so they had better get sync'd:
  "namespace",
  "owner_type",
  "owner_id",
  "name",
  // snapshots -- reflects that we replicated properly.
  "snapshots",

  // last_edited is useful for targeting sync work and making decisions, e.g., should we delete
  "last_edited",
  // these just get directly sync'd. They aren't used unless somehow local were to actually serve
  // data directly.
  "affinity",
  "nfs",
];
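
// A sketch of how a unit test might use this (localFs and remoteFs being
// hypothetical records of the same filesystem after a pull):
//
//   for (const field of SYNCED_FIELDS) {
//     if (field == "snapshots") continue; // local may have extra snapshots
//     expect(localFs[field]).toEqual(remoteFs[field]);
//   }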

interface Remote {
  // remote = user@hostname that you can ssh to
  remote: string;
  // filesystem prefix of the remote server, so {prefix}/database.sqlite3 has the
  // database that defines the state of the remote server.
  prefix: string;
}

// Copy from remote to here every filesystem that has changed since cutoff.
export async function pull({
  cutoff,
  filesystem,
  remote,
  prefix,
  deleteFilesystemCutoff,
  deleteSnapshots,
  dryRun,
}: Remote & {
  // pulls everything that's changed with remote last_edited >= cutoff.
  cutoff?: Date;
  // alternatively -- if given, only pull this filesystem and nothing else:
  filesystem?: PrimaryKey;

  // DANGER: if set, any local filesystem with
  //   cutoff <= last_edited <= deleteFilesystemCutoff
  // actually gets deleted. This makes it possible, e.g., to delete every filesystem
  // that was deleted on the main server in the last 6 months and deleted at least 1
  // month ago, so we have a bit of time before destroying backups.
  deleteFilesystemCutoff?: Date;
  // if true, delete local snapshots if they were deleted on the remote.
  deleteSnapshots?: boolean;
  // just report what would happen, but don't do anything.
  dryRun?: boolean;
}): Promise<{
  toUpdate: { remoteFs: Filesystem; localFs?: Filesystem }[];
  toDelete: RawFilesystem[];
}> {
logger.debug("pull: from ", { remote, prefix, cutoff, filesystem });
97
if (prefix.startsWith("/")) {
98
throw Error("prefix should not start with /");
99
}
100
if (cutoff == null) {
101
cutoff = new Date(Date.now() - 1000 * 60 * 60 * 24 * 7);
102
}
103
logger.debug("pull: get the remote sqlite database");
104
await exec({ command: "mkdir", args: ["-p", context.PULL] });
105
const remoteDatabase = join(
106
context.PULL,
107
`${remote}:${prefix}---${new Date().toISOString()}.sqlite3`,
108
);
109
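  // With hypothetical values remote = "root@backup.example.com" and
  // prefix = "data/zfs", the basename looks like
  //   root@backup.example.com:data/zfs---2024-01-02T03:04:05.000Z.sqlite3
  // so sorting these filenames lexicographically also sorts them by time.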
  // Keep only the NUM_DB_TO_KEEP most recent database files for this
  // remote/prefix, and delete the rest.
  const oldDbFiles = (await readdir(context.PULL))
    .sort()
    .filter((x) => x.startsWith(`${remote}:${prefix}---`))
    .slice(0, -NUM_DB_TO_KEEP);
  for (const path of oldDbFiles) {
    await unlink(join(context.PULL, path));
  }

  await exec({
    command: "scp",
    args: [`${remote}:/${databaseFilename(prefix)}`, remoteDatabase],
  });
logger.debug("pull: compare state");
124
const recent =
125
filesystem != null
126
? [get(filesystem, remoteDatabase)]
127
: getRecent({ cutoff, databaseFile: remoteDatabase });
128
const toUpdate: { remoteFs: Filesystem; localFs?: Filesystem }[] = [];
129
for (const fs of recent) {
130
const remoteFs = get(fs, remoteDatabase);
131
if (!filesystemExists(fs)) {
132
toUpdate.push({ remoteFs });
133
} else {
134
const localFs = get(fs);
135
if (remoteFs.archived != localFs.archived) {
136
// different archive state, so needs an update to resolve this (either way)
137
toUpdate.push({ remoteFs, localFs });
138
continue;
139
}
140
if (deleteSnapshots) {
141
// sync if *any* snapshots differ
142
if (!isEqual(remoteFs.snapshots, localFs.snapshots)) {
143
toUpdate.push({ remoteFs, localFs });
144
}
145
} else {
146
// only sync if newest snapshots are different
147
const newestRemoteSnapshot =
148
remoteFs.snapshots[remoteFs.snapshots.length - 1];
149
if (!newestRemoteSnapshot) {
150
// no snapshots yet, so nothing to do.
151
continue;
152
}
153
const newestLocalSnapshot =
154
localFs.snapshots[localFs.snapshots.length - 1];
155
if (
156
!newestLocalSnapshot ||
157
newestRemoteSnapshot > newestLocalSnapshot
158
) {
159
toUpdate.push({ remoteFs, localFs });
160
}
161
}
162
}
163
}
164
165

  logger.debug(`pull: toUpdate.length = ${toUpdate.length}`);
  if (!dryRun) {
    for (const x of toUpdate) {
      logger.debug("pull: updating ", x);
      await pullOne({ ...x, remote, deleteSnapshots });
    }
  }

  const toDelete: RawFilesystem[] = [];
  if (deleteFilesystemCutoff) {
    for (const fs of getRecent({ cutoff })) {
      if (!filesystemExists(fs, remoteDatabase)) {
        if (new Date(fs.last_edited ?? 0) <= deleteFilesystemCutoff) {
          // it's old enough to delete:
          toDelete.push(fs);
        }
      }
    }
  }
  logger.debug(`pull: toDelete.length = ${toDelete.length}`);
  if (!dryRun) {
    for (const fs of toDelete) {
      logger.debug("pull: deleting", fs);
      await deleteFilesystem(fs);
    }
  }

  return { toUpdate, toDelete };
}

async function pullOne({
  remoteFs,
  localFs,
  remote,
  deleteSnapshots,
}: {
  remoteFs: Filesystem;
  localFs?: Filesystem;
  remote?: string;
  deleteSnapshots?: boolean;
}) {
  logger.debug("pull:", { remoteFs, localFs, remote, deleteSnapshots });
  if (localFs == null) {
    localFs = await createFilesystem(remoteFs);
  }

  // sync last_edited, affinity and nfs fields in all cases
  set({
    ...primaryKey(localFs),
    last_edited: remoteFs.last_edited,
    affinity: remoteFs.affinity,
    nfs: remoteFs.nfs,
  });

  if (localFs.archived && !remoteFs.archived) {
    // it's back in use:
    await dearchiveFilesystem(localFs);
    // don't return -- we may then sync more below, in case of new changes
  } else if (!localFs.archived && remoteFs.archived) {
    // we just archive ours. Note that in theory there is a chance
    // that our local version is not up-to-date with the remote
    // version. However, the point of archiving is that it should only happen
    // many weeks after a filesystem stopped being used, and by that
    // point we should have already pull'd the latest version.
    // Don't bother worrying about deleting snapshots.
    await archiveFilesystem(localFs);
    return;
  }
  if (localFs.archived && remoteFs.archived) {
    // nothing to do
    // Also, don't bother worrying about deleting snapshots, since we can't.
    return;
  }
  const snapshot = newestCommonSnapshot(localFs.snapshots, remoteFs.snapshots);
  const newest_snapshot = remoteFs.snapshots[remoteFs.snapshots.length - 1];
  if (!newest_snapshot || snapshot == newest_snapshot) {
    logger.debug("pull: already have the newest snapshot locally");
  } else {
    const mountpoint = filesystemMountpoint(localFs);
    try {
      if (!snapshot) {
        // full replication with nothing local
        await exec({
          verbose: true,
          command: `ssh ${remote} "zfs send -e -c -R ${filesystemDataset(remoteFs)}@${newest_snapshot}" | sudo zfs recv -o mountpoint=${mountpoint} -F ${filesystemDataset(localFs)}`,
          what: {
            ...localFs,
            desc: "pull: doing a full receive from remote",
          },
        });
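        // For reference, with hypothetical pool/dataset names the pipeline
        // above expands to roughly:
        //   ssh root@backup "zfs send -e -c -R tank/fs@2024-01-02T00:00:00.000Z" \
        //     | sudo zfs recv -o mountpoint=/mnt/fs -F tank2/fs
        // where -R sends a full replication stream (all snapshots and
        // properties) and -e/-c send blocks embedded/compressed as stored
        // on disk.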
      } else {
        // Incremental send based on the newest common snapshot; -I includes
        // all intermediate snapshots up to the newest one. If our newest
        // local snapshot is not the common one, we have to force (-F) the
        // receive, which first rolls local back to the common snapshot.
        const force =
          localFs.snapshots[localFs.snapshots.length - 1] == snapshot
            ? ""
            : "-F";
        await exec({
          verbose: true,
          command: `ssh ${remote} "zfs send -e -c -I @${snapshot} ${filesystemDataset(remoteFs)}@${newest_snapshot}" | sudo zfs recv ${force} -o mountpoint=${mountpoint} ${filesystemDataset(localFs)}`,
          what: {
            ...localFs,
            desc: "pull: doing an incremental replication from remote",
          },
        });
      }
    } finally {
      // even if there was an error, update local snapshots, since we likely have some new
      // ones (e.g., even if there was a partial receive, interrupted by a network drop).
      await getSnapshots(localFs);
    }
  }

  if (deleteSnapshots) {
    // In general, due to snapshot trimming, the list of snapshots on local
    // might NOT match remote, but after replication local will always have a
    // *superset* of remote's snapshots. We thus may have to trim some
    // snapshots:
    const remoteSnapshots = new Set(remoteFs.snapshots);
    const localSnapshots = get(localFs).snapshots;
    for (const snapshot of localSnapshots) {
      if (!remoteSnapshots.has(snapshot)) {
        await deleteSnapshot({ ...localFs, snapshot });
      }
    }
  }
}

// s0 and s1 are sorted oldest-to-newest lists of snapshot names.
// Returns the newest snapshot the two have in common, or undefined
// if they have none in common.
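// For example (hypothetical names):
//   newestCommonSnapshot(["a", "b", "c"], ["b", "c", "x"]) === "c"
//   newestCommonSnapshot(["a"], ["b"]) === undefined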
function newestCommonSnapshot(s0: string[], s1: string[]) {
  const t1 = new Set(s1);
  for (let i = s0.length - 1; i >= 0; i--) {
    if (t1.has(s0[i])) {
      return s0[i];
    }
  }
}