Path: blob/master/src/packages/conat/sync/core-stream.ts
1452 views
/*1core-stream.ts = the Core Stream data structure for conat.23This is the core data structure that easy-to-use ephemeral and persistent4streams and kv stores are built on. It is NOT meant to be super easy and5simple to use, with save in the background. Instead, operations6are async, and the API is complicated. We build dkv, dstream, akv, etc. on7top of this with a much friendly API.89NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.10We use the subject to track communication involving this stream, but11otherwise it has no relevant to the keys. Conat's core pub/sub/request/12reply model is very similar to NATS, but the analogue of Jetstream is13different because I don't find Jetstream useful at all, and find this14much more useful.1516DEVELOPMENT:1718~/cocalc/src/packages/backend$ node1920require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})2122*/2324import { EventEmitter } from "events";25import {26Message,27type Headers,28messageData,29decode,30} from "@cocalc/conat/core/client";31import { isNumericString } from "@cocalc/util/misc";32import refCache from "@cocalc/util/refcache";33import { conat } from "@cocalc/conat/client";34import type { Client } from "@cocalc/conat/core/client";35import jsonStableStringify from "json-stable-stringify";36import type {37SetOperation,38DeleteOperation,39StoredMessage,40Configuration,41} from "@cocalc/conat/persist/storage";42export type { Configuration };43import { join } from "path";44import {45type StorageOptions,46type PersistStreamClient,47stream as persist,48type SetOptions,49} from "@cocalc/conat/persist/client";50import { reuseInFlight } from "@cocalc/util/reuse-in-flight";51import { until } from "@cocalc/util/async-utils";52import { type PartialInventory } from "@cocalc/conat/persist/storage";53import { getLogger } from "@cocalc/conat/client";5455const logger = getLogger("sync:core-stream");5657const PUBLISH_MANY_BATCH_SIZE = 500;5859const log = (..._args) => {};60//const log = console.log;6162// when this many bytes of key:value have been changed (so need to be freed),63// we do a garbage collection pass.64export const KEY_GC_THRESH = 10 * 1e6;6566// NOTE: when you do delete this.deleteKv(key), we ensure the previous67// messages with the given key is completely deleted from sqlite, and68// also create a *new* lightweight tombstone. That tombstone has this69// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one week), so the tombstone70// itself will be removed after 1 week. The tombstone is only needed for71// clients that go offline during the delete, then come back, and reply the72// partial log of what was missed. Such clients should reset if the73// offline time is longer than DEFAULT_TOMBSTONE_TTL.74// This only happens if allow_msg_ttl is configured to true, which is75// done with dkv, but not on by default otherwise.76export const DEFAULT_TOMBSTONE_TTL = 7 * 24 * 60 * 60 * 1000; // 1 week7778export interface RawMsg extends Message {79timestamp: number;80seq: number;81key?: string;82}8384export interface ChangeEvent<T> {85mesg?: T;86raw?: Partial<RawMsg>;87key?: string;88prev?: T;89msgID?: string;90}9192const HEADER_PREFIX = "CN-";9394export const COCALC_TOMBSTONE_HEADER = `${HEADER_PREFIX}Tombstone`;95export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;96export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;9798export interface CoreStreamOptions {99// what it's called100name: string;101// where it is located -- this is who **owns the resource**, which102// may or may not being who is accessing it.103account_id?: string;104project_id?: string;105config?: Partial<Configuration>;106// only load historic messages starting at the given seq number.107start_seq?: number;108109ephemeral?: boolean;110111client?: Client;112113noCache?: boolean;114}115116export interface User {117account_id?: string;118project_id?: string;119}120121export function storagePath({122account_id,123project_id,124name,125}: User & { name: string }) {126let userPath;127if (account_id) {128userPath = `accounts/${account_id}`;129} else if (project_id) {130userPath = `projects/${project_id}`;131} else {132userPath = "hub";133}134return join(userPath, name);135}136137export class CoreStream<T = any> extends EventEmitter {138public readonly name: string;139140private configOptions?: Partial<Configuration>;141private _start_seq?: number;142143// don't do "this.raw=" or "this.messages=" anywhere in this class144// because dstream directly references the public raw/messages.145public readonly raw: RawMsg[] = [];146public readonly messages: T[] = [];147public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};148private kvChangeBytes = 0;149150// this msgID's is ONLY used in ephemeral mode by the leader.151private readonly msgIDs = new Set<any>();152// lastSeq used by clients to keep track of what they have received; if one153// is skipped they reconnect starting with the last one they didn't miss.154private lastSeq: number = 0;155// IMPORTANT: user here means the *owner* of the resource, **NOT** the156// client who is accessing it! For example, a stream of edits of a file157// in a project has user {project_id} even if it is being accessed by158// an account.159private user: User;160private storage: StorageOptions;161private client?: Client;162private persistClient: PersistStreamClient;163164constructor({165name,166project_id,167account_id,168config,169start_seq,170ephemeral = false,171client,172}: CoreStreamOptions) {173super();174logger.debug("constructor", name);175if (client == null) {176throw Error("client must be specified");177}178this.client = client;179this.user = { account_id, project_id };180this.name = name;181this.storage = {182path: storagePath({ account_id, project_id, name }),183ephemeral,184};185this._start_seq = start_seq;186this.configOptions = config;187return new Proxy(this, {188get(target, prop) {189return typeof prop == "string" && isNumericString(prop)190? target.get(parseInt(prop))191: target[String(prop)];192},193});194}195196private initialized = false;197init = async () => {198if (this.initialized) {199throw Error("init can only be called once");200}201this.initialized = true;202if (this.client == null) {203this.client = await conat();204}205this.persistClient = persist({206client: this.client,207user: this.user,208storage: this.storage,209});210this.persistClient.on("error", (err) => {211if (!process.env.COCALC_TEST_MODE) {212console.log(`WARNING: persistent stream issue -- ${err}`);213}214});215await this.getAllFromPersist({216start_seq: this._start_seq,217noEmit: true,218});219220await until(221async () => {222if (this.client == null) {223return true;224}225try {226this.configOptions = await this.config(this.configOptions);227return true;228} catch (err) {229if (err.code == 403) {230// fatal permission error231throw err;232}233}234return false;235},236{ start: 750 },237);238239// NOTE: if we miss a message between getAllFromLeader and when we start listening,240// it will get filled in, due to sequence number tracking.241this.listen();242};243244config = async (245config: Partial<Configuration> = {},246): Promise<Configuration> => {247if (this.storage == null) {248throw Error("bug -- storage must be set");249}250return await this.persistClient.config({ config });251};252253close = () => {254logger.debug("close", this.name);255delete this.client;256this.removeAllListeners();257this.persistClient?.close();258// @ts-ignore259delete this.persistClient;260// @ts-ignore261delete this.kv;262// @ts-ignore263delete this.messages;264// @ts-ignore265delete this.raw;266// @ts-ignore267delete this.msgIDs;268// @ts-ignore269delete this.storage;270};271272inventory = async (): Promise<PartialInventory> => {273return await this.persistClient.inventory();274};275276// NOTE: It's assumed elsewhere that getAllFromPersist will not throw,277// and will keep retrying until (1) it works, or (2) self is closed,278// or (3) there is a fatal failure, e.g., lack of permissions.279private getAllFromPersist = async ({280start_seq = 0,281noEmit,282}: { start_seq?: number; noEmit?: boolean } = {}) => {283if (this.storage == null) {284throw Error("bug -- storage must be set");285}286await until(287async () => {288if (this.client == null) {289return true;290}291try {292// console.log("get persistent stream", { start_seq }, this.storage);293const sub = await this.persistClient.getAll({294start_seq,295});296// console.log("got sub", { noEmit });297while (true) {298const { value, done } = await sub.next();299if (done) {300return true;301}302const messages = value as StoredMessage[];303const seq = this.processPersistentMessages(messages, {304noEmit,305noSeqCheck: true,306});307if (seq != null) {308// we update start_seq in case we need to try again309start_seq = seq! + 1;310}311}312} catch (err) {313if (err.code == 403) {314// fatal permission error315throw err;316}317if (err.code == 429) {318// too many users319throw err;320}321if (!process.env.COCALC_TEST_MODE) {322console.log(323`WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}`,324);325}326}327return false;328},329{ start: 750 },330);331};332333private processPersistentMessages = (334messages: (SetOperation | DeleteOperation | StoredMessage)[],335opts: { noEmit?: boolean; noSeqCheck?: boolean },336) => {337// console.log("processPersistentMessages", messages.length, " messages");338if (this.raw === undefined) {339// closed340return;341}342let seq = undefined;343for (const mesg of messages) {344try {345this.processPersistentMessage(mesg, opts);346if (mesg["seq"] != null) {347seq = mesg["seq"];348}349} catch (err) {350console.log("WARNING: issue processing message", mesg, err);351}352}353return seq;354};355356private processPersistentMessage = (357mesg: SetOperation | DeleteOperation | StoredMessage,358opts: { noEmit?: boolean; noSeqCheck?: boolean },359) => {360if ((mesg as DeleteOperation).op == "delete") {361this.processPersistentDelete(mesg as DeleteOperation, opts);362} else {363// set is the default364this.processPersistentSet(mesg as SetOperation, opts);365}366};367368private processPersistentDelete = (369{ seqs }: DeleteOperation,370{ noEmit }: { noEmit?: boolean },371) => {372if (this.raw == null) return;373//console.log("processPersistentDelete", seqs);374const X = new Set<number>(seqs);375// seqs is a list of integers. We remove376// every entry from this.raw, this.messages, and this.kv377// where this.raw.seq is in X by mutating raw/messages/kv,378// not by making new objects (since external references).379// This is a rare operation so we're not worried too much380// about performance.381const keys: { [seq: number]: string } = {};382for (const key in this.kv) {383const seq = this.kv[key]?.raw?.seq;384if (X.has(seq)) {385delete this.kv[key];386keys[key] = seq;387}388}389const indexes: number[] = [];390for (let i = 0; i < this.raw.length; i++) {391const seq = this.raw[i].seq;392if (X.has(seq)) {393indexes.push(i);394if (!noEmit) {395this.emitChange({396mesg: undefined,397raw: { seq },398key: keys[seq],399prev: this.messages[i],400});401}402}403}404405//console.log({ indexes, seqs, noEmit });406// remove this.raw[i] and this.messages[i] for all i in indexes,407// with special case to be fast in the very common case of contiguous indexes.408if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {409// Contiguous: bulk remove for performance410const start = indexes[0];411const deleteCount = indexes.length;412this.raw.splice(start, deleteCount);413this.messages.splice(start, deleteCount);414} else {415// Non-contiguous: fallback to individual reverse splices416for (let i = indexes.length - 1; i >= 0; i--) {417const idx = indexes[i];418this.raw.splice(idx, 1);419this.messages.splice(idx, 1);420}421}422};423424private processPersistentSetLargestSeq: number = 0;425private missingMessages = new Set<number>();426private processPersistentSet = (427{ seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,428{429noEmit,430noSeqCheck,431}: {432noEmit?: boolean;433noSeqCheck?: boolean;434},435) => {436if (this.raw == null) return;437if (!noSeqCheck && this.processPersistentSetLargestSeq > 0) {438const expected = this.processPersistentSetLargestSeq + 1;439if (seq > expected) {440log(441"processPersistentSet -- detected missed seq number",442{ seq, expected: this.processPersistentSetLargestSeq + 1 },443this.storage,444);445// We record that some are missing.446for (let s = expected; s <= seq - 1; s++) {447this.missingMessages.add(s);448this.getAllMissingMessages();449}450}451}452453if (seq > this.processPersistentSetLargestSeq) {454this.processPersistentSetLargestSeq = seq;455}456457const mesg = decode({ encoding, data });458// console.log("processPersistentSet", seq, mesg)459const raw = {460timestamp: time,461headers,462seq,463raw: data,464key,465} as RawMsg;466if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {467// easy fast initial load to the end of the list (common special case)468this.messages.push(mesg);469this.raw.push(raw);470} else {471// [ ] TODO: insert in the correct place. This should only472// happen when calling load of old ata. The algorithm below is473// dumb and could be replaced by a binary search. However, we'll474// change how we batch load so there's less point.475let i = 0;476while (i < this.raw.length && this.raw[i].seq < seq) {477i += 1;478}479this.raw.splice(i, 0, raw);480this.messages.splice(i, 0, mesg);481}482let prev: T | undefined = undefined;483if (typeof key == "string") {484prev = this.kv[key]?.mesg;485if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {486delete this.kv[key];487} else {488if (this.kv[key] !== undefined) {489const { raw } = this.kv[key];490this.kvChangeBytes += raw.raw.length;491}492493this.kv[key] = { raw, mesg };494495if (this.kvChangeBytes >= KEY_GC_THRESH) {496this.gcKv();497}498}499}500this.lastSeq = Math.max(this.lastSeq, seq);501if (!noEmit) {502this.emitChange({ mesg, raw, key, prev, msgID });503}504};505506private emitChange = (e: ChangeEvent<T>) => {507if (this.raw == null) return;508this.emit("change", e);509};510511private listen = async () => {512log("core-stream: listen", this.storage);513await until(514async () => {515if (this.client == null) {516return true;517}518try {519log("core-stream: START listening on changefeed", this.storage);520const changefeed = await this.persistClient.changefeed();521// console.log("listening on the changefeed...", this.storage);522for await (const updates of changefeed) {523// console.log("changefeed", this.storage, updates);524log("core-stream: process updates", updates, this.storage);525if (this.client == null) return true;526this.processPersistentMessages(updates, {527noEmit: false,528noSeqCheck: false,529});530}531// console.log("DONE listening on the changefeed...", this.storage);532} catch (err) {533// console.log("error listening on the changefeed...");534// This normally doesn't happen but could if a persist server is being restarted535// frequently or things are seriously broken. We cause this in536// backend/conat/test/core/core-stream-break.test.ts537if (!process.env.COCALC_TEST_MODE) {538log(539`WARNING: core-stream changefeed error -- ${err}`,540this.storage,541);542}543}544log("core-stream: STOP listening on changefeed", this.storage);545// above loop exits when the persistent server546// stops sending messages for some reason. In that547// case we reconnect, picking up where we left off:548if (this.client == null) return true;549log(550"core-stream: get missing from when changefeed ended",551this.storage,552);553await this.getAllFromPersist({554start_seq: this.lastSeq + 1,555noEmit: false,556});557return false;558},559{ start: 500, max: 7500, decay: 1.2 },560);561};562563publish = async (564mesg: T,565options?: PublishOptions,566): Promise<{ seq: number; time: number } | undefined> => {567if (mesg === undefined) {568if (options?.key !== undefined) {569// undefined can't be JSON encoded, so we can't possibly represent it, and this570// *must* be treated as a delete.571this.deleteKv(options?.key, { previousSeq: options?.previousSeq });572return;573} else {574throw Error("stream non-kv publish - mesg must not be 'undefined'");575}576}577578if (options?.msgID && this.msgIDs.has(options.msgID)) {579// it's a dup580return;581}582const md = messageData(mesg, { headers: options?.headers });583const x = await this.persistClient.set({584key: options?.key,585messageData: md,586previousSeq: options?.previousSeq,587msgID: options?.msgID,588ttl: options?.ttl,589timeout: options?.timeout,590});591if (options?.msgID) {592this.msgIDs?.add(options.msgID);593}594return x;595};596597publishMany = async (598messages: { mesg: T; options?: PublishOptions }[],599): Promise<600({ seq: number; time: number } | { error: string; code?: any })[]601> => {602let result: (603| { seq: number; time: number }604| { error: string; code?: any }605)[] = [];606607for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {608const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);609result = result.concat(await this.publishMany0(batch));610}611612return result;613};614615private publishMany0 = async (616messages: { mesg: T; options?: PublishOptions }[],617): Promise<618({ seq: number; time: number } | { error: string; code?: any })[]619> => {620const v: SetOptions[] = [];621let timeout: number | undefined = undefined;622for (const { mesg, options } of messages) {623if (options?.timeout) {624if (timeout === undefined) {625timeout = options.timeout;626} else {627timeout = Math.min(timeout, options.timeout);628}629}630const md = messageData(mesg, { headers: options?.headers });631v.push({632key: options?.key,633messageData: md,634previousSeq: options?.previousSeq,635msgID: options?.msgID,636ttl: options?.ttl,637});638}639return await this.persistClient.setMany(v, { timeout });640};641642get = (n?): T | T[] => {643if (n == null) {644return this.getAll();645} else {646return this.messages[n];647}648};649650seq = (n: number): number | undefined => {651return this.raw[n]?.seq;652};653654getAll = (): T[] => {655return [...this.messages];656};657658get length(): number {659return this.messages.length;660}661662get start_seq(): number | undefined {663return this._start_seq;664}665666headers = (n: number): { [key: string]: any } | undefined => {667return this.raw[n]?.headers;668};669670// key:value interface for subset of messages pushed with key option set.671// NOTE: This does NOT throw an error if our local seq is out of date (leave that672// to dkv built on this).673setKv = async (674key: string,675mesg: T,676options?: {677headers?: Headers;678previousSeq?: number;679},680): Promise<{ seq: number; time: number } | undefined> => {681return await this.publish(mesg, { ...options, key });682};683684setKvMany = async (685x: {686key: string;687mesg: T;688options?: {689headers?: Headers;690previousSeq?: number;691};692}[],693): Promise<694({ seq: number; time: number } | { error: string; code?: any })[]695> => {696const messages: { mesg: T; options?: PublishOptions }[] = [];697for (const { key, mesg, options } of x) {698messages.push({ mesg, options: { ...options, key } });699}700return await this.publishMany(messages);701};702703deleteKv = async (704key: string,705options?: {706msgID?: string;707previousSeq?: number;708},709) => {710if (this.kv[key] === undefined) {711// nothing to do712return;713}714return await this.publish(null as any, {715...options,716headers: { [COCALC_TOMBSTONE_HEADER]: true },717key,718ttl: DEFAULT_TOMBSTONE_TTL,719});720};721722getKv = (key: string): T | undefined => {723return this.kv[key]?.mesg;724};725726hasKv = (key: string): boolean => {727return this.kv?.[key] !== undefined;728};729730getAllKv = (): { [key: string]: T } => {731const all: { [key: string]: T } = {};732for (const key in this.kv) {733all[key] = this.kv[key].mesg;734}735return all;736};737738// efficient way to get just the keys -- use this instead of739// getAllKv if you just need the keys.740keysKv = (): string[] => {741return Object.keys(this.kv);742};743744seqKv = (key: string): number | undefined => {745return this.kv[key]?.raw.seq;746};747748timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {749if (key === undefined) {750const all: { [key: string]: Date } = {};751for (const key in this.kv) {752all[key] = new Date(this.kv[key].raw.timestamp);753}754return all;755}756const r = this.kv[key]?.raw;757if (r == null) {758return;759}760return new Date(r.timestamp);761};762763headersKv = (key: string): { [key: string]: any } | undefined => {764return this.kv[key]?.raw?.headers;765};766767get lengthKv(): number {768return Object.keys(this.kv).length;769}770771// load older messages starting at start_seq up to the oldest message772// we currently have.773load = async ({774start_seq,775noEmit,776}: {777start_seq: number;778noEmit?: boolean;779}) => {780// This is used for loading more TimeTravel history781if (this.storage == null) {782throw Error("bug");783}784// this is one before the oldest we have785const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;786if (start_seq > end_seq) {787// nothing to load788return;789}790// we're moving start_seq back to this point791this._start_seq = start_seq;792const sub = await this.persistClient.getAll({793start_seq,794end_seq,795});796for await (const updates of sub) {797this.processPersistentMessages(updates, { noEmit, noSeqCheck: true });798}799};800801private getAllMissingMessages = reuseInFlight(async () => {802await until(803async () => {804if (this.client == null || this.missingMessages.size == 0) {805return true;806}807try {808const missing = Array.from(this.missingMessages);809missing.sort();810log("core-stream: getMissingSeq", missing, this.storage);811const sub = await this.persistClient.getAll({812start_seq: missing[0],813end_seq: missing[missing.length - 1],814});815for await (const updates of sub) {816this.processPersistentMessages(updates, {817noEmit: false,818noSeqCheck: true,819});820}821for (const seq of missing) {822this.missingMessages.delete(seq);823}824} catch (err) {825log(826"core-stream: WARNING -- issue getting missing updates",827err,828this.storage,829);830}831return false;832},833{ start: 1000, max: 15000, decay: 1.3 },834);835});836837// get server assigned time of n-th message in stream838time = (n: number): Date | undefined => {839const r = this.raw[n];840if (r == null) {841return;842}843return new Date(r.timestamp);844};845846times = () => {847const v: (Date | undefined)[] = [];848for (let i = 0; i < this.length; i++) {849v.push(this.time(i));850}851return v;852};853854stats = ({855start_seq = 1,856}: {857start_seq?: number;858} = {}): { count: number; bytes: number } | undefined => {859if (this.raw == null) {860return;861}862let count = 0;863let bytes = 0;864for (const { raw, seq } of this.raw) {865if (seq == null) {866continue;867}868if (seq < start_seq) {869continue;870}871count += 1;872bytes += raw.length;873}874return { count, bytes };875};876877// delete all messages up to and including the878// one at position index, i.e., this.messages[index]879// is deleted.880// NOTE: For ephemeral streams, clients will NOT see the result of a delete,881// except when they load the stream later. For persistent streams all882// **connected** clients will see the delete. THAT said, this is not a "proper"883// distributed computing primitive with tombstones, etc. This is primarily884// meant for reducing space usage, and shouldn't be relied on for885// any other purpose.886delete = async ({887all,888last_index,889seq,890last_seq,891key,892}: {893// give exactly ONE parameter -- by default nothing happens with no params894// all: delete everything895all?: boolean;896// last_index: everything up to and including index'd message897last_index?: number;898// seq: delete message with this sequence number899seq?: number;900// last_seq: delete everything up to and including this sequence number901last_seq?: number;902// key: delete the message with this key903key?: string;904} = {}): Promise<{ seqs: number[] }> => {905let opts;906if (all) {907opts = { all: true };908} else if (last_index != null) {909if (last_index >= this.raw.length) {910opts = { all: true };911} else if (last_index < 0) {912return { seqs: [] };913} else {914const last_seq = this.raw[last_index].seq;915if (last_seq === undefined) {916throw Error(`BUG: invalid index ${last_index}`);917}918opts = { last_seq };919}920} else if (seq != null) {921opts = { seq };922} else if (last_seq != null) {923opts = { last_seq };924} else if (key != null) {925const seq = this.kv[key]?.raw?.seq;926if (seq === undefined) {927return { seqs: [] };928}929opts = { seq };930}931return await this.persistClient.delete(opts);932};933934// delete messages that are no longer needed since newer values have been written935gcKv = () => {936this.kvChangeBytes = 0;937for (let i = 0; i < this.raw.length; i++) {938const key = this.raw[i].key;939if (key !== undefined) {940if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key].raw) {941this.raw[i] = {942...this.raw[i],943headers: undefined,944raw: Buffer.from(""),945} as RawMsg;946this.messages[i] = undefined as T;947}948}949}950};951}952953export interface PublishOptions {954// headers for this message955headers?: Headers;956// unique id for this message to dedup so if you send the same957// message more than once with the same id it doesn't get published958// multiple times.959msgID?: string;960// key -- if specified a key field is also stored on the server,961// and any previous messages with the same key are deleted. Also,962// an entry is set in this.kv[key] so that this.getKv(key), etc. work.963key?: string;964// if key is specified and previousSeq is set, the server throws965// an error if the sequence number of the current key is966// not previousSeq. We use this with this.seqKv(key) to967// provide read/change/write semantics and to know when we968// should resovle a merge conflict. This is ignored if969// key is not specified.970previousSeq?: number;971// if set to a number of ms AND the config option allow_msg_ttl972// is set on this persistent stream, then973// this message will be deleted after the given amount of time (in ms).974ttl?: number;975timeout?: number;976}977978export const cache = refCache<CoreStreamOptions, CoreStream>({979name: "core-stream",980createObject: async (options: CoreStreamOptions) => {981if (options.client == null) {982options = { ...options, client: await conat() };983}984const cstream = new CoreStream(options);985await cstream.init();986return cstream;987},988createKey: ({ client, ...options }) => {989return jsonStableStringify(options)!;990},991});992993export async function cstream<T>(994options: CoreStreamOptions,995): Promise<CoreStream<T>> {996return await cache(options);997}9989991000