Path: blob/master/src/packages/sync/table/changefeed-conat.ts
1447 views
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45import { EventEmitter } from "events";6import { changefeed, type Changefeed } from "@cocalc/conat/hub/changefeeds";7import { conat } from "@cocalc/conat/client";89// low level debugging of changefeeds10const LOW_LEVEL_DEBUG = false;11const log = LOW_LEVEL_DEBUG12? (...args) => {13console.log("changefeed: ", ...args);14}15: (..._args) => {};1617export class ConatChangefeed extends EventEmitter {18private account_id: string;19private query;20private options;21private state: "disconnected" | "connected" | "closed" = "disconnected";22private cf?: Changefeed;2324constructor({25account_id,26query,27options,28}: {29account_id: string;30query;31options?;32}) {33super();34this.account_id = account_id;35this.query = query;36this.options = options;37}3839log = (...args) => {40if (!LOW_LEVEL_DEBUG) return;41log(this.query, ...args);42};4344connect = async () => {45this.log("connecting...");46this.cf = changefeed({47client: await conat(),48account_id: this.account_id,49query: this.query,50options: this.options,51});52const { value, done } = await this.cf.next();53if (done) {54this.log("closed before receiving any values");55this.close();56return;57}58this.log("connected");59this.state = "connected";60this.watch();61return value[Object.keys(value)[0]];62};6364close = (): void => {65this.log("close");66if (this.state == "closed") {67return;68}69this.cf?.close();70delete this.cf;71this.state = "closed";72this.emit("close"); // yes "close" not "closed" ;-(73};7475get_state = (): string => {76return this.state;77};7879private watch = async () => {80if (this.cf == null || this.state == "closed") {81return;82}83try {84for await (const x of this.cf) {85// this.log("got message ", x);86// @ts-ignore87if (this.state == "closed") {88return;89}90this.emit("update", x);91}92} catch (err) {93this.log("got error", err);94}95this.log("watch ended", this.query);96this.close();97};98}99100101