Path: blob/master/src/packages/conat/hub/changefeeds/client.ts
1453 views
import { type Client } from "@cocalc/conat/core/client";1import { EventIterator } from "@cocalc/util/event-iterator";2import { getLogger } from "@cocalc/conat/client";3import { SERVICE, CLIENT_KEEPALIVE, KEEPALIVE_TIMEOUT } from "./util";4import { ConatError } from "@cocalc/conat/core/client";56const logger = getLogger("hub:changefeeds:client");78type Update = any;910function changefeedSubject({ account_id }: { account_id: string }) {11return `${SERVICE}.account-${account_id}`;12}1314export type Changefeed = EventIterator<{ error?: string; update: Update }>;1516export function changefeed({17query,18options,19client,20account_id,21}: {22query: object;23options?: object[];24client: Client;25account_id: string;26}) {27const table = Object.keys(query)[0];28const socket = client.socket.connect(changefeedSubject({ account_id }), {29reconnection: false,30keepAlive: CLIENT_KEEPALIVE,31keepAliveTimeout: KEEPALIVE_TIMEOUT,32desc: `postgresql-changefeed-${table}`,33});34logger.debug("creating changefeed", { table, options });35socket.write({ query, options });36const cf = new EventIterator<{ error?: string; update: Update }>(37socket,38"data",39{40map: (args) => {41const { error, code, update } = args[0] ?? {};42if (error) {43// console.log("changefeed: error returned from server, query");44throw new ConatError(error, { code });45} else {46return update;47}48},49onEnd: () => {50// console.log("changefeed: onEnd", query);51socket.close();52},53},54);55socket.on("closed", () => {56cf.throw(Error("closed"));57});58socket.on("disconnected", () => {59// console.log("changefeed: disconnected", query, socket.subject, socket.id);60cf.throw(Error("disconnected"));61});62return cf;63}646566