Path: blob/master/src/packages/database/postgres/changefeed.ts
1503 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
The Changes class is a useful building block
for making changefeeds.  It lets you watch when given
columns change in a given table, and be notified
when a where condition is satisfied.

IMPORTANT: If an error event is emitted then
Changes object will close and not work any further!
You must recreate it.
*/

import { EventEmitter } from "events";
import * as misc from "@cocalc/util/misc";
import { opToFunction, OPERATORS, Operator } from "@cocalc/util/db-schema";
import { callback } from "awaiting";
import { PostgreSQL, QuerySelect } from "./types";
import { query } from "./changefeed-query";

// Either a predicate applied to each row, a single object of conditions, or an
// array of condition objects and/or condition strings (e.g. "x = 5").  Every
// condition must hold for a row to match (see Changes.init_where).
type WhereCondition = Function | object | (object | string)[];

type ChangeAction = "delete" | "insert" | "update";

// Normalize an action name from a change notification (e.g. "UPDATE") to one of
// the lowercase ChangeAction values; throws on anything else.
function parse_action(obj: string): ChangeAction {
  const s = obj.toLowerCase();
  if (s === "delete" || s === "insert" || s === "update") {
    return s;
  }
  throw Error(`invalid action "${s}"`);
}

export interface ChangeEvent {
  action: ChangeAction;
  new_val?: object;
  old_val?: object;
}

/**
 * Watches `table` for changes (via the database's `_listen` mechanism) and
 * emits events for rows satisfying `where`.
 *
 * Events:
 *  - "change": a ChangeEvent (insert / update / delete).
 *  - "error":  emitted at most once; the object closes immediately afterwards.
 *  - "close":  emitted with `{ action: "close" }` when the object is closed.
 *
 * `cb(err)` or `cb(undefined, this)` is called once initialization finishes.
 */
export class Changes extends EventEmitter {
  private db: PostgreSQL;
  private table: string;
  private select: QuerySelect;
  private watch: string[];
  private where: WhereCondition;

  private trigger_name: string;
  private closed: boolean = false;
  private condition?: { [field: string]: Function };
  private match_condition: Function;

  // Last full value sent for each row (watch-query result merged with the
  // notified fields), keyed by JSON of the notified fields.  Used so updates
  // only send the fields that changed.
  private val_update_cache: { [key: string]: any } = {};

  constructor(
    db: PostgreSQL,
    table: string,
    select: QuerySelect,
    watch: string[],
    where: WhereCondition,
    cb: Function,
  ) {
    super();
    this.db = db;
    this.table = table;
    this.select = select;
    this.watch = watch;
    this.where = where;
    this.init(cb);
  }

  init = async (cb: Function): Promise<void> => {
    this.dbg("constructor")(
      `select=${misc.to_json(this.select)}, watch=${misc.to_json(
        this.watch,
      )}, @_where=${misc.to_json(this.where)}`,
    );

    try {
      this.init_where();
    } catch (e) {
      cb(`error initializing where conditions -- ${e}`);
      return;
    }

    let trigger_name: string;
    try {
      trigger_name = await callback(
        this.db._listen,
        this.table,
        this.select,
        this.watch,
      );
    } catch (err) {
      cb(err);
      return;
    }
    if (this.closed) {
      // close() was called while we waited for LISTEN to be set up (and it
      // already called _stop_listening).  Do not attach handlers to a closed
      // object: they would never be removed, and this.db may be gone.
      cb("closed");
      return;
    }
    this.trigger_name = trigger_name;
    this.db.on(this.trigger_name, this.handle_change);
    // NOTE: we close on *connect*, not on disconnect, since then clients
    // that try to reconnect will only try to do so when we have an actual
    // connection to the database.  No point in worrying them while trying
    // to reconnect, which only makes matters worse (as they panic and
    // requests pile up!).

    // This setMaxListeners is here because I keep getting warning about
    // this despite setting it in the db constructor.  Putting this here
    // definitely does work, whereas having it only in the constructor
    // definitely does NOT.  Don't break this without thought, as it has very bad
    // consequences when the database connection drops.
    this.db.setMaxListeners(0);

    this.db.once("connect", this.close);
    cb(undefined, this);
  };

  private dbg = (f: string): Function => {
    return this.db._dbg(`Changes(table='${this.table}').${f}`);
  };

