/*
Send/Receive incremental replication streams of a filesystem.
*/
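
/*
Overview (inferred from the code below; the on-disk layout comes from
filesystemStreamsPath/filesystemStreamsFilename in ./names):

- send(fs) writes a full replication stream the first time, and an
  incremental stream from last_send_snapshot afterwards, into the
  filesystem's streams directory.
- recv(fs) applies any stream files newer than our newest snapshot.
- recompact({...fs, maxStreams}) replaces the oldest streams by one full
  stream so that at most maxStreams stream files remain.
- maintainStreams(cutoff) runs send + recompact across recently edited
  filesystems.
*/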

import { type PrimaryKey } from "./types";
import { get, getRecent, set } from "./db";
import getLogger from "@cocalc/backend/logger";
import {
  filesystemStreamsPath,
  filesystemStreamsFilename,
  filesystemDataset,
} from "./names";
import { exec } from "./util";
import { split } from "@cocalc/util/misc";
import { join } from "path";
import { getSnapshots } from "./snapshots";
import { STREAM_INTERVAL_MS, MAX_STREAMS } from "./config";

const logger = getLogger("file-server:zfs:send");
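
// Write a replication stream for fs to its streams directory: a full
// "zfs send -R" stream if this is the first send ever, and an incremental
// stream from last_send_snapshot to the newest snapshot otherwise. The
// stream is written to a .temp file and only renamed into place on success.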
export async function send(fs: PrimaryKey) {
  const filesystem = get(fs);
  if (filesystem.archived) {
    logger.debug("filesystem is archived, so nothing to do", fs);
    return;
  }
  const { snapshots } = filesystem;
  const newest_snapshot = snapshots[snapshots.length - 1];
  if (!newest_snapshot) {
    logger.debug("no snapshots yet");
    return;
  }
  if (newest_snapshot == filesystem.last_send_snapshot) {
    logger.debug("no new snapshots", fs);
    // the most recent snapshot is the same as the last one we used to make
    // a stream, so there is nothing to do.
    return;
  }
  await exec({
    command: "sudo",
    args: ["mkdir", "-p", filesystemStreamsPath(filesystem)],
    what: { ...filesystem, desc: "make send target directory" },
  });
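
  // In the commands below, "zfs send -e -c" produces a stream with embedded
  // block data (-e) and compressed blocks (-c); -R sends a full replication
  // stream, while -I @snap1 ... sends an incremental stream that includes
  // all intermediate snapshots between the two endpoints.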
  let stream;
  if (!filesystem.last_send_snapshot) {
    logger.debug("doing first ever send -- a full send");
    stream = filesystemStreamsFilename({
      ...filesystem,
      snapshot1: new Date(0).toISOString(),
      snapshot2: newest_snapshot,
    });
    try {
      await exec({
        verbose: true,
        command: `sudo sh -c 'zfs send -e -c -R ${filesystemDataset(filesystem)}@${newest_snapshot} > ${stream}.temp'`,
        what: {
          ...filesystem,
          desc: "send: zfs send of full filesystem dataset (first full send)",
        },
      });
    } catch (err) {
      await exec({
        verbose: true,
        command: "sudo",
        args: ["rm", `${stream}.temp`],
      });
      throw err;
    }
  } else {
    logger.debug("doing incremental send");
    const snapshot1 = filesystem.last_send_snapshot;
    const snapshot2 = newest_snapshot;
    stream = filesystemStreamsFilename({
      ...filesystem,
      snapshot1,
      snapshot2,
    });
    try {
      await exec({
        verbose: true,
        command: `sudo sh -c 'zfs send -e -c -I @${snapshot1} ${filesystemDataset(filesystem)}@${snapshot2} > ${stream}.temp'`,
        what: {
          ...filesystem,
          desc: "send: zfs incremental send",
        },
      });
    } catch (err) {
      await exec({
        verbose: true,
        command: "sudo",
        args: ["rm", `${stream}.temp`],
      });
      throw err;
    }
  }
  await exec({
    verbose: true,
    command: "sudo",
    args: ["mv", `${stream}.temp`, stream],
  });
  set({ ...fs, last_send_snapshot: newest_snapshot });
}
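
// List the names of the *.zfs stream files for this filesystem. Filenames
// begin with the starting snapshot's ISO timestamp (see
// filesystemStreamsFilename in ./names), so default ls order is also
// chronological order.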
async function getStreams(fs: PrimaryKey) {
  const filesystem = get(fs);
  const streamsPath = filesystemStreamsPath(filesystem);
  const { stdout } = await exec({
    command: "sudo",
    args: ["ls", streamsPath],
    what: { ...filesystem, desc: "getting list of streams" },
  });
  return split(stdout.trim()).filter((path) => path.endsWith(".zfs"));
}
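
// Catch this filesystem up by applying, in order, every stream that begins
// at or after our newest snapshot, via "zfs recv"; then refresh the snapshot
// and size info in our database.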
export async function recv(fs: PrimaryKey) {
  const filesystem = get(fs);
  if (filesystem.archived) {
    throw Error("filesystem must not be archived");
  }
  const streams = await getStreams(filesystem);
  if (streams.length == 0) {
    logger.debug("no streams");
    return;
  }
  const { snapshots } = filesystem;
  const newest_snapshot = snapshots[snapshots.length - 1] ?? "";
  const toRead = streams.filter((snapshot) => snapshot >= newest_snapshot);
  if (toRead.length == 0) {
    return;
  }
  const streamsPath = filesystemStreamsPath(filesystem);
  try {
    for (const stream of toRead) {
      await exec({
        verbose: true,
        command: `sudo sh -c 'cat ${join(streamsPath, stream)} | zfs recv ${filesystemDataset(filesystem)}'`,
        what: {
          ...filesystem,
          desc: "recv: zfs incremental receive",
        },
      });
    }
  } finally {
    // ensure snapshots and size info in our database is up to date:
    await getSnapshots(fs);
  }
}
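
// Parse the two snapshot names out of a stream filename, which has the form
// `${snapshot1}-${snapshot2}.zfs` with snapshot1 an ISO timestamp ending in
// "Z" (hence the split on "Z-"). E.g., for
// "1970-01-01T00:00:00.000Z-2024-02-01T00:00:00.000Z.zfs" this returns
// snapshot1 = "1970-01-01T00:00:00.000Z" and
// snapshot2 = "2024-02-01T00:00:00.000Z".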
function getRange(streamName: string) {
  const v = streamName.split("Z-");
  return { snapshot1: v[0] + "Z", snapshot2: v[1].slice(0, -".zfs".length) };
}

// Replace older streams so that there are at most maxStreams total streams.
export async function recompact({
  maxStreams,
  ...fs
}: PrimaryKey & { maxStreams: number }) {
  const filesystem = get(fs);
  const { snapshots } = filesystem;
  const streams = await getStreams(filesystem);
  if (streams.length <= maxStreams) {
    // nothing to do
    return;
  }
  if (maxStreams < 1) {
    throw Error("maxStreams must be at least 1");
  }
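
  // Note: the snapshot at which an old stream ends may itself have since
  // been deleted; the loop below advances past such streams until it finds
  // an ending snapshot that still exists.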
  // replace first n streams by one full replication stream
  let n = streams.length - maxStreams + 1;
  let snapshot2 = getRange(streams[n - 1]).snapshot2;
  while (!snapshots.includes(snapshot2) && n < streams.length) {
    snapshot2 = getRange(streams[n]).snapshot2;
    if (snapshots.includes(snapshot2)) {
      break;
    }
    n += 1;
  }
  if (!snapshots.includes(snapshot2)) {
    throw Error(
      "bug -- this can't happen because we never delete the last snapshot used for send",
    );
  }
  const stream = filesystemStreamsFilename({
    ...filesystem,
    snapshot1: new Date(0).toISOString(),
    snapshot2,
  });
  try {
    await exec({
      verbose: true,
      command: `sudo sh -c 'zfs send -e -c -R ${filesystemDataset(filesystem)}@${snapshot2} > ${stream}.temp'`,
      what: {
        ...filesystem,
        desc: "recompact: zfs send of full filesystem dataset to replace older streams",
      },
    });
    // if this rm were to fail, then things would be left in a broken state,
    // since ${stream}.temp also gets deleted in the catch. But it seems
    // highly unlikely this rm of the old streams would ever fail.
    const path = filesystemStreamsPath(filesystem);
    await exec({
      verbose: true,
      command: "sudo",
      // full paths to the first n streams:
      args: ["rm", "-f", ...streams.slice(0, n).map((x) => join(path, x))],
    });
    await exec({
      verbose: true,
      command: "sudo",
      args: ["mv", `${stream}.temp`, stream],
    });
  } catch (err) {
    await exec({
      verbose: true,
      command: "sudo",
      args: ["rm", "-f", `${stream}.temp`],
    });
    throw err;
  }
}

// Go through ALL filesystems with last_edited >= cutoff and send a stream if
// due, and also ensure the number of streams isn't too large.
export async function maintainStreams(cutoff?: Date) {
  logger.debug("maintainStreams: getting...");
  const v = getRecent({ cutoff });
  logger.debug(`maintainStreams: considering ${v.length} filesystems`, cutoff);
  let i = 0;
  for (const { archived, last_edited, last_send_snapshot, ...pk } of v) {
    if (archived || !last_edited) {
      continue;
    }
    const age =
      new Date(last_edited).valueOf() -
      new Date(last_send_snapshot ?? 0).valueOf();
    if (age < STREAM_INTERVAL_MS) {
      // there's a new enough stream already
      continue;
    }
    try {
      await send(pk);
      await recompact({ ...pk, maxStreams: MAX_STREAMS });
    } catch (err) {
      logger.debug(`maintainStreams: error -- ${err}`);
    }
    i += 1;
    if (i % 10 == 0) {
      logger.debug(`maintainStreams: ${i}/${v.length}`);
    }
  }
}
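
/*
Hypothetical usage sketch (the actual scheduling lives elsewhere in the
file-server code; this only illustrates the intended call pattern):

  import { maintainStreams } from "./streams";
  import { STREAM_INTERVAL_MS } from "./config";

  // periodically refresh streams for recently edited filesystems
  setInterval(() => {
    maintainStreams().catch((err) => {
      console.warn(`maintainStreams failed -- ${err}`);
    });
  }, STREAM_INTERVAL_MS);
*/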