Path: blob/master/src/packages/file-server/zfs/streams.ts
/*
Send/Receive incremental replication streams of a filesystem.
*/

import { type PrimaryKey } from "./types";
import { get, getRecent, set } from "./db";
import getLogger from "@cocalc/backend/logger";
import {
  filesystemStreamsPath,
  filesystemStreamsFilename,
  filesystemDataset,
} from "./names";
import { exec } from "./util";
import { split } from "@cocalc/util/misc";
import { join } from "path";
import { getSnapshots } from "./snapshots";
import { STREAM_INTERVAL_MS, MAX_STREAMS } from "./config";

const logger = getLogger("file-server:zfs:send");

export async function send(fs: PrimaryKey) {
  const filesystem = get(fs);
  if (filesystem.archived) {
    logger.debug("filesystem is archived, so nothing to do", fs);
    return;
  }
  const { snapshots } = filesystem;
  const newest_snapshot = snapshots[snapshots.length - 1];
  if (!newest_snapshot) {
    logger.debug("no snapshots yet");
    return;
  }
  if (newest_snapshot == filesystem.last_send_snapshot) {
    logger.debug("no new snapshots", fs);
    // the most recent snapshot is the same as the last one we used to make
    // an archive, so nothing to do.
    return;
  }
  await exec({
    command: "sudo",
    args: ["mkdir", "-p", filesystemStreamsPath(filesystem)],
    what: { ...filesystem, desc: "make send target directory" },
  });

  let stream;
  if (!filesystem.last_send_snapshot) {
    logger.debug("doing first ever send -- a full send");
    stream = filesystemStreamsFilename({
      ...filesystem,
      snapshot1: new Date(0).toISOString(),
      snapshot2: newest_snapshot,
    });
    try {
      await exec({
        verbose: true,
        command: `sudo sh -c 'zfs send -e -c -R ${filesystemDataset(filesystem)}@${newest_snapshot} > ${stream}.temp'`,
        what: {
          ...filesystem,
          desc: "send: zfs send of full filesystem dataset (first full send)",
        },
      });
    } catch (err) {
      await exec({
        verbose: true,
        command: "sudo",
        args: ["rm", `${stream}.temp`],
      });
      throw err;
    }
  } else {
    logger.debug("doing incremental send");
    const snapshot1 = filesystem.last_send_snapshot;
    const snapshot2 = newest_snapshot;
    stream = filesystemStreamsFilename({
      ...filesystem,
      snapshot1,
      snapshot2,
    });
    try {
      await exec({
        verbose: true,
        command: `sudo sh -c 'zfs send -e -c -I @${snapshot1} ${filesystemDataset(filesystem)}@${snapshot2} > ${stream}.temp'`,
        what: {
          ...filesystem,
          desc: "send: zfs incremental send",
        },
      });
    } catch (err) {
      await exec({
        verbose: true,
        command: "sudo",
        args: ["rm", `${stream}.temp`],
      });
      throw err;
    }
  }
  await exec({
    verbose: true,
    command: "sudo",
    args: ["mv", `${stream}.temp`, stream],
  });
  set({ ...fs, last_send_snapshot: newest_snapshot });
}

async function getStreams(fs: PrimaryKey) {
  const filesystem = get(fs);
  const streamsPath = filesystemStreamsPath(filesystem);
  const { stdout } = await exec({
    command: "sudo",
    args: ["ls", streamsPath],
    what: { ...filesystem, desc: "getting list of streams" },
  });
  return split(stdout.trim()).filter((path) => path.endsWith(".zfs"));
}

export async function recv(fs: PrimaryKey) {
  const filesystem = get(fs);
  if (filesystem.archived) {
    throw Error("filesystem must not be archived");
  }
  const streams = await getStreams(filesystem);
  if (streams.length == 0) {
    logger.debug("no streams");
    return;
  }
  const { snapshots } = filesystem;
  const newest_snapshot = snapshots[snapshots.length - 1] ?? "";
  const toRead = streams.filter((snapshot) => snapshot >= newest_snapshot);
  if (toRead.length == 0) {
    return;
  }
  const streamsPath = filesystemStreamsPath(filesystem);
  try {
    for (const stream of toRead) {
      await exec({
        verbose: true,
        command: `sudo sh -c 'cat ${join(streamsPath, stream)} | zfs recv ${filesystemDataset(filesystem)}'`,
        what: {
          ...filesystem,
          desc: "send: zfs incremental receive",
        },
      });
    }
  } finally {
    // ensure snapshots and size info in our database is up to date:
    await getSnapshots(fs);
  }
}

function getRange(streamName: string) {
  const v = streamName.split("Z-");
  return { snapshot1: v[0] + "Z", snapshot2: v[1].slice(0, -".zfs".length) };
}

// Replace older streams so that there are at most maxStreams total streams.
export async function recompact({
  maxStreams,
  ...fs
}: PrimaryKey & { maxStreams: number }) {
  const filesystem = get(fs);
  const { snapshots } = filesystem;
  const streams = await getStreams(filesystem);
  if (streams.length <= maxStreams) {
    // nothing to do
    return;
  }
  if (maxStreams < 1) {
    throw Error("maxStreams must be at least 1");
  }
  // replace first n streams by one full replication stream
  let n = streams.length - maxStreams + 1;
  let snapshot2 = getRange(streams[n - 1]).snapshot2;
  while (!snapshots.includes(snapshot2) && n < streams.length) {
    snapshot2 = getRange(streams[n]).snapshot2;
    if (snapshots.includes(snapshot2)) {
      break;
    }
    n += 1;
  }
  if (!snapshots.includes(snapshot2)) {
    throw Error(
      "bug -- this can't happen because we never delete the last snapshot used for send",
    );
  }

  const stream = filesystemStreamsFilename({
    ...filesystem,
    snapshot1: new Date(0).toISOString(),
    snapshot2,
  });
  try {
    await exec({
      verbose: true,
      command: `sudo sh -c 'zfs send -e -c -R ${filesystemDataset(filesystem)}@${snapshot2} > ${stream}.temp'`,
      what: {
        ...filesystem,
        desc: "recompact: zfs send of full filesystem dataset",
      },
    });
    // if this rm were to fail, then things would be left in a broken state,
    // since ${stream}.temp also gets deleted in the catch. But it seems
    // highly unlikely this rm of the old streams would ever fail.
    const path = filesystemStreamsPath(filesystem);
    await exec({
      verbose: true,
      command: "sudo",
      // full paths to the first n streams:
      args: ["rm", "-f", ...streams.slice(0, n).map((x) => join(path, x))],
    });
    await exec({
      verbose: true,
      command: "sudo",
      args: ["mv", `${stream}.temp`, stream],
    });
  } catch (err) {
    await exec({
      verbose: true,
      command: "sudo",
      args: ["rm", "-f", `${stream}.temp`],
    });
    throw err;
  }
}

// Go through ALL filesystems with last_edited >= cutoff and send a stream if due,
// and also ensure the number of streams isn't too large.
export async function maintainStreams(cutoff?: Date) {
  logger.debug("maintainStreams: getting recently active filesystems...");
  const v = getRecent({ cutoff });
  logger.debug(`maintainStreams: considering ${v.length} filesystems`, cutoff);
  let i = 0;
  for (const { archived, last_edited, last_send_snapshot, ...pk } of v) {
    if (archived || !last_edited) {
      continue;
    }
    const age =
      new Date(last_edited).valueOf() -
      new Date(last_send_snapshot ?? 0).valueOf();
    if (age < STREAM_INTERVAL_MS) {
      // there's a new enough stream already
      continue;
    }
    try {
      await send(pk);
      await recompact({ ...pk, maxStreams: MAX_STREAMS });
    } catch (err) {
      logger.debug(`maintainStreams: error -- ${err}`);
    }
    i += 1;
    if (i % 10 == 0) {
      logger.debug(`maintainStreams: ${i}/${v.length}`);
    }
  }
}
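
/*
Example usage -- a minimal sketch, not part of this module's API. The exact
fields of the PrimaryKey object are defined in ./types; the ones shown here
are hypothetical placeholders.

  import { send, recv, recompact, maintainStreams } from "./streams";

  const fs = { namespace: "default", name: "my-filesystem" }; // hypothetical key

  await send(fs);                             // write a full or incremental stream
  await recompact({ ...fs, maxStreams: 30 }); // merge older streams into one full stream
  await recv(fs);                             // on the receiving side, apply new streams

  // Or periodically, for every recently edited filesystem:
  await maintainStreams();
*/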