// Path: blob/master/src/packages/conat/sync/synctable-kv.ts
// 1452 views
/*
SyncTableKV: a table whose rows live in a conat dkv (atomic mode) or dko
(non-atomic mode) store, keyed by the table's primary key(s).
*/

import { keys } from "lodash";
import { client_db } from "@cocalc/util/db-schema/client-db";
import type { State } from "@cocalc/conat/types";
import type { Client } from "@cocalc/conat/core/client";
import { EventEmitter } from "events";
import { dkv as createDkv, type DKV } from "./dkv";
import { dko as createDko, type DKO } from "./dko";
import jsonStableStringify from "json-stable-stringify";
import { toKey } from "@cocalc/conat/util";
import { wait } from "@cocalc/util/async-wait";
import { fromJS, Map } from "immutable";
import type { JSONValue } from "@cocalc/util/types";
import type { Configuration } from "@cocalc/conat/sync/core-stream";

/**
 * Table backed by a key-value store.
 *
 * The store is created in `init()`: a DKV when `atomic` is true, a DKO
 * otherwise.  Rows are addressed by a key derived from the table's primary
 * key(s) (see `getKey`).
 *
 * Events emitted:
 *  - the current state name (e.g. "connected", "closed") on every state change;
 *  - "change" with `([key], changeObject)` whenever the underlying store
 *    reports a change.
 */
export class SyncTableKV extends EventEmitter {
  public readonly table;
  private query;
  private atomic: boolean;
  private primaryKeys: string[];
  private project_id?: string;
  private account_id?: string;
  private state: State = "disconnected";
  private dkv?: DKV | DKO;
  private client: Client;
  private getHook: Function;
  private config?: Partial<Configuration>;
  private desc?: JSONValue;
  private ephemeral?: boolean;

  /**
   * @param query - object whose first key is the table name and whose value
   *   is an array with the query spec as its first entry.
   * @param client - conat client handed to the underlying store.
   * @param account_id - used unless the spec has both string_id and
   *   project_id; falls back to the spec's own account_id.
   * @param project_id - used unless the spec has both string_id and
   *   project_id, in which case the spec's project_id wins.
   * @param atomic - if true use a DKV, otherwise a DKO.
   * @param immutable - if true, values returned by `get` are passed through
   *   immutable.js `fromJS`; otherwise they are returned as is.
   * @param config - passed through to the store.
   * @param desc - passed through to the store.
   * @param ephemeral - passed through to the store.
   */
  constructor({
    query,
    client,
    account_id,
    project_id,
    atomic,
    immutable,
    config,
    desc,
    ephemeral,
  }: {
    query;
    client: Client;
    account_id?: string;
    project_id?: string;
    atomic?: boolean;
    immutable?: boolean;
    config?: Partial<Configuration>;
    desc?: JSONValue;
    ephemeral?: boolean;
  }) {
    super();
    this.setMaxListeners(1000);
    this.atomic = !!atomic;
    this.getHook = immutable ? fromJS : (x) => x;
    this.query = query;
    this.config = config;
    this.client = client;
    this.desc = desc;
    this.ephemeral = ephemeral;
    this.table = keys(query)[0];
    const spec = query[this.table][0];
    if (spec.string_id && spec.project_id) {
      // The spec itself names the project; account_id is left unset.
      this.project_id = spec.project_id;
    } else {
      this.account_id = account_id ?? spec.account_id;
      this.project_id = project_id;
    }
    this.primaryKeys = client_db.primary_keys(this.table);
  }

  /** Record the new state and emit an event named after it. */
  private set_state = (state: State): void => {
    this.state = state;
    this.emit(state);
  };

  /** Current state of this table. */
  get_state = () => {
    return this.state;
  };

  // WARNING: be *VERY* careful before changing how the name is
  // derived from the query, since if you change this all the current
  // data in conat that caches the changefeeds is basically lost
  // and users MUST refresh their browsers (and maybe projects restart?)
  // to get new changefeeds, since they are watching something given
  // by this name. I.e., this name shouldn't ever be changed.
  // The point of the name is that it uniquely identifies the
  // changefeed query, so just using the query itself should be fine.
  // A big choice here is the full name or just something short like the
  // sha1 hash, but I've chosen the full name, since then it is always easy
  // to know what the query was, i.e., use base64 decoding then you
  // have the query.  It's less efficient though since the conat subjects
  // can be long, depending on the query.
  // This way if we are just watching general conat traffic and see something
  // suspicious, even if we have no idea initially where it came from,
  // we can easily see by decoding it.
  // Including even the fields with no values distinguishes different
  // changefeeds that pick off different columns from the database.
  // PLAN: Longterm there's no doubt that changefeeds in postgresql will
  // be eliminated from cocalc completely, and at that point this situation
  // will melt away.
  private getName = () => {
    const spec = this.query[this.table][0];
    if (spec.string_id) {
      // special case -- the tables with a string_id never touch the database
      // and are used with *different* spec at the same time to coordinate
      // between browser and project, so we can't use the spec.
      return `${this.table}:${spec.string_id}`;
    }
    return `${this.table}:${jsonStableStringify(spec)}`;
  };