  // This breaks the changefeed -- client must recreate it; nothing further will
  // work at all.  Emits "error" (once) and then closes.
  private fail = (err): void => {
    if (this.closed) {
      return;
    }
    this.dbg("_fail")(`err='${err}'`);
    // Pass an existing Error through unchanged so its message and stack are
    // preserved; otherwise wrap the value in an Error.
    this.emit("error", err instanceof Error ? err : new Error(err));
    this.close();
  };

  // Idempotent: emits "close", detaches all listeners, stops listening on the
  // table, and clears this object via misc.close.
  close = (): void => {
    if (this.closed) {
      return;
    }
    // Mark closed *before* emitting, so a "close" listener that calls close()
    // again returns immediately instead of recursing.
    this.closed = true;
    this.emit("close", { action: "close" });
    this.removeAllListeners();
    if (this.db != null) {
      this.db.removeListener(this.trigger_name, this.handle_change);
      this.db.removeListener("connect", this.close);
      this.db._stop_listening(this.table, this.select, this.watch);
    }
    misc.close(this);
    // misc.close may clear properties of this object -- make sure the flag
    // survives it.
    this.closed = true;
  };

  // Query the table for rows matching `where` (equality on each key) and emit
  // an "insert" change for each one that satisfies the where condition.
  insert = async (where): Promise<void> => {
    const where0: { [field: string]: any } = {};
    for (const k in where) {
      where0[`${k} = $`] = where[k];
    }
    let results: { [field: string]: any }[];
    try {
      results = await query({
        db: this.db,
        select: this.watch.concat(misc.keys(this.select)),
        table: this.table,
        where: where0,
        one: false,
      });
    } catch (err) {
      this.fail(err); // this is game over
      return;
    }
    if (this.closed) {
      // Closed while waiting on the query (same guard as in handle_change).
      return;
    }
    for (const x of results) {
      if (this.match_condition(x)) {
        misc.map_mutate_out_undefined_and_null(x);
        const change: ChangeEvent = { action: "insert", new_val: x };
        this.emit("change", change);
      }
    }
  };

  delete = (where): void => {
    // listener is meant to delete everything that *matches* the where, so
    // there is no need to actually do a query.
    const change: ChangeEvent = { action: "delete", old_val: where };
    this.emit("change", change);
  };

  // Drop the cached value for a row we just reported as deleted, so that if the
  // row later re-enters the selection it is sent as a full "insert" rather than
  // as a diff against stale data.  A key that isn't cached is a harmless no-op.
  private forget_cached = (row): void => {
    delete this.val_update_cache[JSON.stringify(row)];
  };

