Path: blob/master/src/packages/conat/sync/dstream.ts
1452 views
/*
Eventually Consistent Distributed Message Stream

DEVELOPMENT:


# in node -- note the package directory!!
~/cocalc/src/packages/backend node

> s = await require("@cocalc/backend/conat/sync").dstream({name:'test'});
> s = await require("@cocalc/backend/conat/sync").dstream({project_id:cc.current().project_id,name:'foo'});


See the guide for dkv, since it's very similar, especially for use in a browser.
*/

import { EventEmitter } from "events";
import {
  CoreStream,
  type RawMsg,
  type ChangeEvent,
  type PublishOptions,
} from "./core-stream";
import { randomId } from "@cocalc/conat/names";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { isNumericString } from "@cocalc/util/misc";
import refCache from "@cocalc/util/refcache";
import {
  type Client,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import jsonStableStringify from "json-stable-stringify";
import type { JSONValue } from "@cocalc/util/types";
import { Configuration } from "./core-stream";
import { conat } from "@cocalc/conat/client";
import { delay, map as awaitMap } from "awaiting";
import { asyncThrottle, until } from "@cocalc/util/async-utils";
import {
  inventory,
  type Inventory,
  INVENTORY_UPDATE_INTERVAL,
} from "./inventory";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("sync:dstream");

/**
 * Options accepted by the DStream constructor and the `dstream()` factory.
 * The whole object is also handed to the underlying CoreStream.
 */
export interface DStreamOptions {
  // what it's called by us
  name: string;
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;
  // description recorded in the inventory entry for this stream
  desc?: JSONValue;

  // required by the constructor; the `dstream()` factory fills it in
  // with `await conat()` when omitted.
  client?: Client;
  // if true, publish() does not trigger save(); call save() explicitly.
  noAutosave?: boolean;
  ephemeral?: boolean;

  noCache?: boolean;
  // if true, publishing never updates the inventory.
  noInventory?: boolean;
}

/**
 * A message stream in which published messages are visible locally right
 * away and are written to the underlying CoreStream in the background.
 *
 * A message is in exactly one of three places:
 *   - `local`:    published here, not yet acknowledged by the stream;
 *   - `saved`:    acknowledged (has a seq) but not yet echoed back through a
 *                 "change" event from the stream;
 *   - `messages`: the stream's own array of confirmed messages.
 *
 * Events emitted: "connected", "change" (mesg, seq), "stable", "reject"
 * ({ err, mesg }), and "closed".
 *
 * The constructor returns a Proxy so that numeric property access, e.g.
 * `s[3]`, is the same as `s.get(3)`.
 */
export class DStream<T = any> extends EventEmitter {
  public readonly name: string;
  private stream: CoreStream;
  private messages: T[];
  private raw: RawMsg[];
  private noAutosave: boolean;
  // TODO: using Map for these will be better because we use .length a bunch, which is O(n) instead of O(1).
  // messages published locally but not yet acknowledged, keyed by msgID
  private local: { [id: string]: T } = {};
  // per-message publish options, keyed by the same msgID as `local`
  private publishOptions: {
    [id: string]: { headers?: Headers };
  } = {};
  // acknowledged messages not yet seen via a "change" event, keyed by seq
  private saved: { [seq: number]: T } = {};
  private opts: DStreamOptions;

  /**
   * @param opts - stream options; `opts.client` must be set.
   * @throws Error if `opts.client` is null or undefined.
   */
  constructor(opts: DStreamOptions) {
    super();
    logger.debug("constructor", opts.name);
    if (opts.client == null) {
      throw Error("client must be specified");
    }
    this.opts = opts;
    this.noAutosave = !!opts.noAutosave;
    this.name = opts.name;
    this.stream = new CoreStream(opts);
    // share the stream's arrays (same references, not copies)
    this.messages = this.stream.messages;
    this.raw = this.stream.raw;
    return new Proxy(this, {
      get(target, prop) {
        // numeric keys index into the stream; everything else is looked up
        // on the instance by its string name.
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : target[String(prop)];
      },
    });
  }

  private initialized = false;

  /**
   * Subscribe to the underlying stream and initialize it, then emit
   * "connected".
   *
   * @throws Error if called more than once, or if the stream is closed.
   */
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.isClosed()) {
      throw Error("closed");
    }
    this.stream.on("change", this.handleChange);
    this.stream.on("reset", () => {
      this.local = {};
      this.saved = {};
      // the options belong to the local messages that were just dropped,
      // so keeping them would only leak memory.
      this.publishOptions = {};
    });
    await this.stream.init();
    this.emit("connected");
  };

  /**
   * Handle a "change" event from the underlying stream: drop the message
   * from `saved`/`local` now that the stream has it, then re-emit "change"
   * (and "stable" if nothing is pending).
   */
  private handleChange = ({ mesg, raw, msgID }: ChangeEvent<T>) => {
    if (raw?.seq !== undefined) {
      delete this.saved[raw.seq];
    }
    if (mesg === undefined) {
      return;
    }
    if (msgID) {
      // this is critical with core-stream.ts, since otherwise there is a moment
      // when the same message is in both this.local *and* this.messages, and you'll
      // see it doubled in this.getAll().
      delete this.local[msgID];
    }
    this.emit("change", mesg, raw?.seq);
    if (this.isStable()) {
      this.emit("stable");
    }
  };

  /**
   * @returns true when there are no entries in `saved` and none in `local`.
   */
  isStable = () => {
    for (const _ in this.saved) {
      return false;
    }
    for (const _ in this.local) {
      return false;
    }
    return true;
  };

  /**
   * @returns true once close() has run (the stream reference is deleted).
   */
  isClosed = () => {
    return this.stream == null;
  };

  /**
   * Close the underlying stream, emit "closed", remove all listeners and
   * drop internal state. Calling it again is a no-op.
   */
  close = () => {
    if (this.isClosed()) {
      return;
    }
    logger.debug("close", this.name);
    const stream = this.stream;
    stream.removeListener("change", this.handleChange);
    // @ts-ignore
    delete this.stream;
    stream.close();
    this.emit("closed");
    this.removeAllListeners();
    // @ts-ignore
    delete this.local;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.opts;
  };

  /**
   * Get the n-th message, counting confirmed messages first, then saved
   * ones, then local ones (the same order as getAll()).
   *
   * @param n - position of the message; if omitted, all messages are returned.
   * @returns the message at position n (undefined if out of range), or the
   *   full array when n is null/undefined.
   * @throws Error("closed") if the stream is closed.
   */
  get = (n?): T | T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n == null) {
      return this.getAll();
    }
    if (n < this.messages.length) {
      return this.messages[n];
    }
    // `saved` is keyed by seq, NOT by position, so it has to be indexed
    // through its values (integer keys enumerate in ascending order, which
    // matches what seq() and getAll() assume).
    const saved = Object.values(this.saved);
    if (n < this.messages.length + saved.length) {
      return saved[n - this.messages.length];
    }
    return Object.values(this.local)[n - this.messages.length - saved.length];
  };

  /**
   * @returns a new array with every message: confirmed, then saved, then local.
   * @throws Error("closed") if the stream is closed.
   */
  getAll = (): T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return [
      ...this.messages,
      ...Object.values(this.saved),
      ...Object.values(this.local),
    ];
  };

  /**
   * Sequence number of the n-th message.
   *
   * @returns the seq, or undefined if position n is a local message that has
   *   not been assigned a seq yet (or is out of range).
   * @throws Error("closed") if the stream is closed.
   */
  seq = (n: number): number | undefined => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n < this.raw.length) {
      return this.raw[n].seq;
    }
    const v = Object.keys(this.saved);
    if (n < v.length + this.raw.length) {
      return parseInt(v[n - this.raw.length]);
    }
  };

  /**
   * Time of the n-th message, as reported by the underlying stream.
   *
   * @throws Error("not initialized") if the stream is closed.
   */
  time = (n: number): Date | undefined => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.time(n);
  };

  /**
   * All server assigned times of messages in the stream.
   *
   * @throws Error("not initialized") if the stream is closed.
   */
  times = (): (Date | undefined)[] => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.times();
  };

  /** Total number of messages: confirmed + saved + local. */
  get length(): number {
    return (
      this.messages.length +
      Object.keys(this.saved).length +
      Object.keys(this.local).length
    );
  }

  /**
   * Queue a message for publication. It is visible through get()/getAll()
   * immediately; unless `noAutosave` was set, a save is started in the
   * background. Also schedules an inventory update.
   *
   * @param mesg - the message to publish.
   * @param options - optional headers/ttl forwarded when the message is saved.
   * @throws Error("closed") if the stream is closed.
   */
  publish = (
    mesg: T,
    // NOTE: if you call this.headers(n) it is NOT visible until
    // the publish is confirmed. This could be changed with more work if it matters.
    options?: { headers?: Headers; ttl?: number },
  ): void => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const id = randomId();
    this.local[id] = mesg;
    if (options != null) {
      this.publishOptions[id] = options;
    }
    if (!this.noAutosave) {
      this.save();
    }
    this.updateInventory();
  };

  /**
   * Headers of the n-th message, as reported by the underlying stream.
   *
   * @throws Error("closed") if the stream is closed.
   */
  headers = (n) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return this.stream.headers(n);
  };

  /**
   * Publish each argument in order, with no publish options.
   *
   * @throws Error("closed") if the stream is closed.
   */
  push = (...args: T[]) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    for (const mesg of args) {
      this.publish(mesg);
    }
  };

  /**
   * @returns true if any locally published message is not yet acknowledged;
   *   always false once closed.
   */
  hasUnsavedChanges = (): boolean => {
    if (this.isClosed()) {
      return false;
    }
    return Object.keys(this.local).length > 0;
  };

  /**
   * @returns the locally published messages that are not yet acknowledged;
   *   an empty array once closed (consistent with hasUnsavedChanges()).
   */
  unsavedChanges = (): T[] => {
    if (this.isClosed()) {
      return [];
    }
    return Object.values(this.local);
  };

  /**
   * Keep attempting to save until there are no unsaved changes or the stream
   * is closed. Wrapped in reuseInFlight and retried via `until` with the
   * start/decay/max settings below.
   */
  save = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          await this.attemptToSave();
          //console.log("successfully saved");
        } catch (err) {
          // Deliberately swallowed: the surrounding `until` retries.
          // The logging below is intentionally disabled.
          if (false && !process.env.COCALC_TEST_MODE) {
            console.log(
              `WARNING: dstream attemptToSave failed - ${err}`,
              this.name,
            );
          }
        }
        return !this.hasUnsavedChanges();
      },
      { start: 150, decay: 1.3, max: 10000 },
    );
  });

  /**
   * Single save attempt. The constant condition selects the batch
   * implementation while keeping the parallel one referenced.
   */
  private attemptToSave = async () => {
    if (true) {
      await this.attemptToSaveBatch();
    } else {
      await this.attemptToSaveParallel();
    }
  };

  /**
   * Publish every local message in one publishMany call, then move each
   * acknowledged message from `local` to `saved` (or drop it if rejected).
   *
   * @throws Error if closed on entry, or if any message failed to save.
   */
  private attemptToSaveBatch = reuseInFlight(async () => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const v: { mesg: T; options: PublishOptions }[] = [];
    // snapshot the ids so results can be matched back by position
    const ids = Object.keys(this.local);
    for (const id of ids) {
      const mesg = this.local[id];
      const options = {
        ...this.publishOptions[id],
        msgID: id,
      };
      v.push({ mesg, options });
    }
    const w: (
      | { seq: number; time: number; error?: undefined }
      | { error: string; code?: any }
    )[] = await this.stream.publishMany(v);

    if (this.isClosed()) {
      return;
    }

    let errors = false;
    for (let i = 0; i < w.length; i++) {
      const id = ids[i];
      if (w[i].error) {
        const x = w[i] as { error: string; code?: any };
        if (x.code == "reject") {
          // a rejected message is dropped rather than retried, so its
          // publish options must go too or they would leak.
          delete this.local[id];
          delete this.publishOptions[id];
          const err = new ConatError(x.error, { code: x.code });
          // err has mesg and subject set.
          this.emit("reject", { err, mesg: v[i].mesg });
        }
        if (!process.env.COCALC_TEST_MODE) {
          console.warn(
            `WARNING -- error saving dstream '${this.name}' -- ${w[i].error}`,
          );
        }
        errors = true;
        continue;
      }
      const { seq } = w[i] as { seq: number };
      if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
        // it still isn't in this.raw
        this.saved[seq] = v[i].mesg;
      }
      delete this.local[id];
      delete this.publishOptions[id];
    }
    if (errors) {
      throw Error(`there were errors saving dstream '${this.name}'`);
    }
  });

  /**
   * Non-batched version: publish each local message individually, at most
   * MAX_PARALLEL at a time. Failures other than rejects are logged and the
   * message stays in `local`.
   */
  private attemptToSaveParallel = reuseInFlight(async () => {
    const f = async (id) => {
      if (this.isClosed()) {
        throw Error("closed");
      }
      const mesg = this.local[id];
      try {
        // @ts-ignore
        const { seq } = await this.stream.publish(mesg, {
          ...this.publishOptions[id],
          msgID: id,
        });
        if (this.isClosed()) {
          return;
        }
        if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
          // it still isn't in this.raw
          this.saved[seq] = mesg;
        }
        delete this.local[id];
        delete this.publishOptions[id];
      } catch (err) {
        if (err.code == "reject") {
          // dropped rather than retried; release its options as well.
          delete this.local[id];
          delete this.publishOptions[id];
          // err has mesg and subject set.
          this.emit("reject", { err, mesg });
        } else {
          if (!process.env.COCALC_TEST_MODE) {
            console.warn(
              `WARNING: problem saving dstream ${this.name} -- ${err}`,
            );
          }
        }
      }
      if (this.isStable()) {
        this.emit("stable");
      }
    };
    // NOTE: ES6 spec guarantees "String keys are returned in the order
    // in which they were added to the object."
    const ids = Object.keys(this.local);
    const MAX_PARALLEL = 50;
    await awaitMap(ids, MAX_PARALLEL, f);
  });

  /**
   * Load older messages starting at start_seq.
   *
   * @throws Error("closed") if the stream is closed.
   */
  load = async (opts: { start_seq: number }) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    await this.stream.load(opts);
  };

  // this is not synchronous -- it makes sure everything is saved out,
  // then delete the persistent stream
  // NOTE: for ephemeral streams, other clients will NOT see the result of a delete (unless they reconnect).
  delete = async (opts?) => {
    await this.save();
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.delete(opts);
  };

  /** The underlying stream's start_seq; undefined once closed. */
  get start_seq(): number | undefined {
    return this.stream?.start_seq;
  }

  /**
   * Get or set config; the argument is forwarded to the underlying stream.
   *
   * @throws Error("closed") if the stream is closed.
   */
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.config(config);
  };

  /**
   * Record this stream's current status in the inventory. Throttled to
   * INVENTORY_UPDATE_INTERVAL; does nothing if closed or if `noInventory`
   * was set. Failures are logged and otherwise ignored.
   */
  private updateInventory = asyncThrottle(
    async () => {
      if (this.isClosed() || this.opts == null || this.opts.noInventory) {
        return;
      }
      await delay(500);
      if (this.isClosed()) {
        return;
      }
      // Declared outside the try so the finally clause can close it.
      let inv: Inventory | undefined = undefined;
      try {
        const { account_id, project_id, desc } = this.opts;
        // assign the outer variable -- redeclaring it here would shadow it
        // and the finally clause would never close the inventory.
        inv = await inventory({ account_id, project_id });
        if (this.isClosed()) {
          return;
        }
        const status = {
          type: "stream" as "stream",
          name: this.opts.name,
          desc,
          ...(await this.stream.inventory()),
        };
        inv.set(status);
      } catch (err) {
        if (!process.env.COCALC_TEST_MODE) {
          // use this.name, not this.opts.name: close() deletes this.opts, and
          // a failure caused by closing must not throw again from in here.
          console.log(
            `WARNING: unable to update inventory. name='${this.name}' -- ${err}`,
          );
        }
      } finally {
        inv?.close();
      }
    },
    INVENTORY_UPDATE_INTERVAL,
    { leading: true, trailing: true },
  );
}

/**
 * Reference-counted cache of DStream instances, keyed by the stable JSON
 * of { name, account_id, project_id }.
 */
export const cache = refCache<DStreamOptions, DStream>({
  name: "dstream",
  createKey: (options: DStreamOptions) => {
    if (!options.name) {
      throw Error("name must be specified");
    }
    const { name, account_id, project_id } = options;
    return jsonStableStringify({ name, account_id, project_id })!;
  },
  createObject: async (options: DStreamOptions) => {
    if (options.client == null) {
      // copy rather than mutate the caller's options object
      options = { ...options, client: await conat() };
    }
    const dstream = new DStream(options);
    await dstream.init();
    return dstream;
  },
});

/**
 * Get an initialized DStream for the given options, via the cache above.
 */
export async function dstream<T>(options: DStreamOptions): Promise<DStream<T>> {
  return await cache(options);
}