Path: blob/master/src/packages/sync/table/changefeed.ts
1447 views
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45import { EventEmitter } from "events";6import { callback, delay } from "awaiting";7import { close } from "@cocalc/util/misc";89export type State = "closed" | "disconnected" | "connecting" | "connected";1011export class Changefeed extends EventEmitter {12private query: any;13private do_query: Function;14private query_cancel: Function;15private state: State = "disconnected";16private table: string;17private id: string;18private options: any;19private handle_update_queue: { err?: any; resp?: any }[] = [];2021constructor({22do_query,23query_cancel,24options,25query,26table,27}: {28do_query: Function;29query_cancel: Function;30options: any;31table: string;32query: any;33}) {34super();35this.do_query = do_query;36this.query_cancel = query_cancel;37this.query = query;38this.options = options;39this.table = table;40}4142// Query for state of the table, connects to the43// changefeed, and return the initial state44// of the table. Throws an exception if anything45// goes wrong.46connect = async () => {47if (this.state != "disconnected") {48throw Error(49`can only connect if state is 'disconnected' but it is ${this.state}`,50);51}52this.state = "connecting";53const resp = await callback(this.run_the_query.bind(this));54if (this.state === ("closed" as State)) {55throw Error("after running query, changefeed state is 'closed'");56}57if (resp.event === "query_cancel") {58throw Error("query-cancel");59}60if (resp.query == null || resp.query[this.table] == null) {61throw Error(`${this.table} changefeed init -- no error and no data`);62}63// Successfully completed query64this.id = resp.id;65this.state = "connected";66this.process_queue_next_tick();67return resp.query[this.table];68};6970close = (): void => {71this.state = "closed";72if (this.id != null) {73// stop listening for future updates74this.cancel_query(this.id);75}76this.emit("close");77this.removeAllListeners();78close(this);79this.state = "closed";80};8182get_state = (): string => {83return this.state;84};8586// Wait a tick, then process the queue of messages that87// arrived during initialization.88private process_queue_next_tick = async () => {89await delay(0);90while (this.state != "closed" && this.handle_update_queue.length > 0) {91const x = this.handle_update_queue.shift();92if (x != null) {93this.handle_update(x.err, x.resp);94}95}96};9798private run_the_query = (cb: Function): void => {99// This query_function gets called first on the100// initial query, then repeatedly with each changefeed101// update. The input function "cb" will be called102// precisely once, and the method handle_changefeed_update103// may get called if there are additional104// changefeed updates.105let first_time: boolean = true;106this.do_query({107query: this.query,108changes: true,109options: this.options,110cb: (err, resp) => {111if (first_time) {112cb(err, resp);113first_time = false;114} else {115this.handle_update(err, resp);116}117},118});119};120121private handle_update = (err, resp): void => {122if (this.state != "connected") {123if (this.state == "closed") {124// expected, since last updates after query cancel may get through...125return;126}127// This can and does happen when updates appear immediately128// after the first initial state is set (in run_the_query).129this.handle_update_queue.push({ err, resp });130return;131}132if (resp == null && err == null) {133err = "resp must not be null for non-error";134}135if (err || resp.event === "query_cancel") {136//if (err) console.warn("closing changefeed due to err", err);137this.close();138return;139}140if (resp.action == null) {141// Not a changefeed message. This happens, e.g., the first time142// when we use the standby server to get the changefeed.143return;144}145// Return just the new_val/old_val/action part of resp.146// console.log("resp=", resp);147const x: { new_val?: any; old_val?: any; action?: string } = {};148if (resp.new_val) {149x.new_val = resp.new_val;150}151if (resp.old_val) {152x.old_val = resp.old_val;153}154x.action = resp.action;155this.emit("update", x);156};157private cancel_query = async (id: string) => {158try {159await this.query_cancel(id);160} catch (err) {161// ignore error, which might be due to disconnecting and isn't a big deal.162// Basically anything that could cause an error would have also163// canceled the changefeed anyways.164}165};166}167168//169170171