Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/synctable.ts
1447 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
8
Variations: Instead of making this class really complicated
9
with many different ways to do sync (e.g, changefeeds, project
10
websockets, unit testing, etc.), we have one single approach via
11
a Client that has a certain interface. Then we implement different
12
Clients that have this interface, in order to support different
13
ways of orchestrating a SyncTable.
14
*/
15
16
// If true, will log to the console a huge amount of
17
// info about every get/set
18
let DEBUG: boolean = false;
19
20
// enable default conat database backed changefeed.
21
// for this to work you must explicitly run the server in @cocalc/database/conat/changefeeds
22
// We only turn this off for a mock testing mode.
23
const USE_CONAT = true && !process.env.COCALC_TEST_MODE;
24
25
export function set_debug(x: boolean): void {
26
DEBUG = x;
27
}
28
29
import { delay } from "awaiting";
30
import { global_cache_decref } from "./global-cache";
31
import { EventEmitter } from "events";
32
import { Map, fromJS, List } from "immutable";
33
import { keys, throttle } from "lodash";
34
import { callback2, cancel_scheduled, once } from "@cocalc/util/async-utils";
35
import { wait } from "@cocalc/util/async-wait";
36
import { query_function } from "./query-function";
37
import { assert_uuid, copy, is_array, is_object, len } from "@cocalc/util/misc";
38
import * as schema from "@cocalc/util/schema";
39
import mergeDeep from "@cocalc/util/immutable-deep-merge";
40
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
41
import { Changefeed } from "./changefeed";
42
import { ConatChangefeed } from "./changefeed-conat";
43
import { parse_query, to_key } from "./util";
44
import { isTestClient } from "@cocalc/sync/editor/generic/util";
45
46
import type { Client } from "@cocalc/sync/client/types";
47
export type { Client };
48
49
export type Query = any; // TODO typing
50
export type QueryOptions = any[]; // TODO typing
51
52
export type MergeType = "deep" | "shallow" | "none";
53
54
export interface VersionedChange {
55
obj: { [key: string]: any };
56
version: number;
57
}
58
59
export interface TimedChange {
60
obj: { [key: string]: any };
61
time: number; // ms since epoch
62
}
63
64
function is_fatal(err: string): boolean {
65
return err.indexOf("FATAL") != -1;
66
}
67
68
export type State = "disconnected" | "connected" | "closed";
69
70
export class SyncTable extends EventEmitter {
71
private changefeed?: Changefeed | ConatChangefeed;
72
private query: Query;
73
private client_query: any;
74
private primary_keys: string[];
75
private options: QueryOptions;
76
public readonly client: Client;
77
private throttle_changes?: number;
78
private throttled_emit_changes?: Function;
79
private last_server_time: number = 0;
80
private error: { error: string; query: Query } | undefined = undefined;
81
82
// Immutable map -- the value of this synctable.
83
private value?: Map<string, Map<string, any>>;
84
private last_save: Map<string, Map<string, any>> = Map();
85
86
// Which records we have changed (and when, by server time),
87
// that haven't been sent to the backend.
88
private changes: { [key: string]: number } = {};
89
90
// The version of each record.
91
private versions: { [key: string]: number } = {};
92
93
// The inital version is only used in the project, where we
94
// just assume the clock is right. If this were totally
95
// off/changed, then clients would get confused -- until they
96
// close and open the file or refresh their browser. It might
97
// be better to switch to storing the current version number
98
// on disk.
99
private initial_version: number = Date.now();
100
101
// disconnected <--> connected --> closed
102
private state: State;
103
public table: string;
104
private schema: any;
105
private emit_change: Function;
106
public reference_count: number = 0;
107
public cache_key: string | undefined;
108
// Which fields the user is allowed to set/change.
109
// Gets updated during init.
110
private set_fields: string[] = [];
111
// Which fields *must* be included in any set query.
112
// Also updated during init.
113
private required_set_fields: { [key: string]: boolean } = {};
114
115
// Coerce types and generally do strong checking of all
116
// types using the schema. Do this unless you have a very
117
// good reason not to!
118
private coerce_types: boolean = true;
119
120
// If set, then the table is assumed to be managed
121
// entirely externally (using events).
122
// This is used by the synctables that are managed
123
// entirely by the project (e.g., sync-doc support).
124
private no_db_set: boolean = false;
125
126
// Set only for some tables that are hosted directly on a project (not database),
127
// e.g., the project_status and listings.
128
private project_id?: string;
129
130
private last_has_uncommitted_changes?: boolean = undefined;
131
132
// This is used only in synctable-project.ts for a communications channel
133
// for Jupyter on compute servers.
134
public channel?: any;
135
136
constructor(
137
query,
138
options: any[],
139
client: Client,
140
throttle_changes?: number,
141
coerce_types?: boolean,
142
no_db_set?: boolean,
143
project_id?: string,
144
) {
145
super();
146
147
if (coerce_types != undefined) {
148
this.coerce_types = coerce_types;
149
}
150
if (no_db_set != undefined) {
151
this.no_db_set = no_db_set;
152
}
153
if (project_id != undefined) {
154
this.project_id = project_id;
155
}
156
157
if (is_array(query)) {
158
throw Error("must be a single query, not array of queries");
159
}
160
161
this.set_state("disconnected");
162
163
this.changefeed_on_update = this.changefeed_on_update.bind(this);
164
this.changefeed_on_close = this.changefeed_on_close.bind(this);
165
166
this.setMaxListeners(100);
167
this.query = parse_query(query);
168
this.options = options;
169
this.client = client;
170
this.throttle_changes = throttle_changes;
171
172
this.init_query();
173
this.init_throttle_changes();
174
175
// So only ever runs once at a time.
176
this.save = reuseInFlight(this.save.bind(this));
177
this.first_connect();
178
}
179
180
/* PUBLIC API */
181
182
// is_ready is true if the table has been initialized and not yet closed.
183
// It might *not* be currently connected, due to a temporary network
184
// disconnect. When is_ready is true you can read and write to this table,
185
// but there is no guarantee things aren't temporarily stale.
186
public is_ready(): boolean {
187
return this.value != null && this.state !== "closed";
188
}
189
190
/*
191
Return true if there are changes to this synctable that
192
have NOT been confirmed as saved to the backend database.
193
(Always returns false when not yet initialized.)
194
*/
195
public has_uncommitted_changes(): boolean {
196
if (this.state === "closed") {
197
return false; // if closed, can't have any uncommitted changes.
198
}
199
return len(this.changes) !== 0;
200
}
201
202
/* Gets records from this table.
203
- arg = not given: returns everything (as an
204
immutable map from key to obj)
205
- arg = array of keys; return map from key to obj
206
- arg = single key; returns corresponding object
207
208
This is NOT a generic query mechanism. SyncTable
209
is really best thought of as a key:value store!
210
*/
211
public get(arg?): Map<string, any> | undefined {
212
this.assert_not_closed("get");
213
214
if (this.value == null) {
215
throw Error("table not yet initialized");
216
}
217
218
if (arg == null) {
219
return this.value;
220
}
221
222
if (is_array(arg)) {
223
let x: Map<string, Map<string, any>> = Map();
224
for (const k of arg) {
225
const key: string | undefined = to_key(k);
226
if (key != null) {
227
const y = this.value.get(key);
228
if (y != null) {
229
x = x.set(key, y);
230
}
231
}
232
}
233
return x;
234
} else {
235
const key = to_key(arg);
236
if (key != null) {
237
return this.value.get(key);
238
}
239
}
240
}
241
242
/* Return the number of records in the table. */
243
public size(): number {
244
this.assert_not_closed("size");
245
if (this.value == null) {
246
throw Error("table not yet initialized");
247
}
248
return this.value.size;
249
}
250
251
/*
252
Get one record from this table. Especially useful when
253
there is only one record, which is an important special
254
case (a so-called "wide" table?.)
255
*/
256
public get_one(arg?): Map<string, any> | undefined {
257
if (this.value == null) {
258
throw Error("table not yet initialized");
259
}
260
261
if (arg == null) {
262
return this.value.toSeq().first();
263
} else {
264
// get only returns (at most) one object, so it's "get_one".
265
return this.get(arg);
266
}
267
}
268
269
private async wait_until_value(): Promise<void> {
270
if (this.value != null) return;
271
// can't save until server sends state. We wait.
272
await once(this, "init-value-server");
273
if (this.value == null) {
274
throw Error("bug -- change should initialize value");
275
}
276
}
277
278
/*
279
Ensure any unsent changes are sent to the backend.
280
When this function returns there are no unsent changes,
281
since it keeps calling _save until nothing has changed
282
locally.
283
*/
284
public async save(): Promise<void> {
285
const dbg = this.dbg("save");
286
//console.log("synctable SAVE");
287
if (this.state === "closed") {
288
// Not possible to save. save is wrapped in
289
// reuseInFlight, which debounces, so it's very
290
// reasonable that an attempt to call this would
291
// finally fire after a close (which is sync).
292
// Throwing an error hit would (and did) actually
293
// crash projects on the backend in production,
294
// so this has to be a warning.
295
dbg("WARNING: called save on closed synctable");
296
return;
297
}
298
if (this.value == null) {
299
// nothing to save yet
300
return;
301
}
302
303
while (this.has_uncommitted_changes()) {
304
if (this.error) {
305
// do not try to save when there's an error since that
306
// won't help. Need to attempt to fix it first.
307
dbg("WARNING: not saving ", this.error);
308
return;
309
}
310
//console.log("SAVE -- has uncommitted changes, so trying again.");
311
if (this.state !== "connected") {
312
// wait for state change.
313
// This could take a long time, and that is fine.
314
await once(this, "state");
315
}
316
if (this.state === "connected") {
317
if (!(await this._save())) {
318
this.update_has_uncommitted_changes();
319
return;
320
}
321
}
322
// else switched to something else (?), so
323
// loop around and wait again for a change...
324
}
325
}
326
327
private update_has_uncommitted_changes(): void {
328
const cur = this.has_uncommitted_changes();
329
if (cur !== this.last_has_uncommitted_changes) {
330
this.emit("has-uncommitted-changes", cur);
331
this.last_has_uncommitted_changes = cur;
332
}
333
}
334
335
/*
336
set -- Changes (or creates) one entry in the table.
337
The input field changes is either an Immutable.js Map or a JS Object map.
338
If changes does not have the primary key then a random record is updated,
339
and there *must* be at least one record. Exception: computed primary
340
keys will be computed (see stuff about computed primary keys above).
341
The second parameter 'merge' can be one of three values:
342
'deep' : (DEFAULT) deep merges the changes into the record, keep as much info as possible.
343
'shallow': shallow merges, replacing keys by corresponding values
344
'none' : do no merging at all -- just replace record completely
345
Raises an exception if something goes wrong doing the set.
346
Returns updated value otherwise.
347
348
DOES NOT cause a save.
349
350
NOTE: we always use db schema to ensure types are correct,
351
converting if necessary. This has a performance impact,
352
but is worth it for sanity's sake!!!
353
*/
354
public set(
355
changes: any,
356
merge: MergeType = "deep",
357
fire_change_event: boolean = true,
358
): any {
359
if (this.value == null) {
360
throw Error("can't set until table is initialized");
361
}
362
363
if (!Map.isMap(changes)) {
364
changes = fromJS(changes);
365
if (!is_object(changes)) {
366
throw Error(
367
"type error -- changes must be an immutable.js Map or JS map",
368
);
369
}
370
}
371
if (DEBUG) {
372
//console.log(`set('${this.table}'): ${JSON.stringify(changes.toJS())}`);
373
}
374
// For sanity!
375
changes = this.do_coerce_types(changes as any);
376
// Ensure that each key is allowed to be set.
377
if (this.client_query.set == null) {
378
throw Error(`users may not set ${this.table}`);
379
}
380
381
const can_set = this.client_query.set.fields;
382
changes.map((_, k) => {
383
if (can_set[k] === undefined) {
384
throw Error(`users may not set ${this.table}.${k}`);
385
}
386
});
387
// Determine the primary key's value
388
let key: string | undefined = this.obj_to_key(changes);
389
if (key == null) {
390
// attempt to compute primary key if it is a computed primary key
391
let key0 = this.computed_primary_key(changes);
392
key = to_key(key0);
393
if (key == null && this.primary_keys.length === 1) {
394
// use a "random" primary key from existing data
395
key0 = key = this.value.keySeq().first();
396
}
397
if (key == null) {
398
throw Error(
399
`must specify primary key ${this.primary_keys.join(
400
",",
401
)}, have at least one record, or have a computed primary key`,
402
);
403
}
404
// Now key is defined
405
if (this.primary_keys.length === 1) {
406
changes = changes.set(this.primary_keys[0], key0);
407
} else if (this.primary_keys.length > 1) {
408
if (key0 == null) {
409
// to satisfy typescript.
410
throw Error("bug -- computed primary key must be an array");
411
}
412
let i = 0;
413
for (const pk of this.primary_keys) {
414
changes = changes.set(pk, key0[i]);
415
i += 1;
416
}
417
}
418
}
419
420
// Get the current value
421
const cur = this.value.get(key);
422
let new_val;
423
424
if (cur == null) {
425
// No record with the given primary key. Require that
426
// all the this.required_set_fields are specified, or
427
// it will become impossible to sync this table to
428
// the backend.
429
for (const k in this.required_set_fields) {
430
if (changes.get(k) == null) {
431
throw Error(`must specify field '${k}' for new records`);
432
}
433
}
434
// If no current value, then next value is easy -- it equals the current value in all cases.
435
new_val = changes;
436
} else {
437
// Use the appropriate merge strategy to get the next val.
438
switch (merge) {
439
case "deep":
440
new_val = mergeDeep(cur, changes);
441
break;
442
case "shallow":
443
new_val = cur.merge(changes);
444
break;
445
case "none":
446
new_val = changes;
447
break;
448
default:
449
throw Error("merge must be one of 'deep', 'shallow', 'none'");
450
}
451
}
452
453
if (new_val.equals(cur)) {
454
// nothing actually changed, so nothing further to do.
455
return new_val;
456
}
457
458
// clear error state -- the change may be just what is needed
459
// to fix the error, e.g., attempting to save an invalid account
460
// setting, then fixing it.
461
this.clearError();
462
463
for (const field in this.required_set_fields) {
464
if (!new_val.has(field)) {
465
throw Error(
466
`missing required set field ${field} of table ${this.table}`,
467
);
468
}
469
}
470
471
// Something changed:
472
this.value = this.value.set(key, new_val);
473
this.changes[key] = this.unique_server_time();
474
this.update_has_uncommitted_changes();
475
if (this.client.is_project()) {
476
// project assigns versions
477
const version = this.increment_version(key);
478
const obj = new_val.toJS();
479
this.emit("versioned-changes", [{ obj, version }]);
480
} else {
481
// browser gets them assigned...
482
this.null_version(key);
483
// also touch to indicate activity and make sure project running,
484
// in some cases.
485
this.touch_project();
486
}
487
if (fire_change_event) {
488
this.emit_change([key]);
489
}
490
491
return new_val;
492
}
493
494
private async touch_project(): Promise<void> {
495
if (this.project_id != null) {
496
try {
497
await this.client.touch_project(this.project_id);
498
} catch (err) {
499
// not fatal
500
console.warn("touch_project -- ", this.project_id, err);
501
}
502
}
503
}
504
505
public close_no_async(): void {
506
if (this.state === "closed") {
507
// already closed
508
return;
509
}
510
// decrement the reference to this synctable
511
if (global_cache_decref(this)) {
512
// close: not zero -- so don't close it yet --
513
// still in use by possibly multiple clients
514
return;
515
}
516
517
if (this.throttled_emit_changes != null) {
518
cancel_scheduled(this.throttled_emit_changes);
519
delete this.throttled_emit_changes;
520
}
521
522
this.client.removeListener("disconnected", this.disconnected);
523
this.close_changefeed();
524
this.set_state("closed");
525
this.removeAllListeners();
526
delete this.value;
527
}
528
529
public async close(fatal: boolean = false): Promise<void> {
530
if (this.state === "closed") {
531
// already closed
532
return;
533
}
534
this.dbg("close")({ fatal });
535
if (!fatal) {
536
// do a last attempt at a save (so we don't lose data),
537
// then really close.
538
await this.save(); // attempt last save to database.
539
/*
540
The moment the sync part of _save is done, we remove listeners
541
and clear everything up. It's critical that as soon as close
542
is called that there be no possible way any further connect
543
events (etc) can make this SyncTable
544
do anything!! That finality assumption is made
545
elsewhere (e.g in @cocalc/project).
546
*/
547
}
548
this.close_no_async();
549
}
550
551
public async wait(until: Function, timeout: number = 30): Promise<any> {
552
this.assert_not_closed("wait");
553
554
return await wait({
555
obj: this,
556
until,
557
timeout,
558
change_event: "change-no-throttle",
559
});
560
}
561
562
/* INTERNAL PRIVATE METHODS */
563
564
private async first_connect(): Promise<void> {
565
try {
566
await this.connect();
567
this.update_has_uncommitted_changes();
568
} catch (err) {
569
console.warn(
570
`synctable: failed to connect (table=${this.table}), error=${err}`,
571
this.query,
572
);
573
this.close(true);
574
}
575
}
576
577
private set_state(state: State): void {
578
this.state = state;
579
this.emit(state);
580
}
581
582
public get_state(): State {
583
return this.state;
584
}
585
586
public get_table(): string {
587
return this.table;
588
}
589
590
private set_throttle_changes(): void {
591
// No throttling of change events, unless explicitly requested
592
// *or* part of the schema.
593
if (this.throttle_changes != null) return;
594
const t = schema.SCHEMA[this.table];
595
if (t == null) return;
596
const u = t.user_query;
597
if (u == null) return;
598
const g = u.get;
599
if (g == null) return;
600
this.throttle_changes = g.throttle_changes;
601
}
602
603
private init_throttle_changes(): void {
604
this.set_throttle_changes();
605
606
if (!this.throttle_changes) {
607
this.emit_change = (changed_keys: string[]) => {
608
this.emit("change", changed_keys);
609
this.emit("change-no-throttle", changed_keys);
610
};
611
return;
612
}
613
614
// throttle emitting of change events
615
let all_changed_keys = {};
616
const do_emit_changes = () => {
617
//console.log("#{this.table} -- emitting changes", keys(all_changed_keys))
618
// CRITICAL: some code depends on emitting change even
619
// for the *empty* list of keys!
620
// E.g., projects page won't load for new users. This
621
// is the *change* from not loaded to being loaded,
622
// which does make sense.
623
this.emit("change", keys(all_changed_keys));
624
all_changed_keys = {};
625
};
626
this.throttled_emit_changes = throttle(
627
do_emit_changes,
628
this.throttle_changes,
629
);
630
this.emit_change = (changed_keys) => {
631
//console.log("emit_change", changed_keys);
632
this.dbg("emit_change")(changed_keys);
633
//console.log("#{this.table} -- queue changes", changed_keys)
634
for (const key of changed_keys) {
635
all_changed_keys[key] = true;
636
}
637
this.emit("change-no-throttle", changed_keys);
638
if (this.throttled_emit_changes != null) {
639
this.throttled_emit_changes();
640
}
641
};
642
}
643
644
private dbg(_f?: string): Function {
645
if (!DEBUG) {
646
return () => {};
647
}
648
if (this.client.is_project()) {
649
return this.client.dbg(
650
`SyncTable('${JSON.stringify(this.query)}').${_f}`,
651
);
652
} else {
653
return (...args) => {
654
console.log(`synctable("${this.table}").${_f}: `, ...args);
655
};
656
}
657
}
658
659
private connect = async (): Promise<void> => {
660
const dbg = this.dbg("connect");
661
dbg();
662
this.assert_not_closed("connect");
663
if (this.state === "connected") {
664
return;
665
}
666
667
// 1. save, in case we have any local unsaved changes,
668
// then sync with upstream.
669
if (this.value != null) {
670
dbg("send off any local unsaved changes first");
671
await this.save();
672
}
673
674
// 2. Now actually setup the changefeed.
675
// (Even if this.no_db_set is set, this still may do
676
// an initial query to the database. However, the changefeed
677
// does nothing further.)
678
dbg("actually setup changefeed");
679
await this.create_changefeed();
680
681
dbg("connect should have succeeded");
682
};
683
684
private async create_changefeed(): Promise<void> {
685
const dbg = this.dbg("create_changefeed");
686
if (this.get_state() == "closed") {
687
dbg("closed so don't do anything ever again");
688
return;
689
}
690
dbg("creating changefeed connection...");
691
let initval;
692
try {
693
initval = await this.create_changefeed_connection();
694
if (!initval) {
695
throw Error("closed while creating changefeed");
696
}
697
} catch (err) {
698
dbg("failed to create changefeed", err.toString());
699
// Typically this happens if synctable closed while
700
// creating the connection...
701
this.close();
702
}
703
if (this.state == "closed") {
704
return;
705
}
706
dbg("got changefeed, now initializing table data");
707
const changed_keys = this.update_all(initval);
708
dbg("setting state to connected");
709
this.set_state("connected");
710
711
// NOTE: Can't emit change event until after
712
// switching state to connected, which is why
713
// we do it here.
714
this.emit_change(changed_keys);
715
}
716
717
private close_changefeed(): void {
718
if (this.changefeed == null) return;
719
this.remove_changefeed_handlers();
720
this.changefeed.close();
721
delete this.changefeed;
722
}
723
724
private create_changefeed_connection = async (): Promise<any[]> => {
725
let delay_ms: number = 3000;
726
let warned = false;
727
let first = true;
728
while (true) {
729
this.close_changefeed();
730
if (
731
USE_CONAT &&
732
!isTestClient(this.client) &&
733
this.client.is_browser() &&
734
!this.project_id
735
) {
736
this.changefeed = new ConatChangefeed({
737
account_id: this.client.client_id?.()!,
738
query: this.query,
739
options: this.options,
740
});
741
// This init_changefeed_handlers MUST be initialized here since this.changefeed might
742
// get closed very soon, and missing a close event would be very, very bad.
743
this.init_changefeed_handlers();
744
} else {
745
this.changefeed = new Changefeed(this.changefeed_options());
746
this.init_changefeed_handlers();
747
await this.wait_until_ready_to_query_db();
748
}
749
try {
750
const initval = await this.changefeed.connect();
751
752
if (this.changefeed.get_state() == "closed" || !initval) {
753
throw Error("closed during creation");
754
}
755
if (warned) {
756
console.log(`SUCCESS creating ${this.table} changefeed`);
757
}
758
return initval;
759
} catch (err) {
760
if (is_fatal(err.toString())) {
761
console.warn("FATAL creating initial changefeed", this.table, err);
762
this.close(true);
763
throw err;
764
}
765
if (err.code == 429) {
766
const message = `${err}`;
767
console.log(message);
768
this.client.alert_message?.({
769
title: `Too Many Requests (${this.table})`,
770
message,
771
type: "error",
772
});
773
await delay(30 * 1000);
774
}
775
if (first) {
776
// don't warn the first time
777
first = false;
778
} else {
779
// This can happen because we might suddenly NOT be ready
780
// to query db immediately after we are ready...
781
warned = true;
782
console.log(
783
`WARNING: ${this.table} -- failed to create changefeed connection; will retry in ${delay_ms}ms -- ${err}`,
784
);
785
}
786
await delay(delay_ms);
787
delay_ms = Math.min(20000, delay_ms * 1.25);
788
}
789
}
790
};
791
792
private async wait_until_ready_to_query_db(): Promise<void> {
793
const dbg = this.dbg("wait_until_ready_to_query_db");
794
795
// Wait until we're ready to query the database.
796
let client_state: string;
797
798
if (this.schema.anonymous || this.client.is_project()) {
799
// For anonymous tables (and for project accessing db),
800
// this just means the client is connected.
801
client_state = "connected";
802
} else {
803
// For non-anonymous tables, the client
804
// has to actually be signed in.
805
client_state = "signed_in";
806
}
807
808
if (this.client[`is_${client_state}`]()) {
809
dbg("state already achieved -- no need to wait");
810
return;
811
}
812
813
await once(this.client, client_state);
814
dbg(`success -- client emited ${client_state}`);
815
}
816
817
private changefeed_options() {
818
return {
819
do_query: query_function(this.client.query, this.table),
820
query_cancel: this.client.query_cancel.bind(this.client),
821
options: this.options,
822
query: this.query,
823
table: this.table,
824
};
825
}
826
827
// awkward code due to typescript weirdness using both
828
// ConatChangefeed and Changefeed types (for unit testing).
829
private init_changefeed_handlers(): void {
830
const c = this.changefeed as EventEmitter | null;
831
if (c == null) return;
832
c.on("update", this.changefeed_on_update);
833
c.on("close", this.changefeed_on_close);
834
}
835
836
private remove_changefeed_handlers(): void {
837
const c = this.changefeed as EventEmitter | null;
838
if (c == null) return;
839
c.removeListener("update", this.changefeed_on_update);
840
c.removeListener("close", this.changefeed_on_close);
841
}
842
843
private changefeed_on_update(change): void {
844
this.update_change(change);
845
}
846
847
private changefeed_on_close(): void {
848
this.dbg("changefeed_on_close")();
849
this.set_state("disconnected");
850
this.create_changefeed();
851
}
852
853
private disconnected(why: string): void {
854
const dbg = this.dbg("disconnected");
855
dbg(`why=${why}`);
856
if (this.state === "disconnected") {
857
dbg("already disconnected");
858
return;
859
}
860
this.set_state("disconnected");
861
}
862
863
private obj_to_key(_): string | undefined {
864
// Return string key used in the immutable map in
865
// which this table is stored.
866
throw Error("this.obj_to_key must be set during initialization");
867
}
868
869
private init_query(): void {
870
// Check that the query is probably valid, and
871
// record the table and schema
872
const tables = keys(this.query);
873
if (len(tables) !== 1) {
874
throw Error("must query only a single table");
875
}
876
this.table = tables[0];
877
this.schema = schema.SCHEMA[this.table];
878
if (this.schema == null) {
879
throw Error(`unknown schema for table ${this.table}`);
880
}
881
if (this.client.is_project()) {
882
this.client_query = this.schema.project_query;
883
} else {
884
this.client_query = this.schema.user_query;
885
}
886
if (this.client_query == null) {
887
throw Error(`no query schema allowing queries to ${this.table}`);
888
}
889
if (!is_array(this.query[this.table])) {
890
throw Error("must be a multi-document query");
891
}
892
this.primary_keys = schema.client_db.primary_keys(this.table);
893
// Check that all primary keys are in the query.
894
for (const primary_key of this.primary_keys) {
895
if (this.query[this.table][0][primary_key] === undefined) {
896
throw Error(
897
`must include each primary key in query of table '${this.table}', but you missed '${primary_key}'`,
898
);
899
}
900
}
901
// Check that all keys in the query are allowed by the schema.
902
for (const query_key of keys(this.query[this.table][0])) {
903
if (this.client_query.get.fields[query_key] === undefined) {
904
throw Error(
905
`every key in query of table '${this.table}' must` +
906
` be a valid user get field in the schema but '${query_key}' is not`,
907
);
908
}
909
}
910
911
// Function this.to_key to extract primary key from object
912
if (this.primary_keys.length === 1) {
913
// very common case
914
const pk = this.primary_keys[0];
915
this.obj_to_key = (obj) => {
916
if (obj == null) {
917
return;
918
}
919
if (Map.isMap(obj)) {
920
return to_key(obj.get(pk));
921
} else {
922
return to_key(obj[pk]);
923
}
924
};
925
} else {
926
// compound primary key
927
this.obj_to_key = (obj) => {
928
if (obj == null) {
929
return;
930
}
931
const v: any[] = [];
932
if (Map.isMap(obj)) {
933
for (const pk of this.primary_keys) {
934
const a = obj.get(pk);
935
if (a == null) {
936
return;
937
}
938
v.push(a);
939
}
940
} else {
941
for (const pk of this.primary_keys) {
942
const a = obj[pk];
943
if (a == null) {
944
return;
945
}
946
v.push(a);
947
}
948
}
949
return to_key(v);
950
};
951
}
952
953
if (this.client_query != null && this.client_query.set != null) {
954
// Initialize set_fields and required_set_fields.
955
const set = this.client_query.set;
956
for (const field of keys(this.query[this.table][0])) {
957
if (set.fields != null && set.fields[field]) {
958
this.set_fields.push(field);
959
}
960
if (set.required_fields != null && set.required_fields[field]) {
961
this.required_set_fields[field] = true;
962
}
963
}
964
}
965
}
966
967
/* Send all unsent changes.
968
This function must not be called more than once at a time.
969
Returns boolean:
970
false -- there are no additional changes to be saved
971
true -- new changes may have appeared during the _save that
972
need to be saved.
973
974
If writing to the database results in an error (but not due to no network),
975
then an error state is set (which client can consult), an even is emitted,
976
and we do not try to write to the database again until that error
977
state is cleared. One way it can be cleared is by changing the table.
978
*/
979
private async _save(): Promise<boolean> {
980
//console.log("_save");
981
const dbg = this.dbg("_save");
982
dbg();
983
if (this.get_state() == "closed") {
984
return false;
985
}
986
if (this.client_query.set == null) {
987
// Nothing to do -- can never set anything for this table.
988
// There are some tables (e.g., stats) where the remote values
989
// could change while user is offline, and the code below would
990
// result in warnings.
991
return false;
992
}
993
//console.log("_save", this.table);
994
dbg("waiting for network");
995
await this.wait_until_ready_to_query_db();
996
if (this.get_state() == "closed") {
997
return false;
998
}
999
dbg("waiting for value");
1000
await this.wait_until_value();
1001
if (this.get_state() == "closed") {
1002
return false;
1003
}
1004
if (len(this.changes) === 0) {
1005
return false;
1006
}
1007
if (this.value == null) {
1008
throw Error("value must not be null");
1009
}
1010
1011
// Send our changes to the server.
1012
const query: any[] = [];
1013
const timed_changes: TimedChange[] = [];
1014
const proposed_keys: { [key: string]: boolean } = {};
1015
const changes = copy(this.changes);
1016
//console.log("_save: send ", changes);
1017
for (const key in this.changes) {
1018
if (this.versions[key] === 0) {
1019
proposed_keys[key] = true;
1020
}
1021
const x = this.value.get(key);
1022
if (x == null) {
1023
throw Error("delete is not implemented");
1024
}
1025
const obj = x.toJS();
1026
1027
if (!this.no_db_set) {
1028
// qobj is the db query version of obj, or at least the part
1029
// of it that expresses what changed.
1030
const qobj = {};
1031
// Set the primary key part:
1032
if (this.primary_keys.length === 1) {
1033
qobj[this.primary_keys[0]] = key;
1034
} else {
1035
// unwrap compound primary key
1036
const v = JSON.parse(key);
1037
let i = 0;
1038
for (const primary_key of this.primary_keys) {
1039
qobj[primary_key] = v[i];
1040
i += 1;
1041
}
1042
}
1043
// Can only send set_field sets to the database. Of these,
1044
// only send what actually changed.
1045
const prev = this.last_save.get(key);
1046
for (const k of this.set_fields) {
1047
if (!x.has(k)) continue;
1048
if (prev == null) {
1049
qobj[k] = obj[k];
1050
continue;
1051
}
1052
1053
// Convert to List to get a clean way to *compare* no
1054
// matter whether they are immutable.js objects or not!
1055
const a = List([x.get(k)]);
1056
const b = List([prev.get(k)]);
1057
if (!a.equals(b)) {
1058
qobj[k] = obj[k];
1059
}
1060
}
1061
1062
for (const k in this.required_set_fields) {
1063
if (qobj[k] == null) {
1064
qobj[k] = obj[k];
1065
}
1066
}
1067
1068
query.push({ [this.table]: qobj });
1069
}
1070
timed_changes.push({ obj, time: this.changes[key] });
1071
}
1072
dbg("sending timed-changes", timed_changes);
1073
this.emit("timed-changes", timed_changes);
1074
1075
if (!this.no_db_set) {
1076
try {
1077
const value = this.value;
1078
dbg("doing database query");
1079
await callback2(this.client.query, {
1080
query,
1081
options: [{ set: true }], // force it to be a set query
1082
});
1083
this.last_save = value; // success -- don't have to save this stuff anymore...
1084
} catch (err) {
1085
this.setError(err, query);
1086
dbg("db query failed", err);
1087
if (is_fatal(err.toString())) {
1088
console.warn("FATAL doing set", this.table, err);
1089
this.close(true);
1090
throw err;
1091
}
1092
// NOTE: we do not show entire log since the number
1093
// of entries in the query can be very large and just
1094
// converting them all to text could use a lot of memory (?).
1095
console.warn(
1096
`_save('${this.table}') set query error:`,
1097
err,
1098
" queries: ",
1099
query[0],
1100
"...",
1101
query.length - 1,
1102
" omitted",
1103
);
1104
return true;
1105
}
1106
}
1107
1108
if (this.get_state() == "closed") return false;
1109
if (this.value == null) {
1110
// should not happen
1111
return false;
1112
}
1113
1114
if (this.no_db_set) {
1115
// Not using changefeeds, so have to depend on other mechanisms
1116
// to update state. Wait until changes to proposed keys are
1117
// acknowledged by their version being assigned.
1118
try {
1119
dbg("waiting until versions are updated");
1120
await this.wait_until_versions_are_updated(proposed_keys, 5000);
1121
} catch (err) {
1122
dbg("waiting for versions timed out / failed");
1123
// took too long -- try again to send and receive changes.
1124
return true;
1125
}
1126
}
1127
1128
dbg("Record that we successfully sent these changes");
1129
for (const key in changes) {
1130
if (changes[key] == this.changes[key]) {
1131
delete this.changes[key];
1132
}
1133
}
1134
this.update_has_uncommitted_changes();
1135
1136
const is_done = len(this.changes) === 0;
1137
dbg("done? ", is_done);
1138
return !is_done;
1139
}
1140
1141
private setError(error: string, query: Query): void {
1142
console.warn("WARNING: Synctable error -- ", error);
1143
this.error = { error, query };
1144
}
1145
1146
public clearError(): void {
1147
this.error = undefined;
1148
this.emit("clear-error");
1149
}
1150
1151
private async wait_until_versions_are_updated(
1152
proposed_keys: { [key: string]: boolean },
1153
timeout_ms: number,
1154
): Promise<void> {
1155
const start_ms = Date.now();
1156
while (len(proposed_keys) > 0) {
1157
for (const key in proposed_keys) {
1158
if (this.versions[key] > 0) {
1159
delete proposed_keys[key];
1160
}
1161
}
1162
if (len(proposed_keys) > 0) {
1163
const elapsed_ms = Date.now() - start_ms;
1164
const keys: string[] = await once(
1165
this,
1166
"increased-versions",
1167
timeout_ms - elapsed_ms,
1168
);
1169
for (const key of keys) {
1170
delete proposed_keys[key];
1171
}
1172
}
1173
}
1174
}
1175
1176
// Return modified immutable Map, with all types coerced to be
1177
// as specified in the schema, if possible, or throw an exception.
1178
private do_coerce_types(
1179
changes: Map<string | number, any>,
1180
): Map<string | number, any> {
1181
if (!Map.isMap(changes)) {
1182
changes = Map(changes);
1183
}
1184
if (!this.coerce_types) {
1185
// no-op if coerce_types isn't set.
1186
return changes;
1187
}
1188
const t = schema.SCHEMA[this.table];
1189
if (t == null) {
1190
throw Error(`Missing schema for table ${this.table}`);
1191
}
1192
const fields = copy(t.fields);
1193
if (fields == null) {
1194
throw Error(`Missing fields part of schema for table ${this.table}`);
1195
}
1196
let specs;
1197
if (t.virtual != null) {
1198
if (t.virtual === true) {
1199
throw Error(`t.virtual can't be true for ${this.table}`);
1200
}
1201
const x = schema.SCHEMA[t.virtual];
1202
if (x == null) {
1203
throw Error(`invalid virtual table spec for ${this.table}`);
1204
}
1205
specs = copy(x.fields);
1206
if (specs == null) {
1207
throw Error(`invalid virtual table spec for ${this.table}`);
1208
}
1209
} else {
1210
specs = fields;
1211
}
1212
1213
if (typeof this.query != "string") {
1214
// explicit query (not just from schema)
1215
let x = this.query[this.table];
1216
if (is_array(x)) {
1217
x = x[0];
1218
}
1219
for (const k in fields) {
1220
if (x[k] === undefined) {
1221
delete fields[k];
1222
}
1223
}
1224
}
1225
return Map(
1226
changes.map((value, field) => {
1227
if (typeof field !== "string") {
1228
// satisfy typescript.
1229
return;
1230
}
1231
if (value == null) {
1232
// do not coerce null types
1233
return value;
1234
}
1235
if (fields[field] == null) {
1236
//console.warn(changes, fields);
1237
throw Error(
1238
`Cannot coerce: no field '${field}' in table '${this.table}'`,
1239
);
1240
}
1241
const spec = specs[field];
1242
let desired: string | undefined = spec.type || spec.pg_type;
1243
if (desired == null) {
1244
throw Error(`Cannot coerce: no type info for field ${field}`);
1245
}
1246
desired = desired.toLowerCase();
1247
1248
const actual = typeof value;
1249
if (desired === actual) {
1250
return value;
1251
}
1252
1253
// We can add more or less later...
1254
if (desired === "string" || desired.slice(0, 4) === "char") {
1255
if (actual !== "string") {
1256
// ensure is a string
1257
return `${value}`;
1258
}
1259
return value;
1260
}
1261
if (desired === "timestamp") {
1262
if (!(value instanceof Date)) {
1263
// make it a Date object. (usually converting from string rep)
1264
return new Date(value);
1265
}
1266
return value;
1267
}
1268
if (desired === "integer") {
1269
// always fine to do this -- will round floats, fix strings, etc.
1270
return parseInt(value);
1271
}
1272
if (desired === "number") {
1273
// actual wasn't number, so parse:
1274
return parseFloat(value);
1275
}
1276
if (desired === "array") {
1277
if (!List.isList(value)) {
1278
value = fromJS(value);
1279
if (!List.isList(value)) {
1280
throw Error(
1281
`field ${field} of table ${this.table} (value=${changes.get(
1282
field,
1283
)}) must convert to an immutable.js List`,
1284
);
1285
}
1286
}
1287
return value;
1288
}
1289
if (desired === "map") {
1290
if (!Map.isMap(value)) {
1291
value = Map(value);
1292
if (!Map.isMap(value)) {
1293
throw Error(
1294
`field ${field} of table ${this.table} (value=${changes.get(
1295
field,
1296
)}) must convert to an immutable.js Map`,
1297
);
1298
}
1299
}
1300
return value;
1301
}
1302
if (desired === "boolean") {
1303
// actual wasn't boolean, so coerce.
1304
return !!value;
1305
}
1306
if (desired === "uuid") {
1307
assert_uuid(value);
1308
return value;
1309
}
1310
return value;
1311
}),
1312
);
1313
}
1314
1315
/*
1316
Handle an update of all records from the database.
1317
This happens on initialization, and also if we
1318
disconnect and reconnect.
1319
*/
1320
private update_all(v: any[]): any[] {
1321
//const dbg = this.dbg("update_all");
1322
1323
if (this.state === "closed") {
1324
// nothing to do -- just ignore updates from db
1325
throw Error("makes no sense to do update_all when state is closed.");
1326
}
1327
1328
this.emit("before-change");
1329
// Restructure the array of records in v as a mapping
1330
// from the primary key to the corresponding record.
1331
const x = {};
1332
for (const y of v) {
1333
const key = this.obj_to_key(y);
1334
if (key != null) {
1335
x[key] = y;
1336
// initialize all version numbers
1337
this.versions[key] = this.initial_version;
1338
}
1339
}
1340
const changed_keys = keys(x); // of course all keys have been changed.
1341
this.emit("increased-versions", changed_keys);
1342
1343
this.value = fromJS(x);
1344
if (this.value == null) {
1345
throw Error("bug");
1346
}
1347
this.last_save = this.value;
1348
if (this.coerce_types) {
1349
// Ensure all values are properly coerced, as specified
1350
// in the database schema. This is important, e.g., since
1351
// when mocking the client db query, JSON is involved and
1352
// timestamps are not parsed to Date objects.
1353
this.value = <Map<string, Map<string, any>>>this.value.map((val, _) => {
1354
if (val == null) {
1355
throw Error("val must not be null");
1356
}
1357
return this.do_coerce_types(val);
1358
});
1359
}
1360
1361
// It's possibly that nothing changed (e.g., typical case
1362
// on reconnect!) so we check.
1363
// If something really did change, we set the server
1364
// state to what we just got, and
1365
// also inform listeners of which records changed (by giving keys).
1366
//console.log("update_all: changed_keys=", changed_keys)
1367
if (this.state === "connected") {
1368
// When not yet connected, initial change is emitted
1369
// by function that sets up the changefeed. We are
1370
// connected here, so we are responsible for emitting
1371
// this change.
1372
this.emit_change(changed_keys);
1373
}
1374
1375
this.emit("init-value-server");
1376
return changed_keys;
1377
}
1378
1379
public initial_version_for_browser_client(): VersionedChange[] {
1380
if (this.value == null) {
1381
throw Error("value must not be null");
1382
}
1383
const x: VersionedChange[] = [];
1384
this.value.forEach((val, key) => {
1385
if (val == null) {
1386
throw Error("val must be non-null");
1387
}
1388
const obj = val.toJS();
1389
if (obj == null) {
1390
throw Error("obj must be non-null");
1391
}
1392
if (key == null) {
1393
throw Error("key must not be null");
1394
}
1395
const version = this.versions[key];
1396
if (version == null) {
1397
throw Error("version must not be null");
1398
}
1399
1400
x.push({ obj, version });
1401
});
1402
return x;
1403
}
1404
1405
public init_browser_client(changes: VersionedChange[]): void {
1406
const dbg = this.dbg("init_browser_client");
1407
dbg(`applying ${changes.length} versioned changes`);
1408
// The value before doing init (which happens precisely when project
1409
// synctable is reset). See note below.
1410
const before = this.value;
1411
const received_keys = this.apply_changes_to_browser_client(changes);
1412
if (before != null) {
1413
before.forEach((_, key) => {
1414
if (key == null || received_keys[key]) return; // received as part of init
1415
if (this.changes[key] && this.versions[key] == 0) return; // not event sent yet
1416
// This key was known and confirmed sent before init, but
1417
// didn't get sent back this time. So it was lost somehow,
1418
// e.g., due to not getting saved to the database and the project
1419
// (or table in the project) getting restarted.
1420
dbg(`found lost: key=${key}`);
1421
// So we will try to send out it again.
1422
if (!this.changes[key]) {
1423
this.changes[key] = this.unique_server_time();
1424
this.update_has_uncommitted_changes();
1425
}
1426
// So we don't view it as having any known version
1427
// assigned by project, since the project lost it.
1428
this.null_version(key);
1429
});
1430
if (len(this.changes) > 0) {
1431
this.save(); // kick off a save of our unsaved lost work back to the project.
1432
}
1433
}
1434
/*
1435
NOTE: The problem solved here is the following. Imagine the project
1436
synctable is killed, and it has acknowledge a change C from a
1437
web browser client, but has NOT saved that change to the central
1438
postgreSQL database (or someday, maybe a local SQLite database).
1439
Then when the project synctable is resurrected, it uses the database
1440
for its initial state, and it knows nothing about C. The
1441
browser thinks that C has been successfully written and broadcast
1442
to everybody, so the browser doesn't send C again. The result is
1443
that the browser and the project would be forever out of sync.
1444
Note that we only care about lost changes that some browser knows
1445
about -- if no browser knows about them, then the fact they are
1446
lost won't break sync. Also, for file editing, data is regularly
1447
saved to disk, so if the browser sends a change that is lost due to
1448
the project being killed before writing to the database, then the
1449
browser terminates too, then that change is completely lost. However,
1450
everybody will start again with at least the last version of the file
1451
**saved to disk,** which is basically what people may expect as a
1452
worst case.
1453
1454
The solution to the above problem is to look at what key:value pairs
1455
we know about that the project didn't just send back to us. If there
1456
are any that were reported as committed, but they vanished, then we
1457
set them as unsent and send them again.
1458
*/
1459
}
1460
1461
public apply_changes_to_browser_client(changes: VersionedChange[]): {
1462
[key: string]: boolean;
1463
} {
1464
const dbg = this.dbg("apply_changes_to_browser_client");
1465
dbg("got ", changes.length, "changes");
1466
this.assert_not_closed("apply_changes_to_browser_client");
1467
if (this.value == null) {
1468
// initializing the synctable for the first time.
1469
this.value = Map();
1470
}
1471
1472
this.emit("before-change");
1473
const changed_keys: string[] = [];
1474
const increased_versions: string[] = [];
1475
const received_keys: { [key: string]: boolean } = {};
1476
for (const change of changes) {
1477
const { obj, version } = change;
1478
const new_val = this.do_coerce_types(fromJS(obj));
1479
const key = this.obj_to_key(new_val);
1480
if (key == null) {
1481
throw Error("object results in null key");
1482
}
1483
received_keys[key] = true;
1484
const cur_version = this.versions[key] ? this.versions[key] : 0;
1485
if (cur_version > version) {
1486
// nothing further to do.
1487
continue;
1488
}
1489
if (this.handle_new_val(new_val, undefined, "insert", false)) {
1490
// really did make a change.
1491
changed_keys.push(key);
1492
}
1493
// Update our version number to the newer version.
1494
this.versions[key] = version;
1495
increased_versions.push(key);
1496
}
1497
1498
if (increased_versions.length > 0) {
1499
this.emit("increased-versions", increased_versions);
1500
}
1501
1502
if (changed_keys.length > 0) {
1503
this.emit_change(changed_keys);
1504
}
1505
return received_keys;
1506
}
1507
1508
public apply_changes_from_browser_client(changes: TimedChange[]): void {
1509
const dbg = this.dbg("apply_changes_from_browser_client");
1510
dbg("project <-- changes -- client", JSON.stringify(changes));
1511
const changed_keys: string[] = [];
1512
const versioned_changes: VersionedChange[] = [];
1513
for (const change of changes) {
1514
const { obj, time } = change;
1515
if (obj == null) {
1516
throw Error("obj must not be null");
1517
}
1518
const new_val = this.do_coerce_types(fromJS(obj));
1519
const key = this.obj_to_key(new_val); // must have been coerced!
1520
if (key == null) {
1521
throw Error("object results in null key");
1522
}
1523
const cur_time = this.changes[key];
1524
if (cur_time != null && cur_time > time) {
1525
dbg("already have a more recent version");
1526
// We already have a more recent update to this object.
1527
// We push that new version out again, just in case.
1528
if (this.value == null) {
1529
throw Error("value must not be null");
1530
}
1531
let obj: any = this.value.get(key);
1532
if (obj == null) {
1533
throw Error(`there must be an object in this.value with key ${key}`);
1534
}
1535
obj = obj.toJS();
1536
const version = this.versions[key];
1537
if (version == null) {
1538
throw Error(`object with key ${key} must have a version`);
1539
}
1540
versioned_changes.push({ obj, version });
1541
continue;
1542
}
1543
if (this.handle_new_val(new_val, undefined, "insert", false)) {
1544
const version = this.increment_version(key);
1545
this.changes[key] = time;
1546
this.update_has_uncommitted_changes();
1547
versioned_changes.push({ obj: new_val.toJS(), version });
1548
changed_keys.push(key);
1549
}
1550
}
1551
if (changed_keys.length > 0) {
1552
this.emit_change(changed_keys);
1553
}
1554
if (versioned_changes.length > 0) {
1555
this.emit("versioned-changes", versioned_changes);
1556
}
1557
dbg("project -- versioned --> clients", JSON.stringify(versioned_changes));
1558
}
1559
1560
private increment_version(key: string): number {
1561
if (this.versions[key] == null) {
1562
this.versions[key] = this.initial_version;
1563
} else {
1564
this.versions[key] += 1;
1565
}
1566
this.emit("increased-versions", [key]);
1567
return this.versions[key];
1568
}
1569
1570
private null_version(key: string): void {
1571
this.versions[key] = 0;
1572
}
1573
1574
/*
1575
Apply one incoming change from the database to the
1576
in-memory table.
1577
*/
1578
private update_change(change): void {
1579
if (this.state === "closed") {
1580
// We might get a few more updates even after
1581
// canceling the changefeed, so we just ignore them.
1582
return;
1583
}
1584
if (this.value == null) {
1585
console.warn(`update_change(${this.table}): ignored`);
1586
return;
1587
}
1588
this.emit("before-change");
1589
const changed_keys: string[] = [];
1590
const key = this.handle_new_val(
1591
change.new_val,
1592
change.old_val,
1593
change.action,
1594
this.coerce_types,
1595
change.key,
1596
);
1597
if (key != null) {
1598
changed_keys.push(key);
1599
}
1600
1601
//console.log("update_change: changed_keys=", changed_keys)
1602
if (changed_keys.length > 0) {
1603
//console.log("_update_change: change")
1604
this.emit_change(changed_keys);
1605
}
1606
}
1607
1608
// Returns current time (in ms since epoch) on server,
1609
// but if there are multiple requests at the same time,
1610
// the clock is artificially incremented to ensure uniqueness.
1611
// Also, this time is thus always strictly increasing.
1612
private unique_server_time(): number {
1613
let tm = this.client.server_time().valueOf();
1614
if (tm <= this.last_server_time) {
1615
tm = this.last_server_time + 1;
1616
}
1617
this.last_server_time = tm;
1618
return tm;
1619
}
1620
1621
// - returns key only if obj actually changed things.
1622
private handle_new_val(
1623
new_val: any,
1624
old_val: any,
1625
action: string,
1626
coerce: boolean,
1627
key?: string,
1628
): string | undefined {
1629
if (this.value == null) {
1630
// to satisfy typescript.
1631
throw Error("value must be initialized");
1632
}
1633
1634
if (action === "delete") {
1635
if (!key) {
1636
old_val = fromJS(old_val);
1637
if (old_val == null) {
1638
throw Error(
1639
"old_val must not be null or key must be specified for delete action",
1640
);
1641
}
1642
if (coerce && this.coerce_types) {
1643
old_val = this.do_coerce_types(old_val);
1644
}
1645
key = this.obj_to_key(old_val);
1646
}
1647
if (key == null || !this.value.has(key)) {
1648
return; // already gone
1649
}
1650
this.value = this.value.delete(key);
1651
return key;
1652
}
1653
1654
new_val = fromJS(new_val);
1655
if (new_val == null) {
1656
throw Error("new_val must not be null for insert or update action");
1657
}
1658
if (coerce && this.coerce_types) {
1659
new_val = this.do_coerce_types(new_val);
1660
}
1661
key = this.obj_to_key(new_val);
1662
if (key == null) {
1663
// This means the primary key is null or missing, which
1664
// shouldn't happen. Maybe it could in some edge case.
1665
// For now, we shouldn't let this break everything, so:
1666
return undefined;
1667
// throw Error("key must not be null");
1668
}
1669
const cur_val = this.value.get(key);
1670
if (action === "update" && cur_val != null) {
1671
// For update actions, we shallow *merge* in the change.
1672
// For insert action, we just replace the whole thing.
1673
new_val = cur_val.merge(new_val);
1674
}
1675
if (!new_val.equals(cur_val)) {
1676
this.value = this.value.set(key, new_val);
1677
return key;
1678
}
1679
return undefined;
1680
}
1681
1682
/*
1683
obj is an immutable.js Map without the primary key
1684
set. If the database schema defines a way to compute
1685
the primary key from other keys, try to use it here.
1686
This function returns the computed primary key (array or string)
1687
if it works, and returns undefined otherwise.
1688
*/
1689
private computed_primary_key(obj): string[] | string | undefined {
1690
let f;
1691
if (this.primary_keys.length === 1) {
1692
f = this.client_query.set.fields[this.primary_keys[0]];
1693
if (typeof f === "function") {
1694
return f(obj.toJS(), schema.client_db);
1695
} else {
1696
return;
1697
}
1698
} else {
1699
const v: string[] = [];
1700
for (const pk of this.primary_keys) {
1701
f = this.client_query.set.fields[pk];
1702
if (typeof f === "function") {
1703
v.push(f(obj.toJS(), schema.client_db));
1704
} else {
1705
return;
1706
}
1707
}
1708
return v;
1709
}
1710
}
1711
1712
private assert_not_closed(desc: string): void {
1713
if (this.state === "closed") {
1714
//console.trace();
1715
throw Error(
1716
`the synctable "${this.table}" must not be closed -- ${desc}`,
1717
);
1718
}
1719
}
1720
1721
// **WARNING:** Right now this *barely* works at all... due to
1722
// barely being implemented since I mostly haven't needed it.
1723
// It will delete the object from the database, but if some
1724
// client still has the object, they can end up just writing
1725
// it back.
1726
public async delete(obj): Promise<void> {
1727
// Table spec must have set.delete = true.
1728
// This function does a direct database query to delete
1729
// the entry with primary key described by obj from
1730
// the database. That will have the side effect slightly
1731
// later of removing the object from this table. This
1732
// thus works differently than making changes or
1733
// creating new entries, at least right now (since
1734
// implementing this properly is a lot of work but
1735
// not used much).
1736
1737
const query = { [this.table]: obj };
1738
const options = [{ delete: true }];
1739
await callback2(this.client.query, { query, options });
1740
}
1741
}
1742
1743