Path: blob/master/src/packages/conat/sync/synctable-stream.ts
1452 views
/*1Conat implementation of the idea of a "SyncTable", but2for streaming data.34**This is ONLY for the scope of patches in a single5project and IS NOT USED IN ANY WAY WITH POSTGRESQL.**67It uses a conat persistent stream to store the elements8in a well defined order.9*/1011import jsonStableStringify from "json-stable-stringify";12import { keys } from "lodash";13import { cmp_Date, is_array, isValidUUID } from "@cocalc/util/misc";14import { client_db } from "@cocalc/util/db-schema/client-db";15import { EventEmitter } from "events";16import { dstream, DStream } from "./dstream";17import { fromJS, Map } from "immutable";18import type { Configuration } from "@cocalc/conat/sync/core-stream";19import type { Client } from "@cocalc/conat/core/client";2021export type State = "disconnected" | "connected" | "closed";2223function toKey(x): string | undefined {24if (x === undefined) {25return undefined;26} else if (typeof x === "object") {27return jsonStableStringify(x);28} else {29return `${x}`;30}31}3233export class SyncTableStream extends EventEmitter {34public readonly table;35private primaryKeys: string[];36private project_id?: string;37private path: string;38private string_id: string;39private data: any = {};40private state: State = "disconnected";41private dstream?: DStream;42private client: Client;43private getHook: Function;44private config?: Partial<Configuration>;45private start_seq?: number;46private noInventory?: boolean;47private ephemeral?: boolean;4849constructor({50query,51client,52account_id: _account_id,53project_id,54immutable,55config,56start_seq,57noInventory,58ephemeral,59}: {60query;61client: Client;62account_id?: string;63project_id?: string;64immutable?: boolean;65config?: Partial<Configuration>;66start_seq?: number;67noInventory?: boolean;68ephemeral?: boolean;69}) {70super();71this.client = client;72this.noInventory = noInventory;73this.ephemeral = ephemeral;74this.setMaxListeners(1000);75this.getHook = immutable ? fromJS : (x) => x;76this.config = config;77this.start_seq = start_seq;78const table = keys(query)[0];79this.table = table;80if (table != "patches") {81throw Error("only the patches table is supported");82}83this.project_id = project_id ?? query[table][0].project_id;84if (!isValidUUID(this.project_id)) {85throw Error("query MUST specify a valid project_id");86}87this.path = query[table][0].path;88if (!this.path) {89throw Error("path MUST be specified");90}91query[table][0].string_id = this.string_id = client_db.sha1(92this.project_id,93this.path,94);95this.primaryKeys = client_db.primary_keys(table);96}9798init = async () => {99const name = patchesStreamName({ string_id: this.string_id });100this.dstream = await dstream({101name,102client: this.client,103project_id: this.project_id,104config: this.config,105desc: { path: this.path },106start_seq: this.start_seq,107noInventory: this.noInventory,108ephemeral: this.ephemeral,109});110this.dstream.on("change", (mesg) => {111this.handle(mesg, true);112});113this.dstream.on("reject", (err) => {114console.warn("synctable-stream: rejected - ", err);115});116for (const mesg of this.dstream.getAll()) {117this.handle(mesg, false);118}119this.setState("connected");120};121122private setState = (state: State): void => {123this.state = state;124this.emit(state);125};126127get_state = () => {128return this.state;129};130131private primaryString = (obj): string => {132const obj2 = { ...obj, string_id: this.string_id };133return toKey(this.primaryKeys.map((pk) => obj2[pk]))!;134};135136getKey = this.primaryString;137138set = (obj) => {139if (Map.isMap(obj)) {140obj = obj.toJS();141}142// console.log("set", obj);143// delete string_id since it is redundant info144const key = this.primaryString(obj);145const { string_id, ...obj2 } = obj;146if (this.data[key] != null) {147throw Error(148`object with key ${key} was already written to the stream -- written data cannot be modified`,149);150return;151}152// console.log("set - publish", obj);153if (this.dstream == null) {154throw Error("closed");155}156this.dstream.publish(obj2);157};158159private handle = (obj, changeEvent: boolean) => {160if (this.state == "closed") {161return true;162}163const key = this.primaryString(obj);164this.data[key] = { ...obj };165if (changeEvent) {166this.emit("change", [key]);167}168};169170get = (obj?) => {171if (obj == null) {172return this.getHook(this.data);173}174if (typeof obj == "string") {175return this.getHook(this.data[obj]);176}177if (is_array(obj)) {178const x: any = {};179for (const key of obj) {180x[this.primaryString(key)] = this.get(key);181}182return this.getHook(x);183}184let key;185if (typeof obj == "object") {186key = this.primaryString(obj);187} else {188key = `${key}`;189}190return this.getHook(this.data[key]);191};192193getSortedTimes = () => {194return Object.values(this.data)195.map(({ time }) => time)196.sort(cmp_Date);197};198199close = () => {200if (this.state === "closed") {201// already closed202return;203}204this.setState("closed");205this.removeAllListeners();206this.dstream?.close();207delete this.dstream;208// @ts-ignore209delete this.client;210};211212delete = async (_obj) => {213throw Error("delete: not implemented for stream synctable");214};215216save = () => {217this.dstream?.save();218};219220has_uncommitted_changes = () => {221return this.dstream?.hasUnsavedChanges();222};223}224225export function patchesStreamName({226project_id,227path,228string_id,229}: {230project_id?: string;231path?: string;232string_id?: string;233}): string {234if (!string_id) {235if (!project_id || !path) {236throw Error("one of string_id or both project_id and path must be given");237}238string_id = client_db.sha1(project_id, path);239}240if (!string_id) {241throw Error("bug");242}243return `patches:${string_id}`;244}245246247