Path: blob/master/src/packages/backend/conat/test/files/write.test.ts
/*
Test async streaming writing of files to compute servers using NATS.

DEVELOPMENT:

pnpm test ./write.test.ts

*/

import { before, after } from "@cocalc/backend/conat/test/setup";

beforeAll(before);

import { close, createServer, writeFile } from "@cocalc/conat/files/write";
import { createWriteStream, createReadStream } from "fs";
import { file as tempFile } from "tmp-promise";
import { writeFile as fsWriteFile, readFile } from "fs/promises";
import { sha1 } from "@cocalc/backend/sha1";
import { delay } from "awaiting";

describe("do a basic test that the file writing service works", () => {
  const project_id = "00000000-0000-4000-8000-000000000000";
  const compute_server_id = 0;
  it("create the write server", async () => {
    await createServer({
      project_id,
      compute_server_id,
      createWriteStream,
    });
  });

  let cleanups: any[] = [];
  const CONTENT = "cocalc";
  let source;
  it("creates the file we will read", async () => {
    const { path, cleanup } = await tempFile();
    source = path;
    await fsWriteFile(path, CONTENT);
    cleanups.push(cleanup);
  });

  let dest;
  it("write to a new file", async () => {
    const { path, cleanup } = await tempFile();
    dest = path;
    cleanups.push(cleanup);

    const stream = createReadStream(source);
    const { bytes, chunks } = await writeFile({
      stream,
      project_id,
      compute_server_id,
      path,
    });
    expect(chunks).toBe(1);
    expect(bytes).toBe(CONTENT.length);
  });

  it("confirm that the dest file is correct", async () => {
    await delay(50);
    const d = (await readFile(dest)).toString();
    expect(d).toEqual(CONTENT);
  });

  it("closes the write server", async () => {
    close({ project_id, compute_server_id });
    for (const f of cleanups) {
      f();
    }
  });
});

describe("do a more challenging test that involves a larger file that has to be broken into many chunks", () => {
  const project_id = "00000000-0000-4000-8000-000000000000";
  const compute_server_id = 1;

  it("create the write server", async () => {
    await createServer({
      project_id,
      compute_server_id,
      createWriteStream,
    });
  });

  let cleanups: any[] = [];
  let CONTENT = "";
  for (let i = 0; i < 1000000; i++) {
    CONTENT += `${i}`;
  }
  let source;
  it("creates the file we will read", async () => {
    const { path, cleanup } = await tempFile();
    source = path;
    await fsWriteFile(path, CONTENT);
    cleanups.push(cleanup);
  });

  let dest;
  it("write to a new file", async () => {
    const { path, cleanup } = await tempFile();
    dest = path;
    cleanups.push(cleanup);

    const stream = createReadStream(source);
    const { bytes, chunks } = await writeFile({
      stream,
      project_id,
      compute_server_id,
      path,
    });
    expect(chunks).toBeGreaterThan(1);
    expect(bytes).toBe(CONTENT.length);
  });

  it("confirm that the dest file is correct", async () => {
    let d = (await readFile(dest)).toString();
    if (d.length != CONTENT.length) {
      // Under heavy load the file might not have been flushed to disk yet (even
      // though it was fully and correctly received), so wait to give it a
      // chance, then try again.
      await delay(1000);
      d = (await readFile(dest)).toString();
    }
    expect(d.length).toEqual(CONTENT.length);
    // Compare sha1 hashes rather than the strings themselves, since the content
    // is huge and a failed direct comparison would produce enormous test output.
    expect(sha1(d)).toEqual(sha1(CONTENT));
  });

  it("closes the write server", async () => {
    close({ project_id, compute_server_id });
    for (const f of cleanups) {
      f();
    }
  });
});

afterAll(after);
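
For orientation, here is a minimal sketch (not part of the test file above) of how the same write service might be driven outside of a test. It assumes only the createServer, writeFile, and close calls exercised above; the copyViaConat helper name and the placeholder UUID are hypothetical.

import { close, createServer, writeFile } from "@cocalc/conat/files/write";
import { createReadStream, createWriteStream } from "fs";

// Hypothetical helper: stream a local file to a write server registered for a
// given (project_id, compute_server_id) pair, mirroring the tests above.
async function copyViaConat(src: string, dest: string): Promise<void> {
  const project_id = "00000000-0000-4000-8000-000000000000"; // placeholder uuid
  const compute_server_id = 0;
  // The receiving side registers a server that creates write streams for
  // incoming paths (here backed directly by the local filesystem).
  await createServer({ project_id, compute_server_id, createWriteStream });
  try {
    // writeFile streams the data in chunks and reports totals on completion.
    const { bytes, chunks } = await writeFile({
      stream: createReadStream(src),
      project_id,
      compute_server_id,
      path: dest,
    });
    console.log(`wrote ${bytes} bytes in ${chunks} chunk(s)`);
  } finally {
    // As in the tests, close tears down the server for this pair.
    close({ project_id, compute_server_id });
  }
}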