  // Listener for database notifications.  As used below, mesg[0] is the action
  // name, mesg[1] the notified fields of the new row and mesg[2] those of the
  // old row.
  private handle_change = async (mesg): Promise<void> => {
    if (this.closed) {
      return;
    }
    if (mesg[0] === "DELETE") {
      if (!this.match_condition(mesg[2])) {
        return;
      }
      this.forget_cached(mesg[2]);
      this.emit("change", { action: "delete", old_val: mesg[2] });
      return;
    }

    let action: ChangeAction;
    try {
      if (typeof mesg[0] !== "string") {
        throw Error(`invalid mesg -- mesg[0] must be a string`);
      }
      action = parse_action(mesg[0]);
    } catch (err) {
      // This is an async event listener: throwing would only produce an
      // unhandled promise rejection, so report via the error event instead.
      this.fail(err);
      return;
    }

    if (!this.match_condition(mesg[1])) {
      // object does not match condition
      if (action !== "update") {
        // new object that doesn't match condition -- nothing to do.
        return;
      }
      // Cache keys were computed from earlier new-row fields, i.e. what is now
      // the old row; compute the key before the fill-in below mutates mesg[2].
      const old_key = JSON.stringify(mesg[2]);
      // fill in for each part that we watch in new object the same
      // data in the old object, in case it is missing.
      // TODO: when is this actually needed?
      for (const k in mesg[1]) {
        if (mesg[2][k] == null) {
          mesg[2][k] = mesg[1][k];
        }
      }
      if (this.match_condition(mesg[2])) {
        // the old object was in our changefeed, but the UPDATE made it not
        // anymore, so we emit delete action.
        delete this.val_update_cache[old_key];
        this.emit("change", { action: "delete", old_val: mesg[2] });
      }
      // Nothing more to do.
      return;
    }

    if (this.watch.length === 0) {
      // No additional columns are being watched at all -- we only
      // care about what's in the mesg.
      const r: ChangeEvent = { action, new_val: mesg[1] };
      this.emit("change", r);
      return;
    }

    // Additional columns are watched so we must do a query to get them.
    // There's no way around this due to the size limits on postgres LISTEN/NOTIFY.
    const where: { [field: string]: any } = {};
    for (const k in mesg[1]) {
      where[`${k} = $`] = mesg[1][k];
    }
    let result: undefined | { [field: string]: any };
    try {
      result = await query({
        db: this.db,
        select: this.watch,
        table: this.table,
        where,
        one: true,
      });
    } catch (err) {
      this.fail(err);
      return;
    }

    // we do know from stacktraces that new_val_update is called after closed
    // this must have happened during waiting on the query. aborting early.
    if (this.closed) {
      return;
    }

    const key = JSON.stringify(mesg[1]);
    if (result == null) {
      // This happens when record isn't deleted, but some
      // update results in the object being removed from our
      // selection criterion... which we view as "delete".
      delete this.val_update_cache[key];
      this.emit("change", { action: "delete", old_val: mesg[1] });
      return;
    }

    const this_val = misc.merge(result, mesg[1]);
    let new_val;
    if (action === "update") {
      const x = this.new_val_update(mesg[1], this_val, key);
      if (x == null) {
        // happens if this.closed is true -- double check for safety (and typescript).
        return;
      }
      action = x.action; // may be insert in case no previous cached info.
      new_val = x.new_val;
    } else {
      // not update and not delete (could have been a delete and write
      // before we did above query, so treat as insert).
      action = "insert";
      new_val = this_val;
    }
    this.val_update_cache[key] = this_val;

    const r: ChangeEvent = { action, new_val };
    this.emit("change", r);
  };

  // Compute the value to send for an update: the primary part plus only the
  // fields that differ from the cached previous value (removed fields are sent
  // as null).  Returns an "insert" with the full value when nothing is cached,
  // and undefined if closed.
  private new_val_update = (
    primary_part: { [key: string]: any },
    this_val: { [key: string]: any },
    key: string,
  ):
    | { new_val: { [key: string]: any }; action: "insert" | "update" }
    | undefined => {
    if (this.closed) {
      return;
    }
    const prev_val = this.val_update_cache[key];
    if (prev_val == null) {
      return { new_val: this_val, action: "insert" }; // not enough info to make a diff
    }
    this.dbg("new_val_update")(`${JSON.stringify({ this_val, prev_val })}`);

    // Send only the fields that changed between
    // prev_val and this_val, along with the primary part.
    const new_val = misc.copy(primary_part);
    // Not using lodash isEqual below, since we want equal Date objects
    // to compare as equal.  If JSON is randomly re-ordered, that's fine since
    // it is just slightly less efficient.
    for (const field in this_val) {
      if (
        new_val[field] === undefined &&
        JSON.stringify(this_val[field]) !== JSON.stringify(prev_val[field])
      ) {
        new_val[field] = this_val[field];
      }
    }
    for (const field in prev_val) {
      if (prev_val[field] != null && this_val[field] == null) {
        // field was deleted / set to null -- we must inform in the update
        new_val[field] = null;
      }
    }
    return { new_val, action: "update" };
  };

