Path: blob/master/src/packages/conat/persist/storage.ts
1453 views
/*1Persistent storage of a specific stream or kv store.23You can set a message by providing optionally a key, buffer and/or json value.4A sequence number and time (in ms since epoch) is assigned and returned.5If the key is provided, it is an arbitrary string and all older messages6with that same key are deleted. You can efficiently retrieve a message7by its key. The message content itself is given by the buffer and/or json8value. The buffer is like the "payload" in NATS, and the json is like9the headers in NATS.1011This module is:1213- efficient -- buffer is automatically compressed using zstandard14- synchronous -- fast enough to meet our requirements even with blocking15- memory efficient -- nothing in memory beyond whatever key you request1617We care about memory efficiency here since it's likely we'll want to have18possibly thousands of these in a single nodejs process at once, but with19less than 1 read/write per second for each. Thus memory is critical, and20supporting at least 1000 writes/second is what we need.21Fortunately, this implementation can do ~50,000+ writes per second and read22over 500,000 per second. Yes, it blocks the main thread, but by using23better-sqlite3 and zstd-napi, we get 10x speed increases over async code,24so this is worth it.252627COMPRESSION:2829I implemented *sync* lz4-napi compression here and it's very fast,30but it has to be run with async waits in a loop or it doesn't give back31memory, and such throttling may significantly negatively impact performance32and mean we don't get a 100% sync api (like we have now).33The async functions in lz4-napi seem fine. Upstream report (by me):34https://github.com/antoniomuso/lz4-napi/issues/67835I also tried the rust sync snappy and it had a similar memory leak. Finally,36I tried zstd-napi and it has a very fast sync implementation that does *not*37need async pauses to not leak memory. So zstd-napi it is.38And I like zstandard anyways.3940NOTE:4142We use seconds instead of ms in sqlite since that is the standard43convention for times in sqlite.4445DEVELOPMENT:464748s = require('@cocalc/backend/conat/persist').pstream({path:'/tmp/a.db'})4950*/5152import { refCacheSync } from "@cocalc/util/refcache";53import { createDatabase, type Database, compress, decompress } from "./context";54import type { JSONValue } from "@cocalc/util/types";55import { EventEmitter } from "events";56import {57DataEncoding,58type Headers,59ConatError,60} from "@cocalc/conat/core/client";61import TTL from "@isaacs/ttlcache";62import { getLogger } from "@cocalc/conat/client";6364const logger = getLogger("persist:storage");6566export interface PartialInventory {67// how much space is used by this stream68bytes: number;69limits: Partial<Configuration>;70// number of messages71count: number;72// for streams, the seq number up to which this data is valid, i.e.,73// this data is for all elements of the stream with sequence74// number <= seq.75seq: number;76}7778export interface Configuration {79// How many messages may be in a Stream, oldest messages will be removed80// if the Stream exceeds this size. -1 for unlimited.81max_msgs: number;8283// Maximum age of any message in the stream,84// expressed in milliseconds. 0 for unlimited.85// **Note that max_age is in milliseconds.**86max_age: number;8788// How big the Stream may be. When the stream size89// exceeds this, old messages are removed. -1 for unlimited.90// The size of a message is the sum of the raw uncompressed blob91// size, the headers json and the key length.92max_bytes: number;9394// The largest message that will be accepted. -1 for unlimited.95max_msg_size: number;9697// Attempting to publish a message that causes either of the following98// two rate limits to be exceeded throws an exception.99// For dstream, the messages are explicitly rejected and the client100// gets a "reject" event emitted. E.g., the terminal running in the project101// writes [...] when it gets these rejects, indicating that data was dropped.102// -1 for unlimited103max_bytes_per_second: number;104105// -1 for unlimited106max_msgs_per_second: number;107108// old = delete old messages to make room for nw109// new = refuse writes if they exceed the limits110discard_policy: "old" | "new";111112// If true (default: false), messages will be automatically deleted after their ttl113// Use the option {ttl:number of MILLISECONDS} when publishing to set a ttl.114allow_msg_ttl: boolean;115116// description of this table117desc: JSONValue;118}119120const CONFIGURATION = {121max_msgs: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },122max_age: { def: 0, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },123max_bytes: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },124max_msg_size: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },125max_bytes_per_second: {126def: -1,127fromDb: parseInt,128toDb: (x) => `${parseInt(x)}`,129},130max_msgs_per_second: {131def: -1,132fromDb: parseInt,133toDb: (x) => `${parseInt(x)}`,134},135discard_policy: {136def: "old",137fromDb: (x) => `${x}`,138toDb: (x) => (x == "new" ? "new" : "old"),139},140allow_msg_ttl: {141def: false,142fromDb: (x) => x == "true",143toDb: (x) => `${!!x}`,144},145desc: {146def: null,147fromDb: JSON.parse,148toDb: JSON.stringify,149},150};151152export const EPHEMERAL_MAX_BYTES = 64 * 1e6;153154enum CompressionAlgorithm {155None = 0,156Zstd = 1,157}158159interface Compression {160// compression algorithm to use161algorithm: CompressionAlgorithm;162// only compress data above this size163threshold: number;164}165166const DEFAULT_COMPRESSION = {167algorithm: CompressionAlgorithm.Zstd,168threshold: 1024,169};170171export interface StoredMessage {172// server assigned positive increasing integer number173seq: number;174// server assigned time in ms since epoch175time: number;176// user assigned key -- when set all previous messages with that key are deleted.177key?: string;178// the encoding used to encode the raw data179encoding: DataEncoding;180// arbitrary binary data181raw: Buffer;182// arbitrary JSON-able object -- analogue of NATS headers, but anything JSON-able183headers?: Headers;184}185186export interface SetOperation extends StoredMessage {187op: undefined;188msgID?: string;189}190191export interface DeleteOperation {192op: "delete";193// sequence numbers of deleted messages194seqs: number[];195}196197export interface StorageOptions {198// absolute path to sqlite database file. This needs to be a valid filename199// path, and must also be kept under 1K so it can be stored in cloud storage.200path: string;201// if false (the default) do not require sync writes to disk on every set202sync?: boolean;203// if set, then data is never saved to disk at all. To avoid using a lot of server204// RAM there is always a hard cap of at most EPHEMERAL_MAX_BYTES on any ephemeral205// table, which is enforced on all writes. Clients should always set max_bytes,206// possibly as low as they can, and check by reading back what is set.207ephemeral?: boolean;208// compression configuration209compression?: Compression;210}211212// persistence for stream of messages with subject213export class PersistentStream extends EventEmitter {214private readonly options: StorageOptions;215private readonly db: Database;216private readonly msgIDs = new TTL({ ttl: 2 * 60 * 1000 });217private conf: Configuration;218219constructor(options: StorageOptions) {220super();221logger.debug("constructor ", options.path);222223this.setMaxListeners(1000);224options = { compression: DEFAULT_COMPRESSION, ...options };225this.options = options;226const location = this.options.ephemeral227? ":memory:"228: this.options.path + ".db";229this.db = createDatabase(location);230//console.log(location);231this.init();232}233234init = () => {235if (!this.options.sync && !this.options.ephemeral) {236// Unless sync is set, we do not require that the filesystem has commited changes237// to disk after every insert. This can easily make things 10x faster. sets are238// typically going to come in one-by-one as users edit files, so this works well239// for our application. Also, loss of a few seconds persistence is acceptable240// in a lot of applications, e.g., if it is just edit history for a file.241this.db.prepare("PRAGMA synchronous=OFF").run();242}243// time is in *seconds* since the epoch, since that is standard for sqlite.244// ttl is in milliseconds.245this.db246.prepare(247`CREATE TABLE IF NOT EXISTS messages (248seq INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, time INTEGER NOT NULL, headers TEXT, compress NUMBER NOT NULL, encoding NUMBER NOT NULL, raw BLOB NOT NULL, size NUMBER NOT NULL, ttl NUMBER249)250`,251)252.run();253this.db254.prepare(255`256CREATE TABLE IF NOT EXISTS config (257field TEXT PRIMARY KEY, value TEXT NOT NULL258)`,259)260.run();261this.db262.prepare("CREATE INDEX IF NOT EXISTS idx_messages_key ON messages(key)")263.run();264this.db265.prepare("CREATE INDEX IF NOT EXISTS idx_messages_time ON messages(time)")266.run();267268this.conf = this.config();269};270271close = () => {272logger.debug("close ", this.options.path);273if (this.db != null) {274this.vacuum();275this.db.prepare("PRAGMA wal_checkpoint(FULL)").run();276this.db.close();277// @ts-ignore278}279// @ts-ignore280delete this.options;281this.msgIDs?.clear();282// @ts-ignore283delete this.msgIDs;284};285286private compress = (287raw: Buffer,288): { raw: Buffer; compress: CompressionAlgorithm } => {289if (290this.options.compression!.algorithm == CompressionAlgorithm.None ||291raw.length <= this.options.compression!.threshold292) {293return { raw, compress: CompressionAlgorithm.None };294}295if (this.options.compression!.algorithm == CompressionAlgorithm.Zstd) {296return { raw: compress(raw), compress: CompressionAlgorithm.Zstd };297}298throw Error(299`unknown compression algorithm: ${this.options.compression!.algorithm}`,300);301};302303set = ({304encoding,305raw,306headers,307key,308ttl,309previousSeq,310msgID,311}: {312encoding: DataEncoding;313raw: Buffer;314headers?: JSONValue;315key?: string;316ttl?: number;317previousSeq?: number;318// if given, any attempt to publish something again with the same msgID319// is deduplicated. Use this to prevent accidentally writing twice, e.g.,320// due to not getting a response back from the server.321msgID?: string;322}): { seq: number; time: number } => {323if (previousSeq === null) {324previousSeq = undefined;325}326if (key === null) {327key = undefined;328}329if (msgID != null && this.msgIDs?.has(msgID)) {330return this.msgIDs.get(msgID)!;331}332if (key !== undefined && previousSeq !== undefined) {333// throw error if current seq number for the row334// with this key is not previousSeq.335const { seq } = this.db // there is an index on the key so this is fast336.prepare("SELECT seq FROM messages WHERE key=?")337.get(key) as any;338if (seq != previousSeq) {339throw new ConatError("wrong last sequence", {340code: "wrong-last-sequence",341});342}343}344const time = Date.now();345const compressedRaw = this.compress(raw);346const serializedHeaders = JSON.stringify(headers);347const size =348(serializedHeaders?.length ?? 0) +349(raw?.length ?? 0) +350(key?.length ?? 0);351352this.enforceLimits(size);353354const tx = this.db.transaction(355(time, compress, encoding, raw, headers, key, size, ttl) => {356if (key !== undefined) {357// insert with key -- delete all previous messages, as they will358// never be needed again and waste space.359this.db.prepare("DELETE FROM messages WHERE key = ?").run(key);360}361return this.db362.prepare(363"INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq",364)365.get(time / 1000, compress, encoding, raw, headers, key, size, ttl);366},367);368const row = tx(369time,370compressedRaw.compress,371encoding,372compressedRaw.raw,373serializedHeaders,374key,375size,376ttl,377);378const seq = Number((row as any).seq);379// lastInsertRowid - is a bigint from sqlite, but we won't hit that limit380this.emit("change", {381seq,382time,383key,384encoding,385raw,386headers,387msgID,388});389if (msgID !== undefined) {390this.msgIDs.set(msgID, { time, seq });391}392return { time, seq };393};394395get = ({396seq,397key,398}: { seq: number; key: undefined } | { seq: undefined; key: string }):399| StoredMessage400| undefined => {401let x;402if (seq) {403x = this.db404.prepare(405"SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE seq=?",406)407.get(seq);408} else if (key != null) {409// NOTE: we guarantee when doing set above that there is at most one410// row with a given key. Also there's a unique constraint.411x = this.db412.prepare(413"SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE key=?",414)415.get(key);416} else {417x = undefined;418}419return dbToMessage(x as any);420};421422*getAll({423start_seq,424end_seq,425}: {426end_seq?: number;427start_seq?: number;428} = {}): IterableIterator<StoredMessage> {429let query: string, stmt;430431const where: string[] = [];432const v: number[] = [];433if (start_seq != null) {434where.push("seq>=?");435v.push(start_seq);436}437if (end_seq != null) {438where.push("seq<=?");439v.push(end_seq);440}441query = `SELECT seq, key, time, compress, encoding, raw, headers FROM messages ${where.length == 0 ? "" : " where " + where.join(" AND ")} ORDER BY seq`;442stmt = this.db.prepare(query);443for (const row of stmt.iterate(...v)) {444yield dbToMessage(row)!;445}446}447448delete = ({449seq,450last_seq,451all,452}: {453seq?: number;454last_seq?: number;455all?: boolean;456}): { seqs: number[] } => {457let seqs: number[] = [];458if (all) {459seqs = this.db460.prepare("SELECT seq FROM messages")461.all()462.map((row: any) => row.seq);463this.db.prepare("DELETE FROM messages").run();464this.vacuum();465} else if (last_seq) {466seqs = this.db467.prepare("SELECT seq FROM messages WHERE seq<=?")468.all(last_seq)469.map((row: any) => row.seq);470this.db.prepare("DELETE FROM messages WHERE seq<=?").run(last_seq);471this.vacuum();472} else if (seq) {473seqs = this.db474.prepare("SELECT seq FROM messages WHERE seq=?")475.all(seq)476.map((row: any) => row.seq);477this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);478}479this.emit("change", { op: "delete", seqs });480return { seqs };481};482483vacuum = () => {484try {485this.db.prepare("VACUUM").run();486} catch {}487};488489get length(): number {490const { length } = this.db491.prepare("SELECT COUNT(*) AS length FROM messages")492.get() as { length: number };493return length;494}495496totalSize = (): number => {497return (498(this.db.prepare(`SELECT SUM(size) AS sum FROM messages`).get() as any)499.sum ?? 0500);501};502503seq = (): number => {504return (505(this.db.prepare(`SELECT MAX(seq) AS seq FROM messages`).get() as any)506.seq ?? 0507);508};509510inventory = (): PartialInventory => {511return {512bytes: this.totalSize(),513count: this.length,514limits: this.getConfig(),515seq: this.seq(),516};517};518519keys = (): string[] => {520const v = this.db521.prepare("SELECT key FROM messages WHERE key IS NOT NULL")522.all() as {523key: string;524}[];525return v.map(({ key }) => key);526};527528sqlite = (statement: string, params: any[] = []): any[] => {529// Matches "attach database" (case-insensitive, ignores whitespace)530if (/\battach\s+database\b/i.test(statement)) {531throw Error("ATTACH DATABASE not allowed");532}533const stmt = this.db.prepare(statement);534try {535return stmt.all(...params);536} catch (err) {537if (err.message.includes("run() instead")) {538stmt.run(...params);539return [];540} else {541throw err;542}543}544};545546// only returns fields that are not set to their default value,547// and doesn't enforce any limits548getConfig = (): Partial<Configuration> => {549const cur: any = {};550for (const { field, value } of this.db551.prepare("SELECT * FROM config")552.all() as any) {553const { def, fromDb } = CONFIGURATION[field];554cur[field] = fromDb(value);555if (cur[field] == def) {556delete cur[field];557}558}559return cur;560};561562config = (config?: Partial<Configuration>): Configuration => {563const cur: any = {};564for (const { field, value } of this.db565.prepare("SELECT * FROM config")566.all() as any) {567cur[field] = value;568}569const full: Partial<Configuration> = {};570for (const key in CONFIGURATION) {571const { def, fromDb, toDb } = CONFIGURATION[key];572full[key] =573config?.[key] ?? (cur[key] !== undefined ? fromDb(cur[key]) : def);574let x = toDb(full[key]);575if (config?.[key] != null && full[key] != (cur[key] ?? def)) {576// making a change577this.db578.prepare(579`INSERT INTO config (field, value) VALUES(?, ?) ON CONFLICT(field) DO UPDATE SET value=excluded.value`,580)581.run(key, x);582}583full[key] = fromDb(x);584if (585this.options.ephemeral &&586key == "max_bytes" &&587(full[key] == null || full[key] <= 0 || full[key] > EPHEMERAL_MAX_BYTES)588) {589// for ephemeral we always make it so max_bytes is capped590// (note -- this isn't explicitly set in the sqlite database, since we might591// change it, and by not setting it in the database we can)592full[key] = EPHEMERAL_MAX_BYTES;593}594}595this.conf = full as Configuration;596// ensure any new limits are enforced597this.enforceLimits(0);598return full as Configuration;599};600601private emitDelete = (rows) => {602if (rows.length > 0) {603const seqs = rows.map((row: { seq: number }) => row.seq);604this.emit("change", { op: "delete", seqs });605}606};607608// do whatever limit enforcement and throttling is needed when inserting one new message609// with the given size; if size=0 assume not actually inserting a new message, and just610// enforcingt current limits611private enforceLimits = (size: number = 0) => {612if (613size > 0 &&614(this.conf.max_msgs_per_second > 0 || this.conf.max_bytes_per_second > 0)615) {616const { msgs, bytes } = this.db617.prepare(618"SELECT COUNT(*) AS msgs, SUM(size) AS bytes FROM messages WHERE time >= ?",619)620.get(Date.now() / 1000 - 1) as { msgs: number; bytes: number };621if (622this.conf.max_msgs_per_second > 0 &&623msgs > this.conf.max_msgs_per_second624) {625throw new ConatError("max_msgs_per_second exceeded", {626code: "reject",627});628}629if (630this.conf.max_bytes_per_second > 0 &&631bytes > this.conf.max_bytes_per_second632) {633throw new ConatError("max_bytes_per_second exceeded", {634code: "reject",635});636}637}638639if (this.conf.max_msgs > -1) {640const length = this.length + (size > 0 ? 1 : 0);641if (length > this.conf.max_msgs) {642if (this.conf.discard_policy == "new") {643if (size > 0) {644throw new ConatError("max_msgs limit reached", { code: "reject" });645}646} else {647// delete earliest messages to make room648const rows = this.db649.prepare(650`DELETE FROM messages WHERE seq IN (SELECT seq FROM messages ORDER BY seq ASC LIMIT ?) RETURNING seq`,651)652.all(length - this.conf.max_msgs);653this.emitDelete(rows);654}655}656}657658if (this.conf.max_age > 0) {659const rows = this.db660.prepare(661`DELETE FROM messages WHERE seq IN (SELECT seq FROM messages WHERE time <= ?) RETURNING seq`,662)663.all((Date.now() - this.conf.max_age) / 1000);664this.emitDelete(rows);665}666667if (this.conf.max_bytes > -1) {668if (size > this.conf.max_bytes) {669if (this.conf.discard_policy == "new") {670if (size > 0) {671throw new ConatError("max_bytes limit reached", { code: "reject" });672}673} else {674// new message exceeds total, so this is the same as adding in the new message,675// then deleting everything.676this.delete({ all: true });677}678} else {679// delete all the earliest (in terms of seq number) messages680// so that the sum of the remaining681// sizes along with the new size is <= max_bytes.682// Only enforce if actually inserting, or if current sum is over683const totalSize = this.totalSize();684const newTotal = totalSize + size;685if (newTotal > this.conf.max_bytes) {686const bytesToFree = newTotal - this.conf.max_bytes;687let freed = 0;688let lastSeqToDelete: number | null = null;689690for (const { seq, size: msgSize } of this.db691.prepare(`SELECT seq, size FROM messages ORDER BY seq ASC`)692.iterate() as any) {693if (freed >= bytesToFree) break;694freed += msgSize;695lastSeqToDelete = seq;696}697698if (lastSeqToDelete !== null) {699if (this.conf.discard_policy == "new") {700if (size > 0) {701throw new ConatError("max_bytes limit reached", {702code: "reject",703});704}705} else {706const rows = this.db707.prepare(`DELETE FROM messages WHERE seq <= ? RETURNING seq`)708.all(lastSeqToDelete);709this.emitDelete(rows);710}711}712}713}714}715716if (this.conf.allow_msg_ttl) {717const rows = this.db718.prepare(719`DELETE FROM messages WHERE ttl IS NOT null AND time + ttl/1000 < ? RETURNING seq`,720)721.all(Date.now() / 1000);722this.emitDelete(rows);723}724725if (this.conf.max_msg_size > -1 && size > this.conf.max_msg_size) {726throw new ConatError(727`max_msg_size of ${this.conf.max_msg_size} bytes exceeded`,728{ code: "reject" },729);730}731};732}733734function dbToMessage(735x:736| {737seq: number;738key?: string;739time: number;740compress: CompressionAlgorithm;741encoding: DataEncoding;742raw: Buffer;743headers?: string;744}745| undefined,746): StoredMessage | undefined {747if (x === undefined) {748return x;749}750return {751seq: x.seq,752time: x.time * 1000,753key: x.key != null ? x.key : undefined,754encoding: x.encoding,755raw: handleDecompress(x),756headers: x.headers ? JSON.parse(x.headers) : undefined,757};758}759760function handleDecompress({761raw,762compress,763}: {764raw: Buffer;765compress: CompressionAlgorithm;766}) {767if (compress == CompressionAlgorithm.None) {768return raw;769} else if (compress == CompressionAlgorithm.Zstd) {770return decompress(raw);771} else {772throw Error(`unknown compression ${compress}`);773}774}775776interface CreateOptions extends StorageOptions {777noCache?: boolean;778}779780export const cache = refCacheSync<CreateOptions, PersistentStream>({781name: "persistent-storage-stream",782createKey: ({ path }: CreateOptions) => path,783createObject: (options: CreateOptions) => {784const pstream = new PersistentStream(options);785pstream.init();786return pstream;787},788});789790export function pstream(791options: StorageOptions & { noCache?: boolean },792): PersistentStream {793return cache(options);794}795796797