  /**
   * Create the underlying store, forward its "change" events and switch to
   * the "connected" state.
   *
   * If `close()` is called while the store is still being created, the
   * freshly created store is closed again and the table stays closed
   * (instead of coming back to life as "connected" and leaking the store).
   */
  init = async () => {
    const opts = {
      client: this.client,
      name: this.getName(),
      account_id: this.account_id,
      project_id: this.project_id,
      config: this.config,
      desc: this.desc,
      ephemeral: this.ephemeral,
    };
    const store: DKV | DKO = this.atomic
      ? await createDkv(opts)
      : await createDko(opts);
    if (this.state === "closed") {
      // close() ran during the await above; it could not release a store
      // that did not exist yet, so release it here and do not connect.
      await store.close();
      return;
    }
    this.dkv = store;
    // For some reason this one line confuses typescript and break building the compute server package (nothing else similar happens).
    // Do not remove.  The error is that "this.dkv.on" is not callable.
    // @ts-ignore
    this.dkv.on("change", (x) => {
      if (!this.atomic) {
        if (x.value === undefined) {
          // delete
          x = { ...x, prev: this.dkv?.get(x.key) };
        } else {
          // change
          x = { ...x, value: this.dkv?.get(x.key) };
        }
      }
      // change api was to emit array of keys.
      // We also use this packages/sync/table/changefeed-conat.ts which needs the value,
      // so we emit that object second.
      this.emit("change", [x.key], x);
    });
    this.set_state("connected");
  };

  /**
   * Map a row (plain object or immutable Map) to its store key.  A string
   * argument is assumed to already be a key and is returned unchanged.
   */
  getKey = (obj_or_key): string => {
    if (typeof obj_or_key === "string") {
      return obj_or_key;
    }
    let obj = obj_or_key;
    if (Map.isMap(obj)) {
      obj = obj.toJS();
    }
    if (this.primaryKeys.length === 1) {
      return toKey(obj[this.primaryKeys[0]] ?? "")!;
    } else {
      // compound primary key
      return toKey(this.primaryKeys.map((pk) => obj[pk]))!;
    }
  };

  /**
   * Store a row under the key derived from its primary key(s).  Immutable
   * Maps are converted to plain JS first.
   * @throws Error("closed") if there is no underlying store.
   */
  set = (obj) => {
    if (this.dkv == null) throw Error("closed");
    if (Map.isMap(obj)) {
      obj = obj.toJS();
    }
    this.dkv.set(this.getKey(obj), obj);
  };

  /**
   * Delete the row identified by a key or by a row object.
   * @throws Error("closed") if there is no underlying store.
   */
  delete = (obj_or_key) => {
    if (this.dkv == null) throw Error("closed");
    this.dkv.delete(this.getKey(obj_or_key));
  };

  /**
   * With no argument return everything in the store; otherwise return the
   * row identified by the given key or row object.  Results are passed
   * through the hook selected by the constructor's `immutable` option.
   * @throws Error("closed") if there is no underlying store.
   */
  get = (obj_or_key?) => {
    if (this.dkv == null) throw Error("closed");
    if (obj_or_key == null) {
      return this.getHook(this.dkv.getAll());
    }
    return this.getHook(this.dkv.get(this.getKey(obj_or_key)));
  };

  /**
   * Return the first row found in the store, or undefined if it is empty.
   * @throws Error("closed") if there is no underlying store.
   */
  get_one = () => {
    if (this.dkv == null) throw Error("closed");
    // TODO: insanely inefficient, especially if !atomic!
    for (const key in this.dkv.getAll()) {
      return this.get(key);
    }
  };

  /** Save the underlying store; does nothing if there is no store. */
  save = async () => {
    await this.dkv?.save();
  };

  /**
   * Close the table.  Safe to call more than once.  The "closed" event is
   * emitted before listeners are removed so they still receive it.
   */
  close = async () => {
    if (this.state === "closed") return;
    this.set_state("closed");
    this.removeAllListeners();
    await this.dkv?.close();
    delete this.dkv;
    // @ts-ignore
    delete this.client;
  };

  /**
   * Wait until `until` is satisfied, re-checking on "change" events; this
   * delegates to the `wait` helper from @cocalc/util/async-wait.
   * @param until - condition function handed to the helper.
   * @param timeout - handed to the helper; default 30 (presumably seconds --
   *   TODO confirm against the helper).
   * @throws Error if the table is already closed.
   */
  public async wait(until: Function, timeout: number = 30): Promise<any> {
    if (this.state === "closed") {
      throw Error("wait: must not be closed");
    }
    return await wait({
      obj: this,
      until,
      timeout,
      change_event: "change",
    });
  }
}