Path: blob/master/src/packages/sync/table/synctable.ts
1447 views
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*67Variations: Instead of making this class really complicated8with many different ways to do sync (e.g, changefeeds, project9websockets, unit testing, etc.), we have one single approach via10a Client that has a certain interface. Then we implement different11Clients that have this interface, in order to support different12ways of orchestrating a SyncTable.13*/1415// If true, will log to the console a huge amount of16// info about every get/set17let DEBUG: boolean = false;1819// enable default conat database backed changefeed.20// for this to work you must explicitly run the server in @cocalc/database/conat/changefeeds21// We only turn this off for a mock testing mode.22const USE_CONAT = true && !process.env.COCALC_TEST_MODE;2324export function set_debug(x: boolean): void {25DEBUG = x;26}2728import { delay } from "awaiting";29import { global_cache_decref } from "./global-cache";30import { EventEmitter } from "events";31import { Map, fromJS, List } from "immutable";32import { keys, throttle } from "lodash";33import { callback2, cancel_scheduled, once } from "@cocalc/util/async-utils";34import { wait } from "@cocalc/util/async-wait";35import { query_function } from "./query-function";36import { assert_uuid, copy, is_array, is_object, len } from "@cocalc/util/misc";37import * as schema from "@cocalc/util/schema";38import mergeDeep from "@cocalc/util/immutable-deep-merge";39import { reuseInFlight } from "@cocalc/util/reuse-in-flight";40import { Changefeed } from "./changefeed";41import { ConatChangefeed } from "./changefeed-conat";42import { parse_query, to_key } from "./util";43import { isTestClient } from "@cocalc/sync/editor/generic/util";4445import type { Client } from "@cocalc/sync/client/types";46export type { Client };4748export type Query = any; // TODO typing49export type QueryOptions = any[]; // TODO typing5051export type MergeType = "deep" | "shallow" | "none";5253export interface VersionedChange {54obj: { [key: string]: any };55version: number;56}5758export interface TimedChange {59obj: { [key: string]: any };60time: number; // ms since epoch61}6263function is_fatal(err: string): boolean {64return err.indexOf("FATAL") != -1;65}6667export type State = "disconnected" | "connected" | "closed";6869export class SyncTable extends EventEmitter {70private changefeed?: Changefeed | ConatChangefeed;71private query: Query;72private client_query: any;73private primary_keys: string[];74private options: QueryOptions;75public readonly client: Client;76private throttle_changes?: number;77private throttled_emit_changes?: Function;78private last_server_time: number = 0;79private error: { error: string; query: Query } | undefined = undefined;8081// Immutable map -- the value of this synctable.82private value?: Map<string, Map<string, any>>;83private last_save: Map<string, Map<string, any>> = Map();8485// Which records we have changed (and when, by server time),86// that haven't been sent to the backend.87private changes: { [key: string]: number } = {};8889// The version of each record.90private versions: { [key: string]: number } = {};9192// The inital version is only used in the project, where we93// just assume the clock is right. If this were totally94// off/changed, then clients would get confused -- until they95// close and open the file or refresh their browser. It might96// be better to switch to storing the current version number97// on disk.98private initial_version: number = Date.now();99100// disconnected <--> connected --> closed101private state: State;102public table: string;103private schema: any;104private emit_change: Function;105public reference_count: number = 0;106public cache_key: string | undefined;107// Which fields the user is allowed to set/change.108// Gets updated during init.109private set_fields: string[] = [];110// Which fields *must* be included in any set query.111// Also updated during init.112private required_set_fields: { [key: string]: boolean } = {};113114// Coerce types and generally do strong checking of all115// types using the schema. Do this unless you have a very116// good reason not to!117private coerce_types: boolean = true;118119// If set, then the table is assumed to be managed120// entirely externally (using events).121// This is used by the synctables that are managed122// entirely by the project (e.g., sync-doc support).123private no_db_set: boolean = false;124125// Set only for some tables that are hosted directly on a project (not database),126// e.g., the project_status and listings.127private project_id?: string;128129private last_has_uncommitted_changes?: boolean = undefined;130131// This is used only in synctable-project.ts for a communications channel132// for Jupyter on compute servers.133public channel?: any;134135constructor(136query,137options: any[],138client: Client,139throttle_changes?: number,140coerce_types?: boolean,141no_db_set?: boolean,142project_id?: string,143) {144super();145146if (coerce_types != undefined) {147this.coerce_types = coerce_types;148}149if (no_db_set != undefined) {150this.no_db_set = no_db_set;151}152if (project_id != undefined) {153this.project_id = project_id;154}155156if (is_array(query)) {157throw Error("must be a single query, not array of queries");158}159160this.set_state("disconnected");161162this.changefeed_on_update = this.changefeed_on_update.bind(this);163this.changefeed_on_close = this.changefeed_on_close.bind(this);164165this.setMaxListeners(100);166this.query = parse_query(query);167this.options = options;168this.client = client;169this.throttle_changes = throttle_changes;170171this.init_query();172this.init_throttle_changes();173174// So only ever runs once at a time.175this.save = reuseInFlight(this.save.bind(this));176this.first_connect();177}178179/* PUBLIC API */180181// is_ready is true if the table has been initialized and not yet closed.182// It might *not* be currently connected, due to a temporary network183// disconnect. When is_ready is true you can read and write to this table,184// but there is no guarantee things aren't temporarily stale.185public is_ready(): boolean {186return this.value != null && this.state !== "closed";187}188189/*190Return true if there are changes to this synctable that191have NOT been confirmed as saved to the backend database.192(Always returns false when not yet initialized.)193*/194public has_uncommitted_changes(): boolean {195if (this.state === "closed") {196return false; // if closed, can't have any uncommitted changes.197}198return len(this.changes) !== 0;199}200201/* Gets records from this table.202- arg = not given: returns everything (as an203immutable map from key to obj)204- arg = array of keys; return map from key to obj205- arg = single key; returns corresponding object206207This is NOT a generic query mechanism. SyncTable208is really best thought of as a key:value store!209*/210public get(arg?): Map<string, any> | undefined {211this.assert_not_closed("get");212213if (this.value == null) {214throw Error("table not yet initialized");215}216217if (arg == null) {218return this.value;219}220221if (is_array(arg)) {222let x: Map<string, Map<string, any>> = Map();223for (const k of arg) {224const key: string | undefined = to_key(k);225if (key != null) {226const y = this.value.get(key);227if (y != null) {228x = x.set(key, y);229}230}231}232return x;233} else {234const key = to_key(arg);235if (key != null) {236return this.value.get(key);237}238}239}240241/* Return the number of records in the table. */242public size(): number {243this.assert_not_closed("size");244if (this.value == null) {245throw Error("table not yet initialized");246}247return this.value.size;248}249250/*251Get one record from this table. Especially useful when252there is only one record, which is an important special253case (a so-called "wide" table?.)254*/255public get_one(arg?): Map<string, any> | undefined {256if (this.value == null) {257throw Error("table not yet initialized");258}259260if (arg == null) {261return this.value.toSeq().first();262} else {263// get only returns (at most) one object, so it's "get_one".264return this.get(arg);265}266}267268private async wait_until_value(): Promise<void> {269if (this.value != null) return;270// can't save until server sends state. We wait.271await once(this, "init-value-server");272if (this.value == null) {273throw Error("bug -- change should initialize value");274}275}276277/*278Ensure any unsent changes are sent to the backend.279When this function returns there are no unsent changes,280since it keeps calling _save until nothing has changed281locally.282*/283public async save(): Promise<void> {284const dbg = this.dbg("save");285//console.log("synctable SAVE");286if (this.state === "closed") {287// Not possible to save. save is wrapped in288// reuseInFlight, which debounces, so it's very289// reasonable that an attempt to call this would290// finally fire after a close (which is sync).291// Throwing an error hit would (and did) actually292// crash projects on the backend in production,293// so this has to be a warning.294dbg("WARNING: called save on closed synctable");295return;296}297if (this.value == null) {298// nothing to save yet299return;300}301302while (this.has_uncommitted_changes()) {303if (this.error) {304// do not try to save when there's an error since that305// won't help. Need to attempt to fix it first.306dbg("WARNING: not saving ", this.error);307return;308}309//console.log("SAVE -- has uncommitted changes, so trying again.");310if (this.state !== "connected") {311// wait for state change.312// This could take a long time, and that is fine.313await once(this, "state");314}315if (this.state === "connected") {316if (!(await this._save())) {317this.update_has_uncommitted_changes();318return;319}320}321// else switched to something else (?), so322// loop around and wait again for a change...323}324}325326private update_has_uncommitted_changes(): void {327const cur = this.has_uncommitted_changes();328if (cur !== this.last_has_uncommitted_changes) {329this.emit("has-uncommitted-changes", cur);330this.last_has_uncommitted_changes = cur;331}332}333334/*335set -- Changes (or creates) one entry in the table.336The input field changes is either an Immutable.js Map or a JS Object map.337If changes does not have the primary key then a random record is updated,338and there *must* be at least one record. Exception: computed primary339keys will be computed (see stuff about computed primary keys above).340The second parameter 'merge' can be one of three values:341'deep' : (DEFAULT) deep merges the changes into the record, keep as much info as possible.342'shallow': shallow merges, replacing keys by corresponding values343'none' : do no merging at all -- just replace record completely344Raises an exception if something goes wrong doing the set.345Returns updated value otherwise.346347DOES NOT cause a save.348349NOTE: we always use db schema to ensure types are correct,350converting if necessary. This has a performance impact,351but is worth it for sanity's sake!!!352*/353public set(354changes: any,355merge: MergeType = "deep",356fire_change_event: boolean = true,357): any {358if (this.value == null) {359throw Error("can't set until table is initialized");360}361362if (!Map.isMap(changes)) {363changes = fromJS(changes);364if (!is_object(changes)) {365throw Error(366"type error -- changes must be an immutable.js Map or JS map",367);368}369}370if (DEBUG) {371//console.log(`set('${this.table}'): ${JSON.stringify(changes.toJS())}`);372}373// For sanity!374changes = this.do_coerce_types(changes as any);375// Ensure that each key is allowed to be set.376if (this.client_query.set == null) {377throw Error(`users may not set ${this.table}`);378}379380const can_set = this.client_query.set.fields;381changes.map((_, k) => {382if (can_set[k] === undefined) {383throw Error(`users may not set ${this.table}.${k}`);384}385});386// Determine the primary key's value387let key: string | undefined = this.obj_to_key(changes);388if (key == null) {389// attempt to compute primary key if it is a computed primary key390let key0 = this.computed_primary_key(changes);391key = to_key(key0);392if (key == null && this.primary_keys.length === 1) {393// use a "random" primary key from existing data394key0 = key = this.value.keySeq().first();395}396if (key == null) {397throw Error(398`must specify primary key ${this.primary_keys.join(399",",400)}, have at least one record, or have a computed primary key`,401);402}403// Now key is defined404if (this.primary_keys.length === 1) {405changes = changes.set(this.primary_keys[0], key0);406} else if (this.primary_keys.length > 1) {407if (key0 == null) {408// to satisfy typescript.409throw Error("bug -- computed primary key must be an array");410}411let i = 0;412for (const pk of this.primary_keys) {413changes = changes.set(pk, key0[i]);414i += 1;415}416}417}418419// Get the current value420const cur = this.value.get(key);421let new_val;422423if (cur == null) {424// No record with the given primary key. Require that425// all the this.required_set_fields are specified, or426// it will become impossible to sync this table to427// the backend.428for (const k in this.required_set_fields) {429if (changes.get(k) == null) {430throw Error(`must specify field '${k}' for new records`);431}432}433// If no current value, then next value is easy -- it equals the current value in all cases.434new_val = changes;435} else {436// Use the appropriate merge strategy to get the next val.437switch (merge) {438case "deep":439new_val = mergeDeep(cur, changes);440break;441case "shallow":442new_val = cur.merge(changes);443break;444case "none":445new_val = changes;446break;447default:448throw Error("merge must be one of 'deep', 'shallow', 'none'");449}450}451452if (new_val.equals(cur)) {453// nothing actually changed, so nothing further to do.454return new_val;455}456457// clear error state -- the change may be just what is needed458// to fix the error, e.g., attempting to save an invalid account459// setting, then fixing it.460this.clearError();461462for (const field in this.required_set_fields) {463if (!new_val.has(field)) {464throw Error(465`missing required set field ${field} of table ${this.table}`,466);467}468}469470// Something changed:471this.value = this.value.set(key, new_val);472this.changes[key] = this.unique_server_time();473this.update_has_uncommitted_changes();474if (this.client.is_project()) {475// project assigns versions476const version = this.increment_version(key);477const obj = new_val.toJS();478this.emit("versioned-changes", [{ obj, version }]);479} else {480// browser gets them assigned...481this.null_version(key);482// also touch to indicate activity and make sure project running,483// in some cases.484this.touch_project();485}486if (fire_change_event) {487this.emit_change([key]);488}489490return new_val;491}492493private async touch_project(): Promise<void> {494if (this.project_id != null) {495try {496await this.client.touch_project(this.project_id);497} catch (err) {498// not fatal499console.warn("touch_project -- ", this.project_id, err);500}501}502}503504public close_no_async(): void {505if (this.state === "closed") {506// already closed507return;508}509// decrement the reference to this synctable510if (global_cache_decref(this)) {511// close: not zero -- so don't close it yet --512// still in use by possibly multiple clients513return;514}515516if (this.throttled_emit_changes != null) {517cancel_scheduled(this.throttled_emit_changes);518delete this.throttled_emit_changes;519}520521this.client.removeListener("disconnected", this.disconnected);522this.close_changefeed();523this.set_state("closed");524this.removeAllListeners();525delete this.value;526}527528public async close(fatal: boolean = false): Promise<void> {529if (this.state === "closed") {530// already closed531return;532}533this.dbg("close")({ fatal });534if (!fatal) {535// do a last attempt at a save (so we don't lose data),536// then really close.537await this.save(); // attempt last save to database.538/*539The moment the sync part of _save is done, we remove listeners540and clear everything up. It's critical that as soon as close541is called that there be no possible way any further connect542events (etc) can make this SyncTable543do anything!! That finality assumption is made544elsewhere (e.g in @cocalc/project).545*/546}547this.close_no_async();548}549550public async wait(until: Function, timeout: number = 30): Promise<any> {551this.assert_not_closed("wait");552553return await wait({554obj: this,555until,556timeout,557change_event: "change-no-throttle",558});559}560561/* INTERNAL PRIVATE METHODS */562563private async first_connect(): Promise<void> {564try {565await this.connect();566this.update_has_uncommitted_changes();567} catch (err) {568console.warn(569`synctable: failed to connect (table=${this.table}), error=${err}`,570this.query,571);572this.close(true);573}574}575576private set_state(state: State): void {577this.state = state;578this.emit(state);579}580581public get_state(): State {582return this.state;583}584585public get_table(): string {586return this.table;587}588589private set_throttle_changes(): void {590// No throttling of change events, unless explicitly requested591// *or* part of the schema.592if (this.throttle_changes != null) return;593const t = schema.SCHEMA[this.table];594if (t == null) return;595const u = t.user_query;596if (u == null) return;597const g = u.get;598if (g == null) return;599this.throttle_changes = g.throttle_changes;600}601602private init_throttle_changes(): void {603this.set_throttle_changes();604605if (!this.throttle_changes) {606this.emit_change = (changed_keys: string[]) => {607this.emit("change", changed_keys);608this.emit("change-no-throttle", changed_keys);609};610return;611}612613// throttle emitting of change events614let all_changed_keys = {};615const do_emit_changes = () => {616//console.log("#{this.table} -- emitting changes", keys(all_changed_keys))617// CRITICAL: some code depends on emitting change even618// for the *empty* list of keys!619// E.g., projects page won't load for new users. This620// is the *change* from not loaded to being loaded,621// which does make sense.622this.emit("change", keys(all_changed_keys));623all_changed_keys = {};624};625this.throttled_emit_changes = throttle(626do_emit_changes,627this.throttle_changes,628);629this.emit_change = (changed_keys) => {630//console.log("emit_change", changed_keys);631this.dbg("emit_change")(changed_keys);632//console.log("#{this.table} -- queue changes", changed_keys)633for (const key of changed_keys) {634all_changed_keys[key] = true;635}636this.emit("change-no-throttle", changed_keys);637if (this.throttled_emit_changes != null) {638this.throttled_emit_changes();639}640};641}642643private dbg(_f?: string): Function {644if (!DEBUG) {645return () => {};646}647if (this.client.is_project()) {648return this.client.dbg(649`SyncTable('${JSON.stringify(this.query)}').${_f}`,650);651} else {652return (...args) => {653console.log(`synctable("${this.table}").${_f}: `, ...args);654};655}656}657658private connect = async (): Promise<void> => {659const dbg = this.dbg("connect");660dbg();661this.assert_not_closed("connect");662if (this.state === "connected") {663return;664}665666// 1. save, in case we have any local unsaved changes,667// then sync with upstream.668if (this.value != null) {669dbg("send off any local unsaved changes first");670await this.save();671}672673// 2. Now actually setup the changefeed.674// (Even if this.no_db_set is set, this still may do675// an initial query to the database. However, the changefeed676// does nothing further.)677dbg("actually setup changefeed");678await this.create_changefeed();679680dbg("connect should have succeeded");681};682683private async create_changefeed(): Promise<void> {684const dbg = this.dbg("create_changefeed");685if (this.get_state() == "closed") {686dbg("closed so don't do anything ever again");687return;688}689dbg("creating changefeed connection...");690let initval;691try {692initval = await this.create_changefeed_connection();693if (!initval) {694throw Error("closed while creating changefeed");695}696} catch (err) {697dbg("failed to create changefeed", err.toString());698// Typically this happens if synctable closed while699// creating the connection...700this.close();701}702if (this.state == "closed") {703return;704}705dbg("got changefeed, now initializing table data");706const changed_keys = this.update_all(initval);707dbg("setting state to connected");708this.set_state("connected");709710// NOTE: Can't emit change event until after711// switching state to connected, which is why712// we do it here.713this.emit_change(changed_keys);714}715716private close_changefeed(): void {717if (this.changefeed == null) return;718this.remove_changefeed_handlers();719this.changefeed.close();720delete this.changefeed;721}722723private create_changefeed_connection = async (): Promise<any[]> => {724let delay_ms: number = 3000;725let warned = false;726let first = true;727while (true) {728this.close_changefeed();729if (730USE_CONAT &&731!isTestClient(this.client) &&732this.client.is_browser() &&733!this.project_id734) {735this.changefeed = new ConatChangefeed({736account_id: this.client.client_id?.()!,737query: this.query,738options: this.options,739});740// This init_changefeed_handlers MUST be initialized here since this.changefeed might741// get closed very soon, and missing a close event would be very, very bad.742this.init_changefeed_handlers();743} else {744this.changefeed = new Changefeed(this.changefeed_options());745this.init_changefeed_handlers();746await this.wait_until_ready_to_query_db();747}748try {749const initval = await this.changefeed.connect();750751if (this.changefeed.get_state() == "closed" || !initval) {752throw Error("closed during creation");753}754if (warned) {755console.log(`SUCCESS creating ${this.table} changefeed`);756}757return initval;758} catch (err) {759if (is_fatal(err.toString())) {760console.warn("FATAL creating initial changefeed", this.table, err);761this.close(true);762throw err;763}764if (err.code == 429) {765const message = `${err}`;766console.log(message);767this.client.alert_message?.({768title: `Too Many Requests (${this.table})`,769message,770type: "error",771});772await delay(30 * 1000);773}774if (first) {775// don't warn the first time776first = false;777} else {778// This can happen because we might suddenly NOT be ready779// to query db immediately after we are ready...780warned = true;781console.log(782`WARNING: ${this.table} -- failed to create changefeed connection; will retry in ${delay_ms}ms -- ${err}`,783);784}785await delay(delay_ms);786delay_ms = Math.min(20000, delay_ms * 1.25);787}788}789};790791private async wait_until_ready_to_query_db(): Promise<void> {792const dbg = this.dbg("wait_until_ready_to_query_db");793794// Wait until we're ready to query the database.795let client_state: string;796797if (this.schema.anonymous || this.client.is_project()) {798// For anonymous tables (and for project accessing db),799// this just means the client is connected.800client_state = "connected";801} else {802// For non-anonymous tables, the client803// has to actually be signed in.804client_state = "signed_in";805}806807if (this.client[`is_${client_state}`]()) {808dbg("state already achieved -- no need to wait");809return;810}811812await once(this.client, client_state);813dbg(`success -- client emited ${client_state}`);814}815816private changefeed_options() {817return {818do_query: query_function(this.client.query, this.table),819query_cancel: this.client.query_cancel.bind(this.client),820options: this.options,821query: this.query,822table: this.table,823};824}825826// awkward code due to typescript weirdness using both827// ConatChangefeed and Changefeed types (for unit testing).828private init_changefeed_handlers(): void {829const c = this.changefeed as EventEmitter | null;830if (c == null) return;831c.on("update", this.changefeed_on_update);832c.on("close", this.changefeed_on_close);833}834835private remove_changefeed_handlers(): void {836const c = this.changefeed as EventEmitter | null;837if (c == null) return;838c.removeListener("update", this.changefeed_on_update);839c.removeListener("close", this.changefeed_on_close);840}841842private changefeed_on_update(change): void {843this.update_change(change);844}845846private changefeed_on_close(): void {847this.dbg("changefeed_on_close")();848this.set_state("disconnected");849this.create_changefeed();850}851852private disconnected(why: string): void {853const dbg = this.dbg("disconnected");854dbg(`why=${why}`);855if (this.state === "disconnected") {856dbg("already disconnected");857return;858}859this.set_state("disconnected");860}861862private obj_to_key(_): string | undefined {863// Return string key used in the immutable map in864// which this table is stored.865throw Error("this.obj_to_key must be set during initialization");866}867868private init_query(): void {869// Check that the query is probably valid, and870// record the table and schema871const tables = keys(this.query);872if (len(tables) !== 1) {873throw Error("must query only a single table");874}875this.table = tables[0];876this.schema = schema.SCHEMA[this.table];877if (this.schema == null) {878throw Error(`unknown schema for table ${this.table}`);879}880if (this.client.is_project()) {881this.client_query = this.schema.project_query;882} else {883this.client_query = this.schema.user_query;884}885if (this.client_query == null) {886throw Error(`no query schema allowing queries to ${this.table}`);887}888if (!is_array(this.query[this.table])) {889throw Error("must be a multi-document query");890}891this.primary_keys = schema.client_db.primary_keys(this.table);892// Check that all primary keys are in the query.893for (const primary_key of this.primary_keys) {894if (this.query[this.table][0][primary_key] === undefined) {895throw Error(896`must include each primary key in query of table '${this.table}', but you missed '${primary_key}'`,897);898}899}900// Check that all keys in the query are allowed by the schema.901for (const query_key of keys(this.query[this.table][0])) {902if (this.client_query.get.fields[query_key] === undefined) {903throw Error(904`every key in query of table '${this.table}' must` +905` be a valid user get field in the schema but '${query_key}' is not`,906);907}908}909910// Function this.to_key to extract primary key from object911if (this.primary_keys.length === 1) {912// very common case913const pk = this.primary_keys[0];914this.obj_to_key = (obj) => {915if (obj == null) {916return;917}918if (Map.isMap(obj)) {919return to_key(obj.get(pk));920} else {921return to_key(obj[pk]);922}923};924} else {925// compound primary key926this.obj_to_key = (obj) => {927if (obj == null) {928return;929}930const v: any[] = [];931if (Map.isMap(obj)) {932for (const pk of this.primary_keys) {933const a = obj.get(pk);934if (a == null) {935return;936}937v.push(a);938}939} else {940for (const pk of this.primary_keys) {941const a = obj[pk];942if (a == null) {943return;944}945v.push(a);946}947}948return to_key(v);949};950}951952if (this.client_query != null && this.client_query.set != null) {953// Initialize set_fields and required_set_fields.954const set = this.client_query.set;955for (const field of keys(this.query[this.table][0])) {956if (set.fields != null && set.fields[field]) {957this.set_fields.push(field);958}959if (set.required_fields != null && set.required_fields[field]) {960this.required_set_fields[field] = true;961}962}963}964}965966/* Send all unsent changes.967This function must not be called more than once at a time.968Returns boolean:969false -- there are no additional changes to be saved970true -- new changes may have appeared during the _save that971need to be saved.972973If writing to the database results in an error (but not due to no network),974then an error state is set (which client can consult), an even is emitted,975and we do not try to write to the database again until that error976state is cleared. One way it can be cleared is by changing the table.977*/978private async _save(): Promise<boolean> {979//console.log("_save");980const dbg = this.dbg("_save");981dbg();982if (this.get_state() == "closed") {983return false;984}985if (this.client_query.set == null) {986// Nothing to do -- can never set anything for this table.987// There are some tables (e.g., stats) where the remote values988// could change while user is offline, and the code below would989// result in warnings.990return false;991}992//console.log("_save", this.table);993dbg("waiting for network");994await this.wait_until_ready_to_query_db();995if (this.get_state() == "closed") {996return false;997}998dbg("waiting for value");999await this.wait_until_value();1000if (this.get_state() == "closed") {1001return false;1002}1003if (len(this.changes) === 0) {1004return false;1005}1006if (this.value == null) {1007throw Error("value must not be null");1008}10091010// Send our changes to the server.1011const query: any[] = [];1012const timed_changes: TimedChange[] = [];1013const proposed_keys: { [key: string]: boolean } = {};1014const changes = copy(this.changes);1015//console.log("_save: send ", changes);1016for (const key in this.changes) {1017if (this.versions[key] === 0) {1018proposed_keys[key] = true;1019}1020const x = this.value.get(key);1021if (x == null) {1022throw Error("delete is not implemented");1023}1024const obj = x.toJS();10251026if (!this.no_db_set) {1027// qobj is the db query version of obj, or at least the part1028// of it that expresses what changed.1029const qobj = {};1030// Set the primary key part:1031if (this.primary_keys.length === 1) {1032qobj[this.primary_keys[0]] = key;1033} else {1034// unwrap compound primary key1035const v = JSON.parse(key);1036let i = 0;1037for (const primary_key of this.primary_keys) {1038qobj[primary_key] = v[i];1039i += 1;1040}1041}1042// Can only send set_field sets to the database. Of these,1043// only send what actually changed.1044const prev = this.last_save.get(key);1045for (const k of this.set_fields) {1046if (!x.has(k)) continue;1047if (prev == null) {1048qobj[k] = obj[k];1049continue;1050}10511052// Convert to List to get a clean way to *compare* no1053// matter whether they are immutable.js objects or not!1054const a = List([x.get(k)]);1055const b = List([prev.get(k)]);1056if (!a.equals(b)) {1057qobj[k] = obj[k];1058}1059}10601061for (const k in this.required_set_fields) {1062if (qobj[k] == null) {1063qobj[k] = obj[k];1064}1065}10661067query.push({ [this.table]: qobj });1068}1069timed_changes.push({ obj, time: this.changes[key] });1070}1071dbg("sending timed-changes", timed_changes);1072this.emit("timed-changes", timed_changes);10731074if (!this.no_db_set) {1075try {1076const value = this.value;1077dbg("doing database query");1078await callback2(this.client.query, {1079query,1080options: [{ set: true }], // force it to be a set query1081});1082this.last_save = value; // success -- don't have to save this stuff anymore...1083} catch (err) {1084this.setError(err, query);1085dbg("db query failed", err);1086if (is_fatal(err.toString())) {1087console.warn("FATAL doing set", this.table, err);1088this.close(true);1089throw err;1090}1091// NOTE: we do not show entire log since the number1092// of entries in the query can be very large and just1093// converting them all to text could use a lot of memory (?).1094console.warn(1095`_save('${this.table}') set query error:`,1096err,1097" queries: ",1098query[0],1099"...",1100query.length - 1,1101" omitted",1102);1103return true;1104}1105}11061107if (this.get_state() == "closed") return false;1108if (this.value == null) {1109// should not happen1110return false;1111}11121113if (this.no_db_set) {1114// Not using changefeeds, so have to depend on other mechanisms1115// to update state. Wait until changes to proposed keys are1116// acknowledged by their version being assigned.1117try {1118dbg("waiting until versions are updated");1119await this.wait_until_versions_are_updated(proposed_keys, 5000);1120} catch (err) {1121dbg("waiting for versions timed out / failed");1122// took too long -- try again to send and receive changes.1123return true;1124}1125}11261127dbg("Record that we successfully sent these changes");1128for (const key in changes) {1129if (changes[key] == this.changes[key]) {1130delete this.changes[key];1131}1132}1133this.update_has_uncommitted_changes();11341135const is_done = len(this.changes) === 0;1136dbg("done? ", is_done);1137return !is_done;1138}11391140private setError(error: string, query: Query): void {1141console.warn("WARNING: Synctable error -- ", error);1142this.error = { error, query };1143}11441145public clearError(): void {1146this.error = undefined;1147this.emit("clear-error");1148}11491150private async wait_until_versions_are_updated(1151proposed_keys: { [key: string]: boolean },1152timeout_ms: number,1153): Promise<void> {1154const start_ms = Date.now();1155while (len(proposed_keys) > 0) {1156for (const key in proposed_keys) {1157if (this.versions[key] > 0) {1158delete proposed_keys[key];1159}1160}1161if (len(proposed_keys) > 0) {1162const elapsed_ms = Date.now() - start_ms;1163const keys: string[] = await once(1164this,1165"increased-versions",1166timeout_ms - elapsed_ms,1167);1168for (const key of keys) {1169delete proposed_keys[key];1170}1171}1172}1173}11741175// Return modified immutable Map, with all types coerced to be1176// as specified in the schema, if possible, or throw an exception.1177private do_coerce_types(1178changes: Map<string | number, any>,1179): Map<string | number, any> {1180if (!Map.isMap(changes)) {1181changes = Map(changes);1182}1183if (!this.coerce_types) {1184// no-op if coerce_types isn't set.1185return changes;1186}1187const t = schema.SCHEMA[this.table];1188if (t == null) {1189throw Error(`Missing schema for table ${this.table}`);1190}1191const fields = copy(t.fields);1192if (fields == null) {1193throw Error(`Missing fields part of schema for table ${this.table}`);1194}1195let specs;1196if (t.virtual != null) {1197if (t.virtual === true) {1198throw Error(`t.virtual can't be true for ${this.table}`);1199}1200const x = schema.SCHEMA[t.virtual];1201if (x == null) {1202throw Error(`invalid virtual table spec for ${this.table}`);1203}1204specs = copy(x.fields);1205if (specs == null) {1206throw Error(`invalid virtual table spec for ${this.table}`);1207}1208} else {1209specs = fields;1210}12111212if (typeof this.query != "string") {1213// explicit query (not just from schema)1214let x = this.query[this.table];1215if (is_array(x)) {1216x = x[0];1217}1218for (const k in fields) {1219if (x[k] === undefined) {1220delete fields[k];1221}1222}1223}1224return Map(1225changes.map((value, field) => {1226if (typeof field !== "string") {1227// satisfy typescript.1228return;1229}1230if (value == null) {1231// do not coerce null types1232return value;1233}1234if (fields[field] == null) {1235//console.warn(changes, fields);1236throw Error(1237`Cannot coerce: no field '${field}' in table '${this.table}'`,1238);1239}1240const spec = specs[field];1241let desired: string | undefined = spec.type || spec.pg_type;1242if (desired == null) {1243throw Error(`Cannot coerce: no type info for field ${field}`);1244}1245desired = desired.toLowerCase();12461247const actual = typeof value;1248if (desired === actual) {1249return value;1250}12511252// We can add more or less later...1253if (desired === "string" || desired.slice(0, 4) === "char") {1254if (actual !== "string") {1255// ensure is a string1256return `${value}`;1257}1258return value;1259}1260if (desired === "timestamp") {1261if (!(value instanceof Date)) {1262// make it a Date object. (usually converting from string rep)1263return new Date(value);1264}1265return value;1266}1267if (desired === "integer") {1268// always fine to do this -- will round floats, fix strings, etc.1269return parseInt(value);1270}1271if (desired === "number") {1272// actual wasn't number, so parse:1273return parseFloat(value);1274}1275if (desired === "array") {1276if (!List.isList(value)) {1277value = fromJS(value);1278if (!List.isList(value)) {1279throw Error(1280`field ${field} of table ${this.table} (value=${changes.get(1281field,1282)}) must convert to an immutable.js List`,1283);1284}1285}1286return value;1287}1288if (desired === "map") {1289if (!Map.isMap(value)) {1290value = Map(value);1291if (!Map.isMap(value)) {1292throw Error(1293`field ${field} of table ${this.table} (value=${changes.get(1294field,1295)}) must convert to an immutable.js Map`,1296);1297}1298}1299return value;1300}1301if (desired === "boolean") {1302// actual wasn't boolean, so coerce.1303return !!value;1304}1305if (desired === "uuid") {1306assert_uuid(value);1307return value;1308}1309return value;1310}),1311);1312}13131314/*1315Handle an update of all records from the database.1316This happens on initialization, and also if we1317disconnect and reconnect.1318*/1319private update_all(v: any[]): any[] {1320//const dbg = this.dbg("update_all");13211322if (this.state === "closed") {1323// nothing to do -- just ignore updates from db1324throw Error("makes no sense to do update_all when state is closed.");1325}13261327this.emit("before-change");1328// Restructure the array of records in v as a mapping1329// from the primary key to the corresponding record.1330const x = {};1331for (const y of v) {1332const key = this.obj_to_key(y);1333if (key != null) {1334x[key] = y;1335// initialize all version numbers1336this.versions[key] = this.initial_version;1337}1338}1339const changed_keys = keys(x); // of course all keys have been changed.1340this.emit("increased-versions", changed_keys);13411342this.value = fromJS(x);1343if (this.value == null) {1344throw Error("bug");1345}1346this.last_save = this.value;1347if (this.coerce_types) {1348// Ensure all values are properly coerced, as specified1349// in the database schema. This is important, e.g., since1350// when mocking the client db query, JSON is involved and1351// timestamps are not parsed to Date objects.1352this.value = <Map<string, Map<string, any>>>this.value.map((val, _) => {1353if (val == null) {1354throw Error("val must not be null");1355}1356return this.do_coerce_types(val);1357});1358}13591360// It's possibly that nothing changed (e.g., typical case1361// on reconnect!) so we check.1362// If something really did change, we set the server1363// state to what we just got, and1364// also inform listeners of which records changed (by giving keys).1365//console.log("update_all: changed_keys=", changed_keys)1366if (this.state === "connected") {1367// When not yet connected, initial change is emitted1368// by function that sets up the changefeed. We are1369// connected here, so we are responsible for emitting1370// this change.1371this.emit_change(changed_keys);1372}13731374this.emit("init-value-server");1375return changed_keys;1376}13771378public initial_version_for_browser_client(): VersionedChange[] {1379if (this.value == null) {1380throw Error("value must not be null");1381}1382const x: VersionedChange[] = [];1383this.value.forEach((val, key) => {1384if (val == null) {1385throw Error("val must be non-null");1386}1387const obj = val.toJS();1388if (obj == null) {1389throw Error("obj must be non-null");1390}1391if (key == null) {1392throw Error("key must not be null");1393}1394const version = this.versions[key];1395if (version == null) {1396throw Error("version must not be null");1397}13981399x.push({ obj, version });1400});1401return x;1402}14031404public init_browser_client(changes: VersionedChange[]): void {1405const dbg = this.dbg("init_browser_client");1406dbg(`applying ${changes.length} versioned changes`);1407// The value before doing init (which happens precisely when project1408// synctable is reset). See note below.1409const before = this.value;1410const received_keys = this.apply_changes_to_browser_client(changes);1411if (before != null) {1412before.forEach((_, key) => {1413if (key == null || received_keys[key]) return; // received as part of init1414if (this.changes[key] && this.versions[key] == 0) return; // not event sent yet1415// This key was known and confirmed sent before init, but1416// didn't get sent back this time. So it was lost somehow,1417// e.g., due to not getting saved to the database and the project1418// (or table in the project) getting restarted.1419dbg(`found lost: key=${key}`);1420// So we will try to send out it again.1421if (!this.changes[key]) {1422this.changes[key] = this.unique_server_time();1423this.update_has_uncommitted_changes();1424}1425// So we don't view it as having any known version1426// assigned by project, since the project lost it.1427this.null_version(key);1428});1429if (len(this.changes) > 0) {1430this.save(); // kick off a save of our unsaved lost work back to the project.1431}1432}1433/*1434NOTE: The problem solved here is the following. Imagine the project1435synctable is killed, and it has acknowledge a change C from a1436web browser client, but has NOT saved that change to the central1437postgreSQL database (or someday, maybe a local SQLite database).1438Then when the project synctable is resurrected, it uses the database1439for its initial state, and it knows nothing about C. The1440browser thinks that C has been successfully written and broadcast1441to everybody, so the browser doesn't send C again. The result is1442that the browser and the project would be forever out of sync.1443Note that we only care about lost changes that some browser knows1444about -- if no browser knows about them, then the fact they are1445lost won't break sync. Also, for file editing, data is regularly1446saved to disk, so if the browser sends a change that is lost due to1447the project being killed before writing to the database, then the1448browser terminates too, then that change is completely lost. However,1449everybody will start again with at least the last version of the file1450**saved to disk,** which is basically what people may expect as a1451worst case.14521453The solution to the above problem is to look at what key:value pairs1454we know about that the project didn't just send back to us. If there1455are any that were reported as committed, but they vanished, then we1456set them as unsent and send them again.1457*/1458}14591460public apply_changes_to_browser_client(changes: VersionedChange[]): {1461[key: string]: boolean;1462} {1463const dbg = this.dbg("apply_changes_to_browser_client");1464dbg("got ", changes.length, "changes");1465this.assert_not_closed("apply_changes_to_browser_client");1466if (this.value == null) {1467// initializing the synctable for the first time.1468this.value = Map();1469}14701471this.emit("before-change");1472const changed_keys: string[] = [];1473const increased_versions: string[] = [];1474const received_keys: { [key: string]: boolean } = {};1475for (const change of changes) {1476const { obj, version } = change;1477const new_val = this.do_coerce_types(fromJS(obj));1478const key = this.obj_to_key(new_val);1479if (key == null) {1480throw Error("object results in null key");1481}1482received_keys[key] = true;1483const cur_version = this.versions[key] ? this.versions[key] : 0;1484if (cur_version > version) {1485// nothing further to do.1486continue;1487}1488if (this.handle_new_val(new_val, undefined, "insert", false)) {1489// really did make a change.1490changed_keys.push(key);1491}1492// Update our version number to the newer version.1493this.versions[key] = version;1494increased_versions.push(key);1495}14961497if (increased_versions.length > 0) {1498this.emit("increased-versions", increased_versions);1499}15001501if (changed_keys.length > 0) {1502this.emit_change(changed_keys);1503}1504return received_keys;1505}15061507public apply_changes_from_browser_client(changes: TimedChange[]): void {1508const dbg = this.dbg("apply_changes_from_browser_client");1509dbg("project <-- changes -- client", JSON.stringify(changes));1510const changed_keys: string[] = [];1511const versioned_changes: VersionedChange[] = [];1512for (const change of changes) {1513const { obj, time } = change;1514if (obj == null) {1515throw Error("obj must not be null");1516}1517const new_val = this.do_coerce_types(fromJS(obj));1518const key = this.obj_to_key(new_val); // must have been coerced!1519if (key == null) {1520throw Error("object results in null key");1521}1522const cur_time = this.changes[key];1523if (cur_time != null && cur_time > time) {1524dbg("already have a more recent version");1525// We already have a more recent update to this object.1526// We push that new version out again, just in case.1527if (this.value == null) {1528throw Error("value must not be null");1529}1530let obj: any = this.value.get(key);1531if (obj == null) {1532throw Error(`there must be an object in this.value with key ${key}`);1533}1534obj = obj.toJS();1535const version = this.versions[key];1536if (version == null) {1537throw Error(`object with key ${key} must have a version`);1538}1539versioned_changes.push({ obj, version });1540continue;1541}1542if (this.handle_new_val(new_val, undefined, "insert", false)) {1543const version = this.increment_version(key);1544this.changes[key] = time;1545this.update_has_uncommitted_changes();1546versioned_changes.push({ obj: new_val.toJS(), version });1547changed_keys.push(key);1548}1549}1550if (changed_keys.length > 0) {1551this.emit_change(changed_keys);1552}1553if (versioned_changes.length > 0) {1554this.emit("versioned-changes", versioned_changes);1555}1556dbg("project -- versioned --> clients", JSON.stringify(versioned_changes));1557}15581559private increment_version(key: string): number {1560if (this.versions[key] == null) {1561this.versions[key] = this.initial_version;1562} else {1563this.versions[key] += 1;1564}1565this.emit("increased-versions", [key]);1566return this.versions[key];1567}15681569private null_version(key: string): void {1570this.versions[key] = 0;1571}15721573/*1574Apply one incoming change from the database to the1575in-memory table.1576*/1577private update_change(change): void {1578if (this.state === "closed") {1579// We might get a few more updates even after1580// canceling the changefeed, so we just ignore them.1581return;1582}1583if (this.value == null) {1584console.warn(`update_change(${this.table}): ignored`);1585return;1586}1587this.emit("before-change");1588const changed_keys: string[] = [];1589const key = this.handle_new_val(1590change.new_val,1591change.old_val,1592change.action,1593this.coerce_types,1594change.key,1595);1596if (key != null) {1597changed_keys.push(key);1598}15991600//console.log("update_change: changed_keys=", changed_keys)1601if (changed_keys.length > 0) {1602//console.log("_update_change: change")1603this.emit_change(changed_keys);1604}1605}16061607// Returns current time (in ms since epoch) on server,1608// but if there are multiple requests at the same time,1609// the clock is artificially incremented to ensure uniqueness.1610// Also, this time is thus always strictly increasing.1611private unique_server_time(): number {1612let tm = this.client.server_time().valueOf();1613if (tm <= this.last_server_time) {1614tm = this.last_server_time + 1;1615}1616this.last_server_time = tm;1617return tm;1618}16191620// - returns key only if obj actually changed things.1621private handle_new_val(1622new_val: any,1623old_val: any,1624action: string,1625coerce: boolean,1626key?: string,1627): string | undefined {1628if (this.value == null) {1629// to satisfy typescript.1630throw Error("value must be initialized");1631}16321633if (action === "delete") {1634if (!key) {1635old_val = fromJS(old_val);1636if (old_val == null) {1637throw Error(1638"old_val must not be null or key must be specified for delete action",1639);1640}1641if (coerce && this.coerce_types) {1642old_val = this.do_coerce_types(old_val);1643}1644key = this.obj_to_key(old_val);1645}1646if (key == null || !this.value.has(key)) {1647return; // already gone1648}1649this.value = this.value.delete(key);1650return key;1651}16521653new_val = fromJS(new_val);1654if (new_val == null) {1655throw Error("new_val must not be null for insert or update action");1656}1657if (coerce && this.coerce_types) {1658new_val = this.do_coerce_types(new_val);1659}1660key = this.obj_to_key(new_val);1661if (key == null) {1662// This means the primary key is null or missing, which1663// shouldn't happen. Maybe it could in some edge case.1664// For now, we shouldn't let this break everything, so:1665return undefined;1666// throw Error("key must not be null");1667}1668const cur_val = this.value.get(key);1669if (action === "update" && cur_val != null) {1670// For update actions, we shallow *merge* in the change.1671// For insert action, we just replace the whole thing.1672new_val = cur_val.merge(new_val);1673}1674if (!new_val.equals(cur_val)) {1675this.value = this.value.set(key, new_val);1676return key;1677}1678return undefined;1679}16801681/*1682obj is an immutable.js Map without the primary key1683set. If the database schema defines a way to compute1684the primary key from other keys, try to use it here.1685This function returns the computed primary key (array or string)1686if it works, and returns undefined otherwise.1687*/1688private computed_primary_key(obj): string[] | string | undefined {1689let f;1690if (this.primary_keys.length === 1) {1691f = this.client_query.set.fields[this.primary_keys[0]];1692if (typeof f === "function") {1693return f(obj.toJS(), schema.client_db);1694} else {1695return;1696}1697} else {1698const v: string[] = [];1699for (const pk of this.primary_keys) {1700f = this.client_query.set.fields[pk];1701if (typeof f === "function") {1702v.push(f(obj.toJS(), schema.client_db));1703} else {1704return;1705}1706}1707return v;1708}1709}17101711private assert_not_closed(desc: string): void {1712if (this.state === "closed") {1713//console.trace();1714throw Error(1715`the synctable "${this.table}" must not be closed -- ${desc}`,1716);1717}1718}17191720// **WARNING:** Right now this *barely* works at all... due to1721// barely being implemented since I mostly haven't needed it.1722// It will delete the object from the database, but if some1723// client still has the object, they can end up just writing1724// it back.1725public async delete(obj): Promise<void> {1726// Table spec must have set.delete = true.1727// This function does a direct database query to delete1728// the entry with primary key described by obj from1729// the database. That will have the side effect slightly1730// later of removing the object from this table. This1731// thus works differently than making changes or1732// creating new entries, at least right now (since1733// implementing this properly is a lot of work but1734// not used much).17351736const query = { [this.table]: obj };1737const options = [{ delete: true }];1738await callback2(this.client.query, { query, options });1739}1740}174117421743