  // Build this.match_condition from this.where.  Throws on conditions it
  // cannot parse (init() turns that into a cb error).
  private init_where = (): void => {
    if (typeof this.where === "function") {
      // user provided function
      this.match_condition = this.where;
      return;
    }

    let w: any[];
    if (misc.is_object(this.where)) {
      w = [this.where];
    } else {
      // TODO: misc.is_object needs to be a typescript checker instead, so
      // this as isn't needed.
      w = this.where as any[];
    }

    const condition: { [field: string]: Function } = {};
    this.condition = condition;

    // Register a javascript predicate for "field op val".
    const add_condition = (field: string, op: Operator, val: any): void => {
      field = field.trim();
      if (field[0] === '"') {
        // de-quote
        field = field.slice(1, field.length - 1);
      }
      if (this.select[field] == null) {
        throw Error(
          `'${field}' must be in select="${JSON.stringify(this.select)}"`,
        );
      }
      if (misc.is_object(val)) {
        throw Error(`val (=${misc.to_json(val)}) must not be an object`);
      }
      let f: Function;
      if (misc.is_array(val)) {
        if (op === "=" || op === "==") {
          // containment
          f = (x) => val.some((v) => x === v);
        } else if (op === "!=" || op === "<>") {
          // not contained in
          f = (x) => !val.some((v) => x === v);
        } else {
          throw Error("if val is an array, then op must be = or !=");
        }
      } else if (misc.is_date(val)) {
        // Inputs to condition come back as JSON, which doesn't know
        // about timestamps, so we convert them to date objects.
        if (op === "=" || op === "==") {
          f = (x) => new Date(x).valueOf() - val.valueOf() === 0;
        } else if (op === "!=" || op === "<>") {
          f = (x) => new Date(x).valueOf() - val.valueOf() !== 0;
        } else {
          const g = opToFunction(op);
          f = (x) => g(new Date(x), val);
        }
      } else {
        const g = opToFunction(op);
        f = (x) => g(x, val);
      }
      condition[field] = f;
    };

    for (const obj of w) {
      if (misc.is_object(obj)) {
        for (const k in obj) {
          const val = obj[k];
          /*
          k should be of one of the following forms
             - "field op $::TYPE"
             - "field op $" or
             - "field op any($)"
             - "$ op any(field)"
             - 'field' (defaults to =)
          where op is one of =, <, >, <=, >=, !=

          val must be:
             - something where javascript === and comparisons works as you expect!
             - or an array, in which case op must be = or !=, and we ALWAYS do inclusion (analogue of any).
          */
          if (k.startsWith("$")) {
            /*
            The "$ op any(field)" is used, e.g., for having multiple owners
            of a single thing, e.g.,:

              pg_where: [{ "$::UUID = ANY(owner_account_ids)": "account_id" }]

            where we need to get the field(=owner_account_ids) and check that
            val(=account_id) is in it, at the javascript level.
            */
            if (k.includes("<") || k.includes(">")) {
              throw Error("only = and != are supported");
            }
            const isEquals = !k.includes("!=");
            const i = k.toLowerCase().indexOf("any(");
            if (i === -1) {
              throw Error(
                'condition must be $=ANY(...) or $!=ANY(...) -- missing "ANY("',
              );
            }
            const j = k.lastIndexOf(")");
            if (j < i) {
              // also covers j === -1
              throw Error(
                "condition must be $=ANY(...) or $!=ANY(...) -- missing close paren",
              );
            }
            // trim so "ANY( owner_account_ids )" refers to the real column
            const field = k.slice(i + 4, j).trim();
            if (isEquals) {
              condition[field] = (x) => !!x?.includes(val);
            } else {
              condition[field] = (x) => !x?.includes(val);
            }
          } else {
            let found = false;
            for (const op of OPERATORS) {
              const i = k.indexOf(op);
              if (i !== -1) {
                add_condition(k.slice(0, i).trim(), op, val);
                found = true;
                break;
              }
            }
            if (!found) {
              throw Error(`unable to parse '${k}'`);
            }
          }
        }
      } else if (typeof obj === "string") {
        let found = false;
        for (const op of OPERATORS) {
          const i = obj.indexOf(op);
          if (i !== -1) {
            // SECURITY NOTE(review): eval() executes arbitrary code.  This is
            // only acceptable if string where-conditions are always trusted,
            // server-side constants and never derived from client input.
            add_condition(
              obj.slice(0, i),
              op,
              eval(obj.slice(i + op.length).trim()),
            );
            found = true;
            break;
          }
        }
        if (!found) {
          throw Error(`unable to parse '${obj}'`);
        }
      } else {
        throw Error("NotImplementedError");
      }
    }
    if (misc.len(condition) === 0) {
      delete this.condition;
    }

    // A row matches when every per-field predicate accepts its value.
    this.match_condition = (obj: { [field: string]: any }): boolean => {
      if (this.condition == null) {
        return true;
      }
      for (const field in this.condition) {
        if (!this.condition[field](obj[field])) {
          return false;
        }
      }
      return true;
    };
  };
}