Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/editor/generic/sync-doc.ts
1450 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
SyncDoc -- the core class for editing with a synchronized document.
8
9
This code supports both string-doc and db-doc, for editing both
10
strings and small database tables efficiently, with history,
11
undo, save to disk, etc.
12
13
This code is run *both* in browser clients and under node.js
14
in projects, and behaves slightly differently in each case.
15
16
EVENTS:
17
18
- before-change: fired before merging in changes from upstream
19
- ... TODO
20
*/
21
22
// Whether to use the conat communication layer (disabled for test clients;
// see the constructor, which also checks isTestClient).
const USE_CONAT = true;

/* OFFLINE_THRESH_S - If the client becomes disconnected from
   the backend for more than this long then---on reconnect---do
   extra work to ensure that all snapshots are up to date (in
   case snapshots were made when we were offline), and mark the
   sent field of patches that weren't saved.  I.e., we rebase
   all offline changes. */
// const OFFLINE_THRESH_S = 5 * 60; // 5 minutes.

/* How often the local hub will autosave this file to disk if
   it has it open and there are unsaved changes.  This is very
   important since it ensures that a user that edits a file but
   doesn't click "Save" and closes their browser (right after
   their edits have gone to the database), still has their
   file saved to disk soon.  This is important, e.g., for homework
   getting collected and not missing the last few changes.  It turns
   out this is what people expect.
   Set to 0 to disable. (But don't do that.) */
const FILE_SERVER_AUTOSAVE_S = 45;
// const FILE_SERVER_AUTOSAVE_S = 5;

// How big of files we allow users to open using syncstrings.
const MAX_FILE_SIZE_MB = 32;

// How frequently to check if file is or is not read only.
// The filesystem watcher is NOT sufficient for this, because
// it is NOT triggered on permissions changes. Thus we must
// poll for read only status periodically, unfortunately.
const READ_ONLY_CHECK_INTERVAL_MS = 7500;

// This parameter determines throttling when broadcasting cursor position
// updates.  Make this larger to reduce bandwidth at the expense of making
// cursors less responsive.
const CURSOR_THROTTLE_MS = 750;

// NATS is much faster and can handle load, and cursors only uses pub/sub
// (so a much shorter throttle interval is used in that mode).
const CURSOR_THROTTLE_NATS_MS = 150;

// Ignore file changes for this long after save to disk.
const RECENT_SAVE_TO_DISK_MS = 2000;

// When true, the various sync tables are initialized concurrently
// rather than one after the other.
const PARALLEL_INIT = true;
65
66
import {
67
COMPUTE_THRESH_MS,
68
COMPUTER_SERVER_CURSOR_TYPE,
69
decodeUUIDtoNum,
70
SYNCDB_PARAMS as COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS,
71
} from "@cocalc/util/compute/manager";
72
73
import { DEFAULT_SNAPSHOT_INTERVAL } from "@cocalc/util/db-schema/syncstring-schema";
74
75
// Opaque patch payload as produced by the underlying document type;
// kept as `any` here since the concrete shape depends on the doctype.
type XPatch = any;
76
77
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
78
import { SyncTable } from "@cocalc/sync/table/synctable";
79
import {
80
callback2,
81
cancel_scheduled,
82
once,
83
retry_until_success,
84
reuse_in_flight_methods,
85
until,
86
} from "@cocalc/util/async-utils";
87
import { wait } from "@cocalc/util/async-wait";
88
import {
89
auxFileToOriginal,
90
assertDefined,
91
close,
92
endswith,
93
field_cmp,
94
filename_extension,
95
hash_string,
96
keys,
97
minutes_ago,
98
} from "@cocalc/util/misc";
99
import * as schema from "@cocalc/util/schema";
100
import { delay } from "awaiting";
101
import { EventEmitter } from "events";
102
import { Map, fromJS } from "immutable";
103
import { debounce, throttle } from "lodash";
104
import { Evaluator } from "./evaluator";
105
import { HistoryEntry, HistoryExportOptions, export_history } from "./export";
106
import { IpywidgetsState } from "./ipywidgets-state";
107
import { SortedPatchList } from "./sorted-patch-list";
108
import type {
109
Client,
110
CompressedPatch,
111
DocType,
112
Document,
113
FileWatcher,
114
Patch,
115
} from "./types";
116
import { isTestClient, patch_cmp } from "./util";
117
import { CONAT_OPEN_FILE_TOUCH_INTERVAL } from "@cocalc/util/conat";
118
import mergeDeep from "@cocalc/util/immutable-deep-merge";
119
import { JUPYTER_SYNCDB_EXTENSIONS } from "@cocalc/util/jupyter/names";
120
import { LegacyHistory } from "./legacy";
121
import { getLogger } from "@cocalc/conat/client";
122
123
// Extra debug output switch for this module.
const DEBUG = false;

// Lifecycle of a SyncDoc: starts in "init", becomes "ready" once all
// tables are set up, and ends in "closed".
export type State = "init" | "ready" | "closed";
// Which backend provides data/changefeeds for this document.
export type DataServer = "project" | "database";

// Options common to all synchronized documents.
export interface SyncOpts0 {
  project_id: string;
  path: string;
  client: Client;
  patch_interval?: number;

  // file_use_interval defaults to 60000.
  // Specify 0 to disable.
  file_use_interval?: number;

  string_id?: string;
  cursors?: boolean;
  change_throttle?: number;

  // persistent backend session in project, so only close
  // backend when explicitly requested:
  persistent?: boolean;

  // If true, entire sync-doc is assumed ephemeral, in the
  // sense that no edit history gets saved via patches to
  // the database.  The one syncstring record for coordinating
  // users does get created in the database.
  ephemeral?: boolean;

  // which data/changefeed server to use
  data_server?: DataServer;
}

// Full options: additionally how to construct a document from a string,
// and the document type descriptor.
export interface SyncOpts extends SyncOpts0 {
  from_str: (str: string) => Document;
  doctype: DocType;
}

// Bookkeeping for an in-progress undo/redo session (see _undo/_redo).
export interface UndoState {
  // sorted timestamps of patches made by this user in this session
  my_times: number[];
  // index into my_times; equals my_times.length when at live state
  pointer: number;
  // timestamps currently being excluded from the document
  without: number[];
  // patch recreating unsaved live changes, for redo back to live state
  final?: CompressedPatch;
}
167
168
// NOTE: Do not make multiple SyncDoc's for the same document, especially
// not on the frontend.

// Module-level logger; logs once at import time so it is easy to confirm
// this module was loaded.
const logger = getLogger("sync-doc");
logger.debug("init");
173
174
export class SyncDoc extends EventEmitter {
  public readonly project_id: string; // project_id that contains the doc
  public readonly path: string; // path of the file corresponding to the doc
  private string_id: string; // sha1-derived id of (project_id, path)
  private my_user_id: number; // index of this client in the users array

  private client: Client;
  private _from_str: (str: string) => Document; // creates a doc from a string.

  // Throttling of incoming upstream patches from project to client.
  private patch_interval: number = 250;

  // This is what's actually output by setInterval -- it's
  // not an amount of time.
  private fileserver_autosave_timer: number = 0;

  // Handle returned by the read-only polling setInterval.
  private read_only_timer: number = 0;

  // throttling of change events -- e.g., is useful for course
  // editor where we have hundreds of changes and the UI gets
  // overloaded unless we throttle and group them.
  private change_throttle: number = 0;

  // file_use_interval throttle: default is 60s for everything
  private file_use_interval: number;
  private throttled_file_use?: Function;

  private cursors: boolean = false; // if true, also provide cursor tracking functionality
  private cursor_map: Map<string, any> = Map();
  private cursor_last_time: Date = new Date(0);

  // doctype: object describing document constructor
  // (used by project to open file)
  private doctype: DocType;

  private state: State = "init";

  // The three underlying sync tables: metadata, edit history, cursors.
  private syncstring_table: SyncTable;
  private patches_table: SyncTable;
  private cursors_table: SyncTable;

  public evaluator?: Evaluator;

  public ipywidgets_state?: IpywidgetsState;

  private patch_list?: SortedPatchList;

  private last: Document; // last version incorporated from upstream
  private doc: Document; // current live document
  private before_change?: Document;

  private last_user_change: Date = minutes_ago(60);
  private last_save_to_disk_time: Date = new Date(0);

  private last_snapshot?: number;
  private last_seq?: number;
  private snapshot_interval: number;

  // account_ids of users of this syncstring, indexed by user_id.
  private users: string[];

  private settings: Map<string, any> = Map();

  private syncstring_save_state: string = "";

  // patches that this client made during this editing session.
  private my_patches: { [time: string]: XPatch } = {};

  private watch_path?: string;
  private file_watcher?: FileWatcher;

  private handle_patch_update_queue_running: boolean;
  private patch_update_queue: string[] = [];

  // defined only while in undo/redo mode; see _undo/_redo below.
  private undo_state: UndoState | undefined;

  private save_to_disk_start_ctime: number | undefined;
  private save_to_disk_end_ctime: number | undefined;

  private persistent: boolean = false;

  private last_has_unsaved_changes?: boolean = undefined;

  private ephemeral: boolean = false;

  private sync_is_disabled: boolean = false;
  private delay_sync_timer: any;

  // static because we want exactly one across all docs!
  private static computeServerManagerDoc?: SyncDoc;

  private useConat: boolean;
  legacy: LegacyHistory;
267
constructor(opts: SyncOpts) {
  super();
  // Derive the string_id from (project_id, path) unless explicitly given.
  if (opts.string_id === undefined) {
    this.string_id = schema.client_db.sha1(opts.project_id, opts.path);
  } else {
    this.string_id = opts.string_id;
  }

  // Copy simple options straight onto this instance when provided.
  // NOTE(review): "from_patch_str" is listed here but SyncOpts declares
  // no such field -- looks like a legacy leftover; confirm before removing.
  for (const field of [
    "project_id",
    "path",
    "client",
    "patch_interval",
    "file_use_interval",
    "change_throttle",
    "cursors",
    "doctype",
    "from_patch_str",
    "persistent",
    "data_server",
    "ephemeral",
  ]) {
    if (opts[field] != undefined) {
      this[field] = opts[field];
    }
  }

  this.legacy = new LegacyHistory({
    project_id: this.project_id,
    path: this.path,
    client: this.client,
  });

  // NOTE: Do not use conat in test mode, since there we use a minimal
  // "fake" client that does all communication internally and doesn't
  // use conat.  We also use this for the messages composer.
  this.useConat = USE_CONAT && !isTestClient(opts.client);
  if (this.ephemeral) {
    // So the doctype written to the database reflects the
    // ephemeral state.  Here ephemeral determines whether
    // or not patches are written to the database by the
    // project.
    this.doctype.opts = { ...this.doctype.opts, ephemeral: true };
  }
  if (this.cursors) {
    // similarly to ephemeral, but for cursors.  We track them
    // on the backend since they can also be very useful, e.g.,
    // with jupyter they are used for connecting remote compute,
    // and **should** also be used for broadcasting load and other
    // status information (TODO).
    this.doctype.opts = { ...this.doctype.opts, cursors: true };
  }
  this._from_str = opts.from_str;

  // Initialize to time when we create the syncstring, so we don't
  // see our own cursor when we refresh the browser (before we move
  // to update this).
  this.cursor_last_time = this.client?.server_time();

  // Collapse concurrent calls of these methods into a single in-flight call.
  reuse_in_flight_methods(this, [
    "save",
    "save_to_disk",
    "load_from_disk",
    "handle_patch_update_queue",
  ]);

  if (this.change_throttle) {
    this.emit_change = throttle(this.emit_change, this.change_throttle);
  }

  this.setMaxListeners(100);

  // Kick off async initialization (exactly once; see init below).
  this.init();
}
341
342
/*
Initialize everything.
This should be called *exactly* once by the constructor,
and no other time.  It tries to set everything up.  If
the browser isn't connected to the network, it'll wait
until it is (however long, etc.).  If this fails, it closes
this SyncDoc.
*/
private initialized = false;
private init = async () => {
  // Enforce the exactly-once contract documented above.
  if (this.initialized) {
    throw Error("init can only be called once");
  }
  // BUGFIX: previously this flag was checked but never set, so the
  // guard above could never fire.
  this.initialized = true;
  // const start = Date.now();
  this.assert_not_closed("init");
  const log = this.dbg("init");
  // Retry table initialization with backoff until it succeeds or the
  // document leaves the "init" state (e.g., it gets closed).
  await until(
    async () => {
      if (this.state != "init") {
        return true;
      }
      try {
        log("initializing all tables...");
        await this.initAll();
        log("initAll succeeded");
        return true;
      } catch (err) {
        console.trace(err);
        const m = `WARNING: problem initializing ${this.path} -- ${err}`;
        log(m);
        // log always:
        console.log(m);
      }
      log("wait then try again");
      return false;
    },
    { start: 3000, max: 15000, decay: 1.3 },
  );

  // Success -- everything initialized with no issues.
  this.set_state("ready");
  this.init_watch();
  this.emit_change(); // from nothing to something.
};
386
387
// True if this client is responsible for managing
// the state of this document with respect to
// the file system.   By default, the project is responsible,
// but it could be something else (e.g., a compute server!). It's
// important that whatever algorithm determines this, it is
// a function of state that is eventually consistent.
// IMPORTANT: whether or not we are the file server can
// change over time, so if you call isFileServer and
// set something up (e.g., autosave or a watcher), based
// on the result, you need to clear it when the state
// changes. See the function handleComputeServerManagerChange.
private isFileServer = reuseInFlight(async () => {
  if (this.state == "closed") return;
  if (this.client == null || this.client.is_browser()) {
    // browser is never the file server (yet), and doesn't need to do
    // anything related to watching for changes in state.
    // Someday via webassembly or browsers making users files available,
    // etc., we will have this.  Not today.
    return false;
  }
  const computeServerManagerDoc = this.getComputeServerManagerDoc();
  const log = this.dbg("isFileServer");
  if (computeServerManagerDoc == null) {
    // No manager doc => nothing can override the default: the project
    // is the file server.
    log("not using compute server manager for this doc");
    return this.client.is_project();
  }

  const state = computeServerManagerDoc.get_state();
  log("compute server manager doc state: ", state);
  if (state == "closed") {
    log("compute server manager is closed");
    // something really messed up
    return this.client.is_project();
  }
  if (state != "ready") {
    try {
      log(
        "waiting for compute server manager doc to be ready; current state=",
        state,
      );
      await once(computeServerManagerDoc, "ready", 15000);
      log("compute server manager is ready");
    } catch (err) {
      // Fall back to the default answer rather than failing hard.
      log(
        "WARNING -- failed to initialize computeServerManagerDoc -- err=",
        err,
      );
      return this.client.is_project();
    }
  }

  // id of who the user *wants* to be the file server.
  const path = this.getFileServerPath();
  const fileServerId =
    computeServerManagerDoc.get_one({ path })?.get("id") ?? 0;
  if (this.client.is_project()) {
    log(
      "we are project, so we are fileserver if fileServerId=0 and it is ",
      fileServerId,
    );
    return fileServerId == 0;
  }
  // at this point we have to be a compute server
  const computeServerId = decodeUUIDtoNum(this.client.client_id());
  // this is usually true -- but might not be if we are switching
  // directly from one compute server to another.
  log("we are compute server and ", { fileServerId, computeServerId });
  return fileServerId == computeServerId;
});
456
457
// Path that the file server actually manages on disk.  Jupyter syncdb
// documents are aux files, so for those we report the original notebook
// path instead of the aux path.
private getFileServerPath = () => {
  const jupyterAux = "." + JUPYTER_SYNCDB_EXTENSIONS;
  if (!this.path?.endsWith(jupyterAux)) {
    return this.path;
  }
  // treating jupyter as a weird special case here.
  return auxFileToOriginal(this.path);
};
464
465
// Lazily create (once, shared across ALL SyncDoc instances via the static
// field) the syncdoc that tracks which compute server manages which path.
// Returns null when this doc *is* the manager doc itself.
private getComputeServerManagerDoc = () => {
  if (this.path == COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS.path) {
    // don't want to recursively explode!
    return null;
  }
  if (SyncDoc.computeServerManagerDoc == null) {
    if (this.client.is_project()) {
      // @ts-ignore: TODO!
      SyncDoc.computeServerManagerDoc = this.client.syncdoc({
        path: COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS.path,
      });
    } else {
      // @ts-ignore: TODO!
      SyncDoc.computeServerManagerDoc = this.client.sync_client.sync_db({
        project_id: this.project_id,
        ...COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS,
      });
    }
    if (
      SyncDoc.computeServerManagerDoc != null &&
      !this.client.is_browser()
    ) {
      // start watching for state changes
      SyncDoc.computeServerManagerDoc.on(
        "change",
        this.handleComputeServerManagerChange,
      );
    }
  }
  return SyncDoc.computeServerManagerDoc;
};
496
497
// React to a change in the compute-server-manager doc: if the file-server
// responsibility for THIS path moved to or away from us, tear down or set
// up autosave + filesystem watching accordingly.
private handleComputeServerManagerChange = async (keys) => {
  if (SyncDoc.computeServerManagerDoc == null) {
    return;
  }
  // Only act if one of the changed rows is about our path.
  let relevant = false;
  for (const key of keys ?? []) {
    if (key.get("path") == this.path) {
      relevant = true;
      break;
    }
  }
  if (!relevant) {
    return;
  }
  const path = this.getFileServerPath();
  const fileServerId =
    SyncDoc.computeServerManagerDoc.get_one({ path })?.get("id") ?? 0;
  const ourId = this.client.is_project()
    ? 0
    : decodeUUIDtoNum(this.client.client_id());
  // we are considering ourself the file server already if we have
  // either a watcher or autosave on.
  const thinkWeAreFileServer =
    this.file_watcher != null || this.fileserver_autosave_timer;
  const weAreFileServer = fileServerId == ourId;
  if (thinkWeAreFileServer != weAreFileServer) {
    // life has changed!  Let's adapt.
    if (thinkWeAreFileServer) {
      // we were acting as the file server, but now we are not.
      // Flush our state to disk before handing over responsibility.
      await this.save_to_disk_filesystem_owner();
      // Stop doing things we are no longer supposed to do.
      clearInterval(this.fileserver_autosave_timer as any);
      this.fileserver_autosave_timer = 0;
      // stop watching filesystem
      await this.update_watch_path();
    } else {
      // load our state from the disk
      await this.load_from_disk();
      // we were not acting as the file server, but now we need.  Let's
      // step up to the plate.
      // start watching filesystem
      await this.update_watch_path(this.path);
      // enable autosave
      await this.init_file_autosave();
    }
  }
};
544
545
// Return id of ACTIVE remote compute server, if one is connected and pinging, or 0
// if none is connected.  This is used by Jupyter to determine who
// should evaluate code.
// We always take the smallest id of the remote
// compute servers, in case there is more than one, so exactly one of them
// takes control.  Always returns 0 if cursors are not enabled for this
// document, since the cursors table is used to coordinate the compute
// server.
getComputeServerId = (): number => {
  if (!this.cursors) {
    return 0;
  }
  // This info is in the "cursors" table instead of the document itself
  // to avoid wasting space in the database longterm.  Basically a remote
  // Jupyter client that can provide compute announces this by reporting it's
  // cursor to look a certain way.
  const cursors = this.get_cursors({
    maxAge: COMPUTE_THRESH_MS,
    // don't exclude self since getComputeServerId called from the compute
    // server also to know if it is the chosen one.
    excludeSelf: "never",
  });
  const dbg = this.dbg("getComputeServerId");
  dbg("num cursors = ", cursors.size);
  let minId = Infinity;
  // NOTE: similar code is in frontend/jupyter/cursor-manager.ts
  for (const [client_id, cursor] of cursors) {
    if (cursor.getIn(["locs", 0, "type"]) == COMPUTER_SERVER_CURSOR_TYPE) {
      try {
        minId = Math.min(minId, decodeUUIDtoNum(client_id));
      } catch (err) {
        // this should never happen unless a client were being malicious.
        dbg(
          "WARNING -- client_id should encode server id, but is",
          client_id,
        );
      }
    }
  }

  return isFinite(minId) ? minId : 0;
};
587
588
// Announce this client as a compute server by broadcasting a cursor
// carrying the special compute-server marker type (see getComputeServerId).
registerAsComputeServer = () => {
  const marker = [{ type: COMPUTER_SERVER_CURSOR_TYPE }];
  this.setCursorLocsNoThrottle(marker);
};
591
592
/* Set this user's cursors to the given locs (immediately -- no throttling;
   see set_cursor_locs below for the throttled variant). No-op unless the
   document is ready and the cursors table exists. */
setCursorLocsNoThrottle = async (
  // locs is 'any' and not any[] because of a codemirror syntax highlighting bug!
  locs: any,
  side_effect: boolean = false,
) => {
  if (this.state != "ready") {
    return;
  }
  if (this.cursors_table == null) {
    if (!this.cursors) {
      throw Error("cursors are not enabled");
    }
    // table not initialized yet
    return;
  }
  if (this.useConat) {
    // conat path: timestamp comes from the server clock; no explicit save
    // call needed since cursors are pure pub/sub here.
    const time = this.client.server_time().valueOf();
    const x: {
      user_id: number;
      locs: any;
      time: number;
    } = {
      user_id: this.my_user_id,
      locs,
      time,
    };
    this.cursor_last_time = new Date(x.time);
    this.cursors_table.set(x);
    return;
  }

  // legacy path: write a row into the cursors sync table and save it.
  const x: {
    string_id?: string;
    user_id: number;
    locs: any[];
    time?: Date;
  } = {
    string_id: this.string_id,
    user_id: this.my_user_id,
    locs,
  };
  const now = this.client.server_time();
  // NOTE(review): x.time is always undefined at this point, so the
  // condition below always assigns -- presumably defensive against a
  // future refactor; confirm before simplifying.
  if (!side_effect || (x.time ?? now) >= now) {
    // the now comparison above is in case the cursor time
    // is in the future (due to clock issues) -- always fix that.
    x.time = now;
  }
  if (x.time != null) {
    // will actually always be non-null due to above
    this.cursor_last_time = x.time;
  }
  this.cursors_table.set(x, "none");
  await this.cursors_table.save();
};
648
649
// Throttled version of setCursorLocsNoThrottle -- the one callers should
// normally use for broadcasting cursor movement.
// NOTE(review): the wait is keyed off the module-level USE_CONAT constant,
// not this instance's useConat (which also excludes test clients) -- confirm
// that is intentional.
set_cursor_locs: typeof this.setCursorLocsNoThrottle = throttle(
  this.setCursorLocsNoThrottle,
  USE_CONAT ? CURSOR_THROTTLE_NATS_MS : CURSOR_THROTTLE_MS,
  {
    leading: true,
    trailing: true,
  },
);
657
658
// Set up throttled "file was edited" reporting (browser clients only):
// on each "user-change" event, if this user actually committed a patch
// since the last report, call client.mark_file with action "edit".
private init_file_use_interval = (): void => {
  if (this.file_use_interval == null) {
    // default: report at most once a minute
    this.file_use_interval = 60 * 1000;
  }

  if (!this.file_use_interval || !this.client.is_browser()) {
    // file_use_interval has to be nonzero, and we only do
    // this for browser user.
    return;
  }

  const file_use = async () => {
    await delay(100); // wait a little so my_patches and gets updated.
    // We ONLY count this and record that the file was
    // edited if there was an actual change record in the
    // patches log, by this user, since last time.
    let user_is_active: boolean = false;
    for (const tm in this.my_patches) {
      // keys of my_patches are ms-epoch timestamps as strings
      if (new Date(parseInt(tm)) > this.last_user_change) {
        user_is_active = true;
        break;
      }
    }
    if (!user_is_active) {
      return;
    }
    this.last_user_change = new Date();
    this.client.mark_file?.({
      project_id: this.project_id,
      path: this.path,
      action: "edit",
      ttl: this.file_use_interval,
    });
  };
  this.throttled_file_use = throttle(file_use, this.file_use_interval, {
    leading: true,
  });

  this.on("user-change", this.throttled_file_use as any);
};
698
699
// Record the new lifecycle state and broadcast it as an event of the
// same name ("init" | "ready" | "closed") so consumers can await it.
private set_state = (state: State): void => {
  this.state = state;
  this.emit(state);
};
703
704
// Current lifecycle state of this document.
get_state = (): State => this.state;
707
708
// The project this document belongs to.
get_project_id = (): string => this.project_id;
711
712
// Path of the file backing this document.
get_path = (): string => this.path;
715
716
// Stable id of this syncstring (sha1 of project_id and path by default).
get_string_id = (): string => this.string_id;
719
720
// Index of this client in the users list, or 0 if not yet assigned.
get_my_user_id = (): number => this.my_user_id ?? 0;
723
724
// Throw (tagged with desc for debugging) if the document is closed;
// used as a guard at the top of operations that require a live doc.
private assert_not_closed(desc: string): void {
  if (this.state !== "closed") {
    return;
  }
  //console.trace();
  throw Error(`must not be closed -- ${desc}`);
}
730
731
// Replace the live document.  No-op when the new doc equals the current
// one.  By default this also exits undo mode (explicit sets invalidate
// the undo session); undo/redo themselves pass exit_undo_mode=false.
set_doc = (doc: Document, exit_undo_mode: boolean = true): void => {
  if (doc.is_equal(this.doc)) {
    // no change.
    return;
  }
  if (exit_undo_mode) this.undo_state = undefined;
  // console.log(`sync-doc.set_doc("${doc.to_str()}")`);
  this.doc = doc;

  // debounced, so don't immediately alert, in case there are many
  // more sets coming in the same loop:
  this.emit_change_debounced();
};
744
745
// Convenience wrapper: apply a set on the current document and commit
// the result via set_doc (avoids get_doc/set_doc boilerplate at call sites).
set = (x: any): void => {
  const updated = this.doc.set(x);
  this.set_doc(updated);
};
750
751
// Convenience wrapper: delete from the current document and commit via set_doc.
delete = (x?: any): void => {
  const updated = this.doc.delete(x);
  this.set_doc(updated);
};
754
755
// Convenience wrapper: read from the current document.
get = (x?: any): any => this.doc.get(x);
758
759
// Convenience wrapper: db-doc style single-record lookup on the current doc.
get_one(x?: any): any {
  return this.doc.get_one(x);
}
762
763
// Return the underlying live document; throws if it has not been set yet.
get_doc = (): Document => {
  const doc = this.doc;
  if (doc == null) {
    throw Error("doc must be set");
  }
  return doc;
};
771
772
// Set this doc from its string representation (bypasses set_doc, so no
// change event is emitted here).
from_str = (value: string): void => {
  // console.log(`sync-doc.from_str("${value}")`);
  this.doc = this._from_str(value);
};
777
778
// String representation of this doc; throws if the doc is not set yet.
to_str = (): string => {
  const doc = this.doc;
  if (doc == null) {
    throw Error("doc must be set");
  }
  return doc.to_str();
};
786
787
// Size of the current document (delegates to the doctype's count).
count = (): number => this.doc.count();
790
791
// Version of the document at a given point in time; with no time given,
// returns the version right now.  Throws if not fully initialized.
version = (time?: number): Document => {
  this.assert_table_is_ready("patches");
  const patches = this.patch_list;
  assertDefined(patches);
  return patches.value({ time });
};
799
800
/* Compute the version of the document as if the patches at the given
   times were simply never applied.  This is the building block used to
   implement undo in client editors. */
version_without = (without_times: number[]): Document => {
  this.assert_table_is_ready("patches");
  const patches = this.patch_list;
  assertDefined(patches);
  return patches.value({ without_times });
};
808
809
// Revert the document to its state at the given point in time.  There
// need not be a patch exactly then -- the closest earlier patch is used.
revert = (time: number): void => {
  const snapshot = this.version(time);
  this.set_doc(snapshot);
};
816
817
/* Undo/redo public api.

   this.undo / this.redo return the document after the operation and
   record a commit changing to that version.  The first undo enters
   "undo mode", in which further undo/redo calls walk up and down the
   stack of changes made by THIS user in THIS session; other users'
   edits (and this user's edits from other sessions) are untouched.
   Call this.exit_undo_mode() to leave undo mode; any explicit set_doc
   also exits it automatically.

   Note that undoing a past patch does NOT remove it from history --
   instead a new patch is committed whose content is what history would
   have produced had the undone patches never been applied.
*/
undo = (): Document => {
  const doc = this._undo();
  this.set_doc(doc, false);
  this.commit();
  return doc;
};
847
848
// Redo one step (see the undo/redo contract documented on undo above).
redo = (): Document => {
  const doc = this._redo();
  this.set_doc(doc, false);
  this.commit();
  return doc;
};
854
855
// Core undo step: move the undo pointer back one position and return the
// resulting document.  Entering undo mode (state == null) initializes the
// session state from my_patches.  See UndoState for field meanings.
private _undo(): Document {
  this.assert_is_ready("_undo");
  let state = this.undo_state;
  if (state == null) {
    // not in undo mode
    state = this.initUndoState();
  }
  if (state.pointer === state.my_times.length) {
    // pointing at live state (e.g., happens on entering undo mode)
    const value: Document = this.version(); // last saved version
    const live: Document = this.doc;
    if (!live.is_equal(value)) {
      // User had unsaved changes, so last undo is to revert to version without those.
      state.final = value.make_patch(live); // live redo if needed
      state.pointer -= 1; // most recent timestamp
      return value;
    } else {
      // User had no unsaved changes, so last undo is version without last saved change.
      const tm = state.my_times[state.pointer - 1];
      // pointer jumps by 2: past both the live slot and the newest patch.
      state.pointer -= 2;
      if (tm != null) {
        state.without.push(tm);
        return this.version_without(state.without);
      } else {
        // no undo information during this session
        return value;
      }
    }
  } else {
    // pointing at particular timestamp in the past
    if (state.pointer >= 0) {
      // there is still more to undo
      state.without.push(state.my_times[state.pointer]);
      state.pointer -= 1;
    }
    return this.version_without(state.without);
  }
}
894
// Core redo step: move the undo pointer forward one position and return
// the resulting document.  Inverse of _undo within the same undo session.
private _redo(): Document {
  this.assert_is_ready("_redo");
  const state = this.undo_state;
  if (state == null) {
    // nothing to do but return latest live version
    return this.get_doc();
  }
  if (state.pointer === state.my_times.length) {
    // pointing at live state -- nothing to do
    return this.get_doc();
  } else if (state.pointer === state.my_times.length - 1) {
    // one back from live state, so apply unsaved patch to live version
    const value = this.version();
    if (value == null) {
      // see remark in undo -- do nothing
      return this.get_doc();
    }
    state.pointer += 1;
    return value.apply_patch(state.final);
  } else {
    // at least two back from live state
    state.without.pop();
    state.pointer += 1;
    if (state.final == null && state.pointer === state.my_times.length - 1) {
      // special case when there wasn't any live change
      state.pointer += 1;
    }
    return this.version_without(state.without);
  }
}
924
925
// Whether an undo/redo session is currently active.
in_undo_mode = (): boolean => this.undo_state != null;
928
929
// Leave undo/redo mode, discarding the session's pointer/without state.
exit_undo_mode = (): void => {
  this.undo_state = undefined;
};
932
933
// Create (or return the existing) undo session state: the sorted list of
// this user's patch timestamps from this session, with the pointer at the
// live end of the stack.
private initUndoState = (): UndoState => {
  if (this.undo_state != null) {
    return this.undo_state;
  }
  const my_times = keys(this.my_patches).map((x) => parseInt(x));
  // BUGFIX: use a numeric comparator -- Array.prototype.sort's default is
  // lexicographic string comparison, which misorders numbers of different
  // digit counts (the old code only worked because ms-epoch timestamps
  // currently all have the same number of digits).
  my_times.sort((a, b) => a - b);
  this.undo_state = {
    my_times,
    pointer: my_times.length,
    without: [],
  };
  return this.undo_state;
};
946
947
// Periodic autosave callback (see init_file_autosave).  Best-effort:
// failures are logged and swallowed so the interval keeps running.
private save_to_disk_autosave = async (): Promise<void> => {
  if (this.state !== "ready") {
    return;
  }
  const log = this.dbg("save_to_disk_autosave");
  log();
  try {
    await this.save_to_disk();
  } catch (err) {
    log(`failed -- ${err}`);
  }
};
959
960
/* Make it so the local hub project will automatically save
   the file to disk periodically.  Only runs if we are the file
   server, autosave is enabled, and no timer exists yet. */
private init_file_autosave = async () => {
  // Do not autosave sagews until we resolve
  // https://github.com/sagemathinc/cocalc/issues/974
  // Similarly, do not autosave ipynb because of
  // https://github.com/sagemathinc/cocalc/issues/5216
  if (
    !FILE_SERVER_AUTOSAVE_S ||
    !(await this.isFileServer()) ||
    this.fileserver_autosave_timer ||
    endswith(this.path, ".sagews") ||
    endswith(this.path, "." + JUPYTER_SYNCDB_EXTENSIONS)
  ) {
    return;
  }

  // Explicit cast due to node vs browser typings.
  this.fileserver_autosave_timer = <any>(
    setInterval(this.save_to_disk_autosave, FILE_SERVER_AUTOSAVE_S * 1000)
  );
};
982
983
// account_id of the user who made the edit at the given point in time.
account_id = (time: number): string => {
  this.assert_is_ready("account_id");
  const idx = this.user_id(time);
  return this.users[idx];
};
989
990
// Integer index (into this.users) of the user who made the edit at the
// given point in time.
user_id = (time: number): number => {
  this.assert_table_is_ready("patches");
  const patches = this.patch_list;
  assertDefined(patches);
  return patches.user_id(time);
};
997
998
// The single metadata row for this document from the syncstring table,
// or an empty immutable Map while the project has not yet created it.
private syncstring_table_get_one = (): Map<string, any> => {
  if (this.syncstring_table == null) {
    throw Error("syncstring_table must be defined");
  }
  const row = this.syncstring_table.get_one();
  return row ?? Map();
};
1009
1010
/* The project calls set_initialized once it has checked for
   the file on disk; this way the frontend knows that the
   syncstring has been initialized in the database, and also
   if there was an error doing the check.
*/
private set_initialized = async (
  error: string,
  read_only: boolean,
  size: number,
): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  this.dbg("set_initialized")({ error, read_only, size });
  const init = { time: this.client.server_time(), size, error };
  // Write the init record several times with a delay between writes --
  // NOTE(review): presumably to survive transient table races; confirm
  // whether a single write would suffice before changing.
  for (let i = 0; i < 3; i++) {
    await this.set_syncstring_table({
      init,
      read_only,
      last_active: this.client.server_time(),
    });
    await delay(1000);
  }
};
1032
1033
/* List of logical timestamps of the versions of this string in the sync
1034
table that we opened to start editing (so starts with what was
1035
the most recent snapshot when we started). The list of timestamps
1036
is sorted from oldest to newest. */
1037
versions = (): number[] => {
1038
assertDefined(this.patch_list);
1039
return this.patch_list.versions();
1040
};
1041
1042
// Wall-clock time associated with the given version, if known.
wallTime = (version: number): number | undefined => {
  const pl = this.patch_list;
  return pl?.wallTime(version);
};
1045
1046
// newest version of any non-staging known patch on this client,
// including ones just made that might not be in patch_list yet.
newestVersion = (): number | undefined => {
  const pl = this.patch_list;
  return pl?.newest_patch_time();
};
1051
1052
hasVersion = (time: number): boolean => {
1053
assertDefined(this.patch_list);
1054
return this.patch_list.hasVersion(time);
1055
};
1056
1057
historyFirstVersion = () => {
1058
this.assert_table_is_ready("patches");
1059
assertDefined(this.patch_list);
1060
return this.patch_list.firstVersion();
1061
};
1062
1063
historyLastVersion = () => {
1064
this.assert_table_is_ready("patches");
1065
assertDefined(this.patch_list);
1066
return this.patch_list.lastVersion();
1067
};
1068
1069
// Position of the given time within the version history, if present.
historyVersionNumber = (time: number): number | undefined => {
  const pl = this.patch_list;
  return pl?.versionNumber(time);
};
1072
1073
// Logical timestamp of the most recent change, or 0 when there
// are no versions at all.
last_changed = (): number => {
  const v = this.versions();
  if (v.length === 0) {
    return 0;
  }
  return v[v.length - 1];
};
1077
1078
// If any of the underlying tables closes, close this whole syncdoc.
private init_table_close_handlers(): void {
  const names = ["syncstring", "patches", "cursors"];
  for (const name of names) {
    const table = this[name + "_table"];
    table?.on("close", this.close);
  }
}
1086
1087
// more gentle version -- this can cause the project actions
// to be *created* etc.
end = reuseInFlight(async () => {
  const shouldSaveFirst = this.client.is_browser() && this.state == "ready";
  if (shouldSaveFirst) {
    try {
      await this.save_to_disk();
    } catch (_err) {
      // Must be non-fatal: we are closing the document anyway and
      // of course still need to clean everything else up, so we
      // deliberately ignore save errors here.
    }
  }
  this.close();
});
1101
1102
// Close synchronized editing of this string; this stops listening
// for changes and stops broadcasting changes.
close = reuseInFlight(async () => {
  if (this.state == "closed") {
    return;
  }
  const dbg = this.dbg("close");
  dbg("close");

  // Stop reacting to compute-server manager changes (if any manager doc).
  SyncDoc.computeServerManagerDoc?.removeListener(
    "change",
    this.handleComputeServerManagerChange,
  );
  //
  // SYNC STUFF
  //

  // WARNING: that 'closed' is emitted at the beginning of the
  // close function (before anything async) for the project is
  // assumed in src/packages/project/sync/sync-doc.ts, because
  // that ensures that the moment close is called we lock trying
  // to create the syncdoc again until closing is finished.
  // (This set_state call emits "closed"):
  this.set_state("closed");

  this.emit("close");

  // must be after the emits above, so clients know
  // what happened and can respond.
  this.removeAllListeners();

  if (this.throttled_file_use != null) {
    // Cancel any pending file_use calls.
    cancel_scheduled(this.throttled_file_use);
    (this.throttled_file_use as any).cancel();
  }

  if (this.emit_change != null) {
    // Cancel any pending change emit calls.
    cancel_scheduled(this.emit_change);
  }

  // Stop the periodic autosave-to-disk timer, if running.
  if (this.fileserver_autosave_timer) {
    clearInterval(this.fileserver_autosave_timer as any);
    this.fileserver_autosave_timer = 0;
  }

  // Stop the periodic read-only check timer, if running.
  if (this.read_only_timer) {
    clearInterval(this.read_only_timer as any);
    this.read_only_timer = 0;
  }

  // Drop any queued-but-unprocessed patch updates.
  this.patch_update_queue = [];

  // Stop watching for file changes. It's important to
  // do this *before* all the await's below, since
  // this syncdoc can't do anything in response to a
  // a file change in its current state.
  this.update_watch_path(); // no input = closes it, if open

  if (this.patch_list != null) {
    // not async -- just a data structure in memory
    this.patch_list.close();
  }

  try {
    this.closeTables();
    dbg("closeTables -- successfully saved all data to database");
  } catch (err) {
    dbg(`closeTables -- ERROR -- ${err}`);
  }
  // this avoids memory leaks:
  close(this);

  // after doing that close, we need to keep the state (which just got deleted) as 'closed'
  this.set_state("closed");
  dbg("close done");
});
1180
1181
// Close every table/state object owned by this syncdoc (in order).
private closeTables = async () => {
  const owned = [
    this.syncstring_table,
    this.patches_table,
    this.cursors_table,
    this.evaluator,
    this.ipywidgets_state,
  ];
  for (const obj of owned) {
    obj?.close();
  }
};
1188
1189
// TODO: We **have** to do this on the client, since the backend
// **security model** for accessing the patches table only
// knows the string_id, but not the project_id/path. Thus
// there is no way currently to know whether or not the client
// has access to the patches, and hence the patches table
// query fails. This costs significant time -- a roundtrip
// and write to the database -- whenever the user opens a file.
// This fix should be to change the patches schema somehow
// to have the user also provide the project_id and path, thus
// proving they have access to the sha1 hash (string_id), but
// don't actually use the project_id and path as columns in
// the table. This requires some new idea I guess of virtual
// fields....
// Also, this also establishes the correct doctype.

// Since this MUST succeed before doing anything else. This is critical
// because the patches table can't be opened anywhere if the syncstring
// object doesn't exist, due to how our security works, *AND* that the
// patches table uses the string_id, which is a SHA1 hash.
private ensure_syncstring_exists_in_db = async (): Promise<void> => {
  const dbg = this.dbg("ensure_syncstring_exists_in_db");
  if (this.useConat) {
    // conat-backed sync does not use the database here at all.
    dbg("skipping -- no database");
    return;
  }

  if (!this.client.is_connected()) {
    dbg("wait until connected...", this.client.is_connected());
    await once(this.client, "connected");
  }

  if (this.client.is_browser() && !this.client.is_signed_in()) {
    // the browser has to sign in, unlike the project (and compute servers)
    await once(this.client, "signed_in");
  }

  // May have been closed while waiting above -- then nothing to do.
  if (this.state == ("closed" as State)) return;

  dbg("do syncstring write query...");

  await callback2(this.client.query, {
    query: {
      syncstrings: {
        string_id: this.string_id,
        project_id: this.project_id,
        path: this.path,
        doctype: JSON.stringify(this.doctype),
      },
    },
  });
  dbg("wrote syncstring to db - done.");
};
1241
1242
// Create the appropriate synctable for the given query: conat-backed
// (stream for patches, key-value for other tables) when useConat,
// otherwise an ephemeral synctable (unit tests / messaging composer).
// Always wires up error-event logging before returning.
private synctable = async (
  query,
  options: any[],
  throttle_changes?: undefined | number,
): Promise<SyncTable> => {
  this.assert_not_closed("synctable");
  const dbg = this.dbg("synctable");
  if (!this.useConat && !this.ephemeral && this.persistent) {
    // persistent table in a non-ephemeral syncdoc, so ensure that table is
    // persisted to database (not just in memory).
    options = options.concat([{ persistent: true }]);
  }
  if (this.ephemeral) {
    options.push({ ephemeral: true });
  }
  let synctable;
  // Determine whether any option requests an ephemeral table.
  let ephemeral = false;
  for (const x of options) {
    if (x.ephemeral) {
      ephemeral = true;
      break;
    }
  }
  if (this.useConat && query.patches) {
    // Patches live in a stream; start from the last known snapshot
    // sequence number (this.last_seq) when we have one.
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: true,
      atomic: true,
      desc: { path: this.path },
      start_seq: this.last_seq,
      ephemeral,
    });

    if (this.last_seq) {
      // any possibility last_seq is wrong?
      if (!isCompletePatchStream(synctable.dstream)) {
        // we load everything and fix it. This happened
        // for data moving to conat when the seq numbers changed.
        console.log("updating invalid timetravel -- ", this.path);

        synctable.close();
        // Reopen WITHOUT start_seq so the entire stream is loaded.
        synctable = await this.client.synctable_conat(query, {
          obj: {
            project_id: this.project_id,
            path: this.path,
          },
          stream: true,
          atomic: true,
          desc: { path: this.path },
          ephemeral,
        });

        // also find the correct last_seq:
        // scan backwards for the most recent snapshot entry...
        let n = synctable.dstream.length - 1;
        for (; n >= 0; n--) {
          const x = synctable.dstream[n];
          if (x?.is_snapshot) {
            const time = x.time;
            // find the seq number with time
            let m = n - 1;
            let last_seq = 0;
            while (m >= 1) {
              if (synctable.dstream[m].time == time) {
                last_seq = synctable.dstream.seq(m);
                break;
              }
              m -= 1;
            }
            this.last_seq = last_seq;
            await this.set_syncstring_table({
              last_snapshot: time,
              last_seq,
            });
            this.setLastSnapshot(time);
            break;
          }
        }
        if (n == -1) {
          // no snapshot? should never happen, but just in case.
          delete this.last_seq;
          await this.set_syncstring_table({
            last_seq: undefined,
          });
        }
      }
    }
  } else if (this.useConat && query.syncstrings) {
    // Single-row metadata table for this document.
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: false,
      immutable: true,
      desc: { path: this.path },
      ephemeral,
    });
  } else if (this.useConat && query.ipywidgets) {
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      // for now just putting a 1-day limit on the ipywidgets table
      // so we don't waste a ton of space.
      config: { max_age: 1000 * 60 * 60 * 24 },
      desc: { path: this.path },
      ephemeral: true, // ipywidgets state always ephemeral
    });
  } else if (this.useConat && (query.eval_inputs || query.eval_outputs)) {
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      config: { max_age: 5 * 60 * 1000 },
      desc: { path: this.path },
      ephemeral: true, // eval state (for sagews) is always ephemeral
    });
  } else if (this.useConat) {
    // Generic conat-backed table for any other query.
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      desc: { path: this.path },
      ephemeral,
    });
  } else {
    // only used for unit tests and the ephemeral messaging composer
    if (this.client.synctable_ephemeral == null) {
      throw Error(`client does not support sync properly`);
    }
    synctable = await this.client.synctable_ephemeral(
      this.project_id,
      query,
      options,
      throttle_changes,
    );
  }
  // We listen and log error events. This is useful because in some settings, e.g.,
  // in the project, an eventemitter with no listener for errors, which has an error,
  // will crash the entire process.
  synctable.on("error", (error) => dbg("ERROR", error));
  return synctable;
};
1401
1402
// Open the syncstrings metadata table for this document and hook up
// the change handler that processes metadata updates.
private init_syncstring_table = async (): Promise<void> => {
  // Query selecting the single syncstrings row for this document,
  // with all tracked metadata fields.
  const query = {
    syncstrings: [
      {
        string_id: this.string_id,
        project_id: this.project_id,
        path: this.path,
        users: null,
        last_snapshot: null,
        last_seq: null,
        snapshot_interval: null,
        save: null,
        last_active: null,
        init: null,
        read_only: null,
        last_file_change: null,
        doctype: null,
        archived: null,
        settings: null,
      },
    ],
  };
  const dbg = this.dbg("init_syncstring_table");

  dbg("getting table...");
  this.syncstring_table = await this.synctable(query, []);
  if (this.ephemeral && this.client.is_project()) {
    // For an ephemeral doc the project just records the doctype.
    await this.set_syncstring_table({
      doctype: JSON.stringify(this.doctype),
    });
  } else {
    dbg("handling the first update...");
    this.handle_syncstring_update();
  }
  this.syncstring_table.on("change", this.handle_syncstring_update);
};
1438
1439
// Used for internal debug logging; returns a logging function bound
// to the given label, or a no-op when DEBUG is off.
private dbg = (_f: string = ""): Function => {
  if (!DEBUG) {
    return (..._args) => {};
  }
  return (...args) => {
    logger.debug(this.path, _f, ...args);
  };
};
1449
1450
// Full initialization sequence: tables, patch list, cursors,
// evaluator, ipywidgets, close handlers, file-use tracking, loading
// from disk (file server only), and autosave.  Emits "init" once the
// document is usable.  Must only be called while state == "init".
private initAll = async (): Promise<void> => {
  if (this.state !== "init") {
    throw Error("connect can only be called in init state");
  }
  const log = this.dbg("initAll");

  log("update interest");
  this.initInterestLoop();

  log("ensure syncstring exists in database (if not using NATS)");
  this.assert_not_closed("initAll -- before ensuring syncstring exists");
  await this.ensure_syncstring_exists_in_db();

  await this.init_syncstring_table();
  this.assert_not_closed("initAll -- successful init_syncstring_table");

  log("patch_list, cursors, evaluator, ipywidgets");
  this.assert_not_closed(
    "initAll -- before init patch_list, cursors, evaluator, ipywidgets",
  );
  if (PARALLEL_INIT) {
    // Initialize all four components concurrently.
    await Promise.all([
      this.init_patch_list(),
      this.init_cursors(),
      this.init_evaluator(),
      this.init_ipywidgets(),
    ]);
    this.assert_not_closed(
      "initAll -- successful init patch_list, cursors, evaluator, and ipywidgets",
    );
  } else {
    await this.init_patch_list();
    this.assert_not_closed("initAll -- successful init_patch_list");
    await this.init_cursors();
    this.assert_not_closed("initAll -- successful init_patch_cursors");
    await this.init_evaluator();
    this.assert_not_closed("initAll -- successful init_evaluator");
    await this.init_ipywidgets();
    this.assert_not_closed("initAll -- successful init_ipywidgets");
  }

  this.init_table_close_handlers();
  this.assert_not_closed("initAll -- successful init_table_close_handlers");

  log("file_use_interval");
  this.init_file_use_interval();

  if (await this.isFileServer()) {
    log("load_from_disk");
    // This sets initialized, which is needed to be fully ready.
    // We keep trying this load from disk until sync-doc is closed
    // or it succeeds. It may fail if, e.g., the file is too
    // large or is not readable by the user. They are informed to
    // fix the problem... and once they do (and wait up to 10s),
    // this will finish.
    // if (!this.client.is_browser() && !this.client.is_project()) {
    //   // FAKE DELAY!!! Just to simulate flakiness / slow network!!!!
    //   await delay(3000);
    // }
    await retry_until_success({
      f: this.init_load_from_disk,
      max_delay: 10000,
      desc: "syncdoc -- load_from_disk",
    });
    log("done loading from disk");
  } else {
    if (this.patch_list!.count() == 0) {
      // Not the file server and no patches yet: wait until either the
      // document is fully ready or at least one patch arrives.
      await Promise.race([
        this.waitUntilFullyReady(),
        once(this.patch_list!, "change"),
      ]);
    }
  }
  this.assert_not_closed("initAll -- load from disk");
  this.emit("init");

  this.assert_not_closed("initAll -- after waiting until fully ready");

  if (await this.isFileServer()) {
    log("init file autosave");
    this.init_file_autosave();
  }
  this.update_has_unsaved_changes();
  log("done");
};
1535
1536
// Error string from the syncstring table's "init" field, or
// undefined if there is no error (or the table isn't ready yet).
private init_error = (): string | undefined => {
  let row;
  try {
    row = this.syncstring_table.get_one();
  } catch (_err) {
    // if the table hasn't been initialized yet,
    // it can't be in error state.
    return undefined;
  }
  return row?.get("init")?.get("error");
};
1547
1548
// wait until the syncstring table is ready to be
// used (so extracted from archive, etc.),
private waitUntilFullyReady = async (): Promise<void> => {
  this.assert_not_closed("wait_until_fully_ready");
  const dbg = this.dbg("wait_until_fully_ready");
  dbg();

  if (this.client.is_browser() && this.init_error()) {
    // init is set and is in error state. Give the backend a few seconds
    // to try to fix this error before giving up. The browser client
    // can close and open the file to retry this (as instructed).
    try {
      await this.syncstring_table.wait(() => !this.init_error(), 5);
    } catch (err) {
      // fine -- let the code below deal with this problem...
    }
  }

  let init;
  // Predicate: true once the syncstring row exists and its "init"
  // field has been set by the project; captures init as a side effect.
  const is_init = (t: SyncTable) => {
    this.assert_not_closed("is_init");
    const tbl = t.get_one();
    if (tbl == null) {
      dbg("null");
      return false;
    }
    init = tbl.get("init")?.toJS();
    return init != null;
  };
  dbg("waiting for init...");
  await this.syncstring_table.wait(is_init, 0);
  dbg("init done");
  if (init.error) {
    throw Error(init.error);
  }
  assertDefined(this.patch_list);
  if (init.size == null) {
    // don't crash but warn at least.
    console.warn("SYNC BUG -- init.size must be defined", { init });
  }
  if (
    !this.client.is_project() &&
    this.patch_list.count() === 0 &&
    init.size
  ) {
    dbg("waiting for patches for nontrivial file");
    // normally this only happens in a later event loop,
    // so force it now.
    dbg("handling patch update queue since", this.patch_list.count());
    await this.handle_patch_update_queue();
    assertDefined(this.patch_list);
    dbg("done handling, now ", this.patch_list.count());
    if (this.patch_list.count() === 0) {
      // wait for a change -- i.e., project loading the file from
      // disk and making available... Because init.size > 0, we know that
      // there must be SOMETHING in the patches table once initialization is done.
      // This is the root cause of https://github.com/sagemathinc/cocalc/issues/2382
      await once(this.patches_table, "change");
      dbg("got patches_table change");
      await this.handle_patch_update_queue();
      dbg("handled update queue");
    }
  }
};
1612
1613
// Throw unless the named table exists and is connected.
private assert_table_is_ready = (table: string): void => {
  // string concatenation (not a template) only because it breaks codemirror!
  const t = this[table + "_table"];
  const connected = t != null && t.get_state() == "connected";
  if (!connected) {
    throw Error(
      `Table ${table} must be connected. string_id=${this.string_id}`,
    );
  }
};
1621
1622
// Throw unless this document is in the "ready" state.
assert_is_ready = (desc: string): void => {
  if (this.state === "ready") {
    return;
  }
  throw Error(`must be ready -- ${desc}`);
};
1627
1628
// Resolve once this document reaches the "ready" state.
wait_until_ready = async (): Promise<void> => {
  this.assert_not_closed("wait_until_ready");
  const ready = this.state === ("ready" as State);
  if (!ready) {
    // wait for a state change to ready.
    await once(this, "ready");
  }
};
1635
1636
/* Calls wait for the corresponding patches SyncTable, if
1637
it has been defined. If it hasn't been defined, it waits
1638
until it is defined, then calls wait. Timeout only starts
1639
when patches_table is already initialized.
1640
*/
1641
wait = async (until: Function, timeout: number = 30): Promise<any> => {
1642
await this.wait_until_ready();
1643
//console.trace("SYNC WAIT -- start...");
1644
const result = await wait({
1645
obj: this,
1646
until,
1647
timeout,
1648
change_event: "change",
1649
});
1650
//console.trace("SYNC WAIT -- got it!");
1651
return result;
1652
};
1653
1654
/* Delete the synchronized string and **all** patches from the database
1655
-- basically delete the complete history of editing this file.
1656
WARNINGS:
1657
(1) If a project has this string open, then things may be messed
1658
up, unless that project is restarted.
1659
(2) Only available for an **admin** user right now!
1660
1661
To use: from a javascript console in the browser as admin, do:
1662
1663
await smc.client.sync_string({
1664
project_id:'9f2e5869-54b8-4890-8828-9aeba9a64af4',
1665
path:'a.txt'}).delete_from_database()
1666
1667
Then make sure project and clients refresh.
1668
1669
WORRY: Race condition where constructor might write stuff as
1670
it is being deleted?
1671
*/
1672
delete_from_database = async (): Promise<void> => {
1673
const queries: object[] = this.ephemeral
1674
? []
1675
: [
1676
{
1677
patches_delete: {
1678
id: [this.string_id],
1679
dummy: null,
1680
},
1681
},
1682
];
1683
queries.push({
1684
syncstrings_delete: {
1685
project_id: this.project_id,
1686
path: this.path,
1687
},
1688
});
1689
1690
const v: Promise<any>[] = [];
1691
for (let i = 0; i < queries.length; i++) {
1692
v.push(callback2(this.client.query, { query: queries[i] }));
1693
}
1694
await Promise.all(v);
1695
};
1696
1697
// True exactly when the path exists on disk but is NOT writable.
private pathExistsAndIsReadOnly = async (path): Promise<boolean> => {
  try {
    await callback2(this.client.path_access, {
      path,
      mode: "w",
    });
    // write access succeeded: clearly exists and is NOT read only.
    return false;
  } catch (_err) {
    // Either the path doesn't exist, or it exists but is read only;
    // distinguish the two by checking existence.
    const exists = await callback2(this.client.path_exists, { path });
    return !!exists;
  }
};
1715
1716
// True if this.path (or the distinct file-server path, when
// different) exists and is read only.
private file_is_read_only = async (): Promise<boolean> => {
  if (await this.pathExistsAndIsReadOnly(this.path)) {
    return true;
  }
  const fsPath = this.getFileServerPath();
  if (fsPath == this.path) {
    return false;
  }
  return await this.pathExistsAndIsReadOnly(fsPath);
};
1728
1729
// Check the read-only status on disk and record it (unless the
// document was closed while we were checking).
private update_if_file_is_read_only = async (): Promise<void> => {
  const read_only = await this.file_is_read_only();
  if (this.state != "closed") {
    this.set_read_only(read_only);
  }
};
1736
1737
// One attempt at the initial load from disk; throws on failure so the
// surrounding retry_until_success loop tries again.
private init_load_from_disk = async (): Promise<void> => {
  if (this.state == "closed") {
    // stop trying, no error -- this is assumed
    // in a retry_until_success elsewhere.
    return;
  }
  const failed = await this.load_from_disk_if_newer();
  if (failed) {
    throw Error("failed to load from disk");
  }
};
1747
1748
// Load the file from disk when the on-disk copy is newer than the last
// recorded edit (or on first load), then record init status (error,
// read_only, size) in the syncstring table.  Returns true iff an
// error occurred.
private load_from_disk_if_newer = async (): Promise<boolean> => {
  const last_changed = new Date(this.last_changed());
  const firstLoad = this.versions().length == 0;
  const dbg = this.dbg("load_from_disk_if_newer");
  let is_read_only: boolean = false;
  let size: number = 0;
  let error: string = "";
  try {
    dbg("check if path exists");
    if (await callback2(this.client.path_exists, { path: this.path })) {
      // the path exists
      dbg("path exists -- stat file");
      const stats = await callback2(this.client.path_stat, {
        path: this.path,
      });
      if (firstLoad || stats.ctime > last_changed) {
        dbg(
          `disk file changed more recently than edits (or first load), so loading, ${stats.ctime} > ${last_changed}; firstLoad=${firstLoad}`,
        );
        size = await this.load_from_disk();
        if (firstLoad) {
          dbg("emitting first-load event");
          // this event is emitted the first time the document is ever loaded from disk.
          this.emit("first-load");
        }
        dbg("loaded");
      } else {
        dbg("stick with database version");
      }
      dbg("checking if read only");
      is_read_only = await this.file_is_read_only();
      dbg("read_only", is_read_only);
    }
  } catch (err) {
    // any failure above (stat, load, read-only check) is recorded in
    // the init info rather than thrown.
    error = `${err}`;
  }

  await this.set_initialized(error, is_read_only, size);
  dbg("done");
  return !!error;
};
1789
1790
// Build the query object used to open the patches table, optionally
// restricted to patches at or after the given cutoff time.
private patch_table_query = (cutoff?: number) => {
  const query = {
    string_id: this.string_id,
    is_snapshot: false, // only used with conat
    time: cutoff ? { ">=": cutoff } : null,
    wall: null,
    // compressed format patch as a JSON *string*
    patch: null,
    // integer id of user (maps to syncstring table)
    user_id: null,
    // (optional) a snapshot at this point in time
    snapshot: null,
    // info about sequence number, count, etc. of this snapshot
    seq_info: null,
    parents: null,
    version: null,
  };
  const format = this.doctype.patch_format;
  if (format != null) {
    (query as any).format = format;
  }
  return query;
};
1812
1813
private setLastSnapshot(last_snapshot?: number) {
  // only set last_snapshot here, so we can keep it in sync with
  // patch_list.last_snapshot and also be certain about the data
  // type (being number or undefined).
  const valid =
    last_snapshot === undefined || typeof last_snapshot === "number";
  if (!valid) {
    throw Error("type of last_snapshot must be number or undefined");
  }
  this.last_snapshot = last_snapshot;
}
1821
1822
// Open the patches table, build a SortedPatchList from all known
// patches (loading older history from the stream as needed), and set
// this.doc / this.last to the resulting current value.
private init_patch_list = async (): Promise<void> => {
  this.assert_not_closed("init_patch_list - start");
  const dbg = this.dbg("init_patch_list");
  dbg();

  // CRITICAL: note that handle_syncstring_update checks whether
  // init_patch_list is done by testing whether this.patch_list is defined!
  // That is why we first define "patch_list" below, then set this.patch_list
  // to it only after we're done.
  delete this.patch_list;

  const patch_list = new SortedPatchList({
    from_str: this._from_str,
  });

  dbg("opening the table...");
  const query = { patches: [this.patch_table_query(this.last_snapshot)] };
  this.patches_table = await this.synctable(query, [], this.patch_interval);
  this.assert_not_closed("init_patch_list -- after making synctable");

  // Debounced so rapid change events collapse into at most one
  // unsaved-changes recomputation per 500ms (leading + trailing).
  const update_has_unsaved_changes = debounce(
    this.update_has_unsaved_changes,
    500,
    { leading: true, trailing: true },
  );

  this.patches_table.on("has-uncommitted-changes", (val) => {
    this.emit("has-uncommitted-changes", val);
  });

  this.on("change", () => {
    update_has_unsaved_changes();
  });

  this.syncstring_table.on("change", () => {
    update_has_unsaved_changes();
  });

  dbg("adding all known patches");
  patch_list.add(this.get_patches());

  dbg("possibly kick off loading more history");
  // Walk back through snapshots, loading older stream data until the
  // patch list has enough history or no further progress can be made.
  let last_start_seq: null | number = null;
  while (patch_list.needsMoreHistory()) {
    // @ts-ignore
    const dstream = this.patches_table.dstream;
    if (dstream == null) {
      break;
    }
    const snap = patch_list.getOldestSnapshot();
    if (snap == null) {
      break;
    }
    const seq_info = snap.seq_info ?? {
      prev_seq: 1,
    };
    const start_seq = seq_info.prev_seq ?? 1;
    if (last_start_seq != null && start_seq >= last_start_seq) {
      // no progress, e.g., corruption would cause this.
      // "corruption" is EXPECTED, since a user might be submitting
      // patches after being offline, and get disconnected halfway through.
      break;
    }
    last_start_seq = start_seq;
    await dstream.load({ start_seq });
    dbg("load more history");
    patch_list.add(this.get_patches());
    if (start_seq <= 1) {
      // loaded everything
      break;
    }
  }

  //this.patches_table.on("saved", this.handle_offline);
  this.patch_list = patch_list;

  let doc;
  try {
    doc = patch_list.value();
  } catch (err) {
    // fall back to an empty document rather than failing init.
    console.warn("error getting doc", err);
    doc = this._from_str("");
  }
  this.last = this.doc = doc;
  this.patches_table.on("change", this.handle_patch_update);

  dbg("done");
};
1910
1911
// Create the sagews evaluator; a no-op for every other file type.
private init_evaluator = async () => {
  const dbg = this.dbg("init_evaluator");
  if (filename_extension(this.path) !== "sagews") {
    dbg("done -- only use init_evaluator for sagews");
    return;
  }
  dbg("creating the evaluator and waiting for init");
  this.evaluator = new Evaluator(this, this.client, this.synctable);
  await this.evaluator.init();
  dbg("done");
};
1923
1924
// Create the ipywidgets state table; a no-op unless this is a jupyter
// syncdb file.
private init_ipywidgets = async () => {
  // Fix: previously logged under "init_evaluator" (copy-paste from the
  // method above), which made debug logs for ipywidgets misleading.
  const dbg = this.dbg("init_ipywidgets");
  const ext = filename_extension(this.path);
  if (ext != JUPYTER_SYNCDB_EXTENSIONS) {
    dbg("done -- only use ipywidgets for jupyter");
    return;
  }
  dbg("creating the ipywidgets state table, and waiting for init");
  this.ipywidgets_state = new IpywidgetsState(
    this,
    this.client,
    this.synctable,
  );
  await this.ipywidgets_state.init();
  dbg("done");
};
1940
1941
// Initialize cursor tracking.  With conat, cursor positions are
// broadcast over pub/sub; otherwise an ephemeral cursors synctable is
// used.  Either way, this.cursor_map (account_id -> positions) is kept
// up to date and "cursor_activity" is emitted on changes.
private init_cursors = async () => {
  const dbg = this.dbg("init_cursors");
  if (!this.cursors) {
    dbg("done -- do not care about cursors for this syncdoc.");
    return;
  }
  if (this.useConat) {
    dbg("cursors broadcast using pub/sub");
    this.cursors_table = await this.client.pubsub_conat({
      project_id: this.project_id,
      path: this.path,
      name: "cursors",
    });
    this.cursors_table.on(
      "change",
      (obj: { user_id: number; locs: any; time: number }) => {
        const account_id = this.users[obj.user_id];
        if (!account_id) {
          // unknown user index -- ignore.
          return;
        }
        if (obj.locs == null && !this.cursor_map.has(account_id)) {
          // gone, and already gone.
          return;
        }
        if (obj.locs != null) {
          // changed
          this.cursor_map = this.cursor_map.set(account_id, fromJS(obj));
        } else {
          // deleted
          this.cursor_map = this.cursor_map.delete(account_id);
        }
        this.emit("cursor_activity", account_id);
      },
    );
    return;
  }

  dbg("getting cursors ephemeral table");
  const query = {
    cursors: [
      {
        string_id: this.string_id,
        user_id: null,
        locs: null,
        time: null,
      },
    ],
  };
  // We make cursors an ephemeral table, since there is no
  // need to persist it to the database, obviously!
  // Also, queue_size:1 makes it so only the last cursor position is
  // saved, e.g., in case of disconnect and reconnect.
  const options = [{ ephemeral: true }, { queue_size: 1 }]; // probably deprecated
  this.cursors_table = await this.synctable(query, options, 1000);
  this.assert_not_closed("init_cursors -- after making synctable");

  // cursors now initialized; first initialize the
  // local this._cursor_map, which tracks positions
  // of cursors by account_id:
  dbg("loading initial state");
  const s = this.cursors_table.get();
  if (s == null) {
    throw Error("bug -- get should not return null once table initialized");
  }
  s.forEach((locs: any, k: string) => {
    if (locs == null) {
      return;
    }
    // keys are JSON-encoded; index 1 is the integer user id.
    const u = JSON.parse(k);
    if (u != null) {
      this.cursor_map = this.cursor_map.set(this.users[u[1]], locs);
    }
  });
  this.cursors_table.on("change", this.handle_cursors_change);

  dbg("done");
};
2018
2019
// Process change notifications from the (non-conat) cursors table:
// update this.cursor_map and emit "cursor_activity" per affected user.
private handle_cursors_change = (keys) => {
  if (this.state === "closed") {
    return;
  }
  for (const key of keys) {
    // keys are JSON-encoded; index 1 is the integer user id.
    const parsed = JSON.parse(key);
    if (parsed == null) {
      continue;
    }
    const account_id = this.users[parsed[1]];
    if (!account_id) {
      // this happens for the ephemeral table when the project restarts
      // and the browser has data it is trying to send.
      continue;
    }
    const locs = this.cursors_table.get(key);
    if (locs == null) {
      if (!this.cursor_map.has(account_id)) {
        // gone, and already gone -- nothing to report.
        continue;
      }
      // deleted
      this.cursor_map = this.cursor_map.delete(account_id);
    } else {
      // changed
      this.cursor_map = this.cursor_map.set(account_id, locs);
    }
    this.emit("cursor_activity", account_id);
  }
};
2049
2050
/* Returns *immutable* Map from account_id to list
   of cursor positions, if cursors are enabled.

   - excludeSelf: do not include our own cursor
   - maxAge: only include cursors that have been updated within maxAge ms from now.

   Never mutates this.cursor_map: all pruning below happens on a local
   immutable copy, so deletions here do not persist between calls.
*/
get_cursors = ({
  maxAge = 60 * 1000,
  // excludeSelf:
  // 'always' -- *always* exclude self
  // 'never' -- never exclude self
  // 'heuristic' -- exclude self if older than last set from here, e.g., useful on
  // frontend so we don't see our own cursor unless more than one browser.
  excludeSelf = "always",
}: {
  maxAge?: number;
  excludeSelf?: "always" | "never" | "heuristic";
} = {}): Map<string, any> => {
  this.assert_not_closed("get_cursors");
  if (!this.cursors) {
    throw Error("cursors are not enabled");
  }
  if (this.cursors_table == null) {
    return Map(); // not loaded yet -- so no info yet.
  }
  const account_id: string = this.client_id();
  let map = this.cursor_map;
  if (map.has(account_id) && excludeSelf != "never") {
    if (
      excludeSelf == "always" ||
      // heuristic: drop our own cursor only if the entry in the table is not
      // newer than the last cursor we set from this client.
      (excludeSelf == "heuristic" &&
        this.cursor_last_time >=
          new Date(map.getIn([account_id, "time"], 0) as number))
    ) {
      map = map.delete(account_id);
    }
  }
  // Remove any old cursors, where "old" is by default more than maxAge old.
  const now = Date.now();
  for (const [client_id, value] of map as any) {
    const time = value.get("time");
    if (time == null) {
      // this should always be set.
      map = map.delete(client_id);
      continue;
    }
    if (maxAge) {
      // we use abs to implicitly exclude a bad value that is somehow in the future,
      // if that were to happen.
      if (Math.abs(now - time.valueOf()) >= maxAge) {
        map = map.delete(client_id);
        continue;
      }
    }
    if (time >= now + 10 * 1000) {
      // We *always* delete any cursors more than 10 seconds in the future, since
      // that can only happen if a client inserts invalid data (e.g., clock not
      // yet synchronized). See https://github.com/sagemathinc/cocalc/issues/7969
      map = map.delete(client_id);
      continue;
    }
  }
  return map;
};
/* Set settings map. Used for custom configuration just for
2116
this one file, e.g., overloading the spell checker language.
2117
*/
2118
set_settings = async (obj): Promise<void> => {
2119
this.assert_is_ready("set_settings");
2120
await this.set_syncstring_table({
2121
settings: obj,
2122
});
2123
};
2124
2125
// The unique id of this client, as reported by the underlying client object.
client_id = () => this.client.client_id();
// Get the current per-file settings map (an empty immutable Map if unset).
get_settings = (): Map<string, any> => {
  this.assert_is_ready("get_settings");
  const row = this.syncstring_table_get_one();
  return row.get("settings", Map());
};
/*
Commits and saves current live syncdoc to backend.

Function only returns when there is nothing needing
saving.

Save any changes we have as a new patch.

Wrapped in reuseInFlight, so concurrent callers share one in-flight run.
*/
save = reuseInFlight(async () => {
  const dbg = this.dbg("save");
  dbg();
  // We just keep trying while syncdoc is ready and there
  // are changes that have not been saved (due to this.doc
  // changing during the while loop!).
  if (this.doc == null || this.last == null || this.state == "closed") {
    // EXPECTED: this happens after document is closed
    // There's nothing to do regarding save if the table is
    // already closed.  Note that we *do* have to save when
    // the table is in init stage, since the project has to
    // record the newly opened version of the file to the
    // database! See
    // https://github.com/sagemathinc/cocalc/issues/4986
    return;
  }
  if (this.client?.is_deleted(this.path, this.project_id)) {
    dbg("not saving because deleted");
    return;
  }
  // Compute any patches: loop until the committed version (this.last)
  // catches up with the live document (this.doc).
  while (!this.doc.is_equal(this.last)) {
    dbg("something to save");
    this.emit("user-change");
    // snapshot reference to detect whether this.doc changed during this pass
    const doc = this.doc;
    // TODO: put in a delay if just saved too recently?
    //       Or maybe won't matter since not using database?
    if (this.handle_patch_update_queue_running) {
      dbg("wait until the update queue is done");
      await once(this, "handle_patch_update_queue_done");
      // but wait until next loop (so as to check that needed
      // and state still ready).
      continue;
    }
    dbg("Compute new patch.");
    this.sync_remote_and_doc(false);
    // Emit event since this syncstring was
    // changed locally (or we wouldn't have had
    // to save at all).
    if (doc.is_equal(this.doc)) {
      dbg("no change during loop -- done!");
      break;
    }
  }
  if (this.state != "ready") {
    // above async waits could have resulted in state change.
    return;
  }
  await this.handle_patch_update_queue();
  if (this.state != "ready") {
    // state can also change while processing the patch update queue.
    return;
  }

  // Ensure all patches are saved to backend.
  // We do this after the above, so that creating the newest patch
  // happens immediately on save, which makes it possible for clients
  // to save current state without having to wait on an async, which is
  // useful to ensure specific undo points (e.g., right before a paste).
  await this.patches_table.save();
});
// Logical time (ms since epoch) of the most recent patch this client committed.
private timeOfLastCommit: number | undefined = undefined;

// Compute a logical timestamp for the next patch: the current server time,
// bumped past the last commit if they collide, then adjusted by the patch
// list so it does not collide with any existing patch.
private next_patch_time = (): number => {
  const now = this.client.server_time().valueOf();
  let time = now == this.timeOfLastCommit ? now + 1 : now;
  assertDefined(this.patch_list);
  time = this.patch_list.next_available_time(
    time,
    this.my_user_id,
    this.users.length,
  );
  return time;
};
// Record a new patch at the given logical time: write it to the patches
// table, mirror it into our in-memory patch list, and possibly trigger a
// periodic snapshot.  `time` must come from next_patch_time().
private commit_patch = (time: number, patch: XPatch): void => {
  this.timeOfLastCommit = time;
  this.assert_not_closed("commit_patch");
  assertDefined(this.patch_list);
  const obj: any = {
    // version for database
    string_id: this.string_id,
    // logical time -- usually the sync'd walltime, but
    // guaranteed to be increasing.
    time,
    // what we show user
    wall: this.client.server_time().valueOf(),
    patch: JSON.stringify(patch),
    user_id: this.my_user_id,
    is_snapshot: false,
    parents: this.patch_list.getHeads(),
    version: this.patch_list.lastVersion() + 1,
  };

  // remember our own patches by time, e.g., for undo bookkeeping
  this.my_patches[time.valueOf()] = obj;

  if (this.doctype.patch_format != null) {
    obj.format = this.doctype.patch_format;
  }

  // If in undo mode put the just-created patch in our
  // without timestamp list, so it won't be included
  // when doing undo/redo.
  if (this.undo_state != null) {
    this.undo_state.without.unshift(time);
  }

  //console.log 'saving patch with time ', time.valueOf()
  let x = this.patches_table.set(obj, "none");
  if (x == null) {
    // TODO: just for NATS right now!
    x = fromJS(obj);
  }
  // BUG FIX: obj.patch is a JSON *string*, so its size is .length;
  // the previous obj.patch.size was always undefined.
  const y = this.processPatch({ x, patch, size: obj.patch.length });
  this.patch_list.add([y]);
  // Since *we* just made a definite change to the document, we're
  // active, so we check if we should make a snapshot. There is the
  // potential of a race condition where more than one clients make
  // a snapshot at the same time -- this would waste a little space
  // in the stream, but is otherwise harmless, since the snapshots
  // are identical.
  this.snapshot_if_necessary();
};
// Return the underlying conat dstream of the patches table; throws if the
// table has no dstream (e.g., not a conat-backed table).
private dstream = () => {
  // @ts-ignore -- in general patches_table might not be a conat one still,
  // or at least dstream is an internal implementation detail.
  const stream = (this.patches_table ?? {}).dstream;
  if (stream == null) {
    throw Error("dstream must be defined");
  }
  return stream;
};
// return the conat-assigned sequence number of the oldest entry in the
// patch list with the given time, and also:
//  - prev_seq -- the sequence number of previous snapshot's patch before
//    that, for use in "load more"
// Throws if no message with the given time exists in the stream.
private conatSnapshotSeqInfo = (
  time: number,
): { seq: number; prev_seq?: number } => {
  const dstream = this.dstream();
  // seq = actual sequence number of the message with the patch that we're
  // snapshotting at -- i.e., at time
  let seq: number | undefined = undefined;
  // prev_seq = sequence number of patch of *previous* snapshot, if there is a previous one.
  // This is needed for incremental loading of more history.
  let prev_seq: number | undefined;
  // i tracks our position in the stream so dstream.seq(i) can map the
  // matching message to its conat sequence number.
  let i = 0;
  for (const mesg of dstream.getAll()) {
    if (mesg.is_snapshot && mesg.time < time) {
      // the seq field of this message has the actual sequence number of the patch
      // that was snapshotted; keep overwriting so we end with the latest
      // snapshot strictly before `time`.
      prev_seq = mesg.seq_info.seq;
    }
    if (seq === undefined && mesg.time == time) {
      // first (oldest) message with the requested time
      seq = dstream.seq(i);
    }
    i += 1;
  }
  if (seq == null) {
    throw Error(
      `unable to find message with time '${time}'=${new Date(time)}`,
    );
  }
  return { seq, prev_seq };
};
/* Create and store in the database a snapshot of the state
   of the string at the given point in time. This should
   be the time of an existing patch.

   The point of a snapshot is that if you load all patches recorded
   >= this point in time, then you don't need any earlier ones to
   reconstruct the document, since otherwise, why have the snapshot at
   all, as it does no good. Due to potentially long offline users
   putting old data into history, this can fail. However, in the usual
   case we should never record a snapshot with this bad property.

   No-op if a snapshot already exists at this time; wrapped in
   reuseInFlight so concurrent calls share one run.
*/
private snapshot = reuseInFlight(async (time: number): Promise<void> => {
  assertDefined(this.patch_list);
  const x = this.patch_list.patch(time);
  if (x == null) {
    throw Error(`no patch at time ${time}`);
  }
  if (x.snapshot != null) {
    // there is already a snapshot at this point in time,
    // so nothing further to do.
    return;
  }

  // the full document contents at this point in time:
  const snapshot: string = this.patch_list.value({ time }).to_str();
  // save the snapshot itself in the patches table.
  const seq_info = this.conatSnapshotSeqInfo(time);
  const obj = {
    size: snapshot.length,
    string_id: this.string_id,
    time,
    wall: time,
    is_snapshot: true,
    snapshot,
    user_id: x.user_id,
    seq_info,
  };
  // also set snapshot in the this.patch_list, which saves a little time
  // and ensures that "(x.snapshot != null)" above works if snapshot is called again.
  this.patch_list.add([obj]);
  this.patches_table.set(obj);
  await this.patches_table.save();
  if (this.state != "ready") {
    // closed (or otherwise left ready state) during the await above
    return;
  }

  // record where the snapshot lives in the stream, so history before it
  // can be skipped on load.
  const last_seq = seq_info.seq;
  await this.set_syncstring_table({
    last_snapshot: time,
    last_seq,
  });
  this.setLastSnapshot(time);
  this.last_seq = last_seq;
});
// Have a snapshot every this.snapshot_interval patches, except
// for the very last interval.  Failure to snapshot is tolerated and
// only logged, since another client can snapshot later.
private snapshot_if_necessary = async (): Promise<void> => {
  if (this.get_state() !== "ready") return;
  const dbg = this.dbg("snapshot_if_necessary");
  // allow snapshots somewhat larger than the max openable file size
  const max_size = Math.floor(1.2 * MAX_FILE_SIZE_MB * 1000000);
  const interval = this.snapshot_interval;
  dbg("check if we need to make a snapshot:", { interval, max_size });
  assertDefined(this.patch_list);
  const time = this.patch_list.time_of_unmade_periodic_snapshot(
    interval,
    max_size,
  );
  if (time == null) {
    dbg("no need to make a snapshot yet");
    return;
  }
  dbg("yes, try to make a snapshot at time", time);
  try {
    await this.snapshot(time);
  } catch (err) {
    // this is expected to happen sometimes, e.g., when sufficient information
    // isn't known about the stream of patches.
    console.log(
      `(expected) WARNING: client temporarily unable to make a snapshot of ${this.path} -- ${err}`,
    );
  }
};
/* Convert a raw (immutable) patch record into a normalized Patch object.
   - x - patch object
   - patch: if given will be used as an actual patch
     instead of x.patch, which is a JSON string.
   - size: fallback for the byte size of the patch, used only when
     x has no "patch" string to measure.
   Throws if x claims to be a snapshot but lacks snapshot/seq_info.
*/
private processPatch = ({
  x,
  patch,
  size: size0,
}: {
  x: Map<string, any>;
  patch?: any;
  size?: number;
}): Patch => {
  let t = x.get("time");
  if (typeof t != "number") {
    // backwards compat: old records stored time as a Date/ISO string
    t = new Date(t).valueOf();
  }
  const time: number = t;
  // wall-clock time shown to the user; defaults to the logical time
  const wall = x.get("wall") ?? time;
  const user_id: number = x.get("user_id");
  let parents: number[] = x.get("parents")?.toJS() ?? [];
  let size: number;
  const is_snapshot = x.get("is_snapshot");
  if (is_snapshot) {
    // for snapshots, size is the length of the full document snapshot
    size = x.get("snapshot")?.length ?? 0;
  } else {
    if (patch == null) {
      /* Do **NOT** use misc.from_json, since we definitely
         do not want to unpack ISO timestamps as Date,
         since patch just contains the raw patches from
         user editing.  This was done for a while, which
         led to horrific bugs in some edge cases...
         See https://github.com/sagemathinc/cocalc/issues/1771
      */
      if (x.has("patch")) {
        const p: string = x.get("patch");
        patch = JSON.parse(p);
        size = p.length;
      } else {
        // no patch at all: treat as the empty patch "[]" (2 bytes)
        patch = [];
        size = 2;
      }
    } else {
      // caller supplied the parsed patch; prefer measuring the stored
      // JSON string, then the explicit size hint, then re-serializing.
      const p = x.get("patch");
      size = p?.length ?? size0 ?? JSON.stringify(patch).length;
    }
  }

  const obj: Patch = {
    time,
    wall,
    user_id,
    patch,
    size,
    is_snapshot,
    parents,
    version: x.get("version"),
  };
  if (is_snapshot) {
    obj.snapshot = x.get("snapshot"); // this is a string
    obj.seq_info = x.get("seq_info")?.toJS();
    if (obj.snapshot == null || obj.seq_info == null) {
      console.warn("WARNING: message = ", x.toJS());
      throw Error(
        `message with is_snapshot true must also set snapshot and seq_info fields -- time=${time}`,
      );
    }
  }
  return obj;
};
/* Return all patches currently in the patches table, normalized via
   processPatch and sorted by time. */
private get_patches = (): Patch[] => {
  this.assert_table_is_ready("patches");

  // The table is an immutable map with keys the string that
  // is the JSON version of the primary key
  // [string_id, timestamp, user_number].
  let records: Map<string, any> | undefined = this.patches_table.get();
  if (records == null) {
    // won't happen because of assert above.
    throw Error("patches_table must be initialized");
  }
  if (!Map.isMap(records)) {
    // TODO: this is just for proof of concept NATS!!
    records = fromJS(records);
  }
  const patches: Patch[] = [];
  records.forEach((x) => {
    const p = this.processPatch({ x });
    if (p == null) {
      return;
    }
    patches.push(p);
  });
  patches.sort(patch_cmp);
  return patches;
};
// Whether the entire patch history has been loaded locally
// (false if the patch list isn't initialized yet).
hasFullHistory = (): boolean => {
  return this.patch_list?.hasFullHistory() ?? false;
};
// returns true if there may be additional history to load
// after loading this. return false if definitely done.
loadMoreHistory = async ({
  all,
}: {
  // if true, loads all history
  all?: boolean;
} = {}): Promise<boolean> => {
  // ephemeral docs have no persisted history to load
  if (this.hasFullHistory() || this.ephemeral || this.patch_list == null) {
    return false;
  }
  let start_seq;
  if (all) {
    start_seq = 1;
  } else {
    // load back to just before the previous snapshot
    const seq_info = this.patch_list.getOldestSnapshot()?.seq_info;
    if (seq_info == null) {
      // nothing more to load
      return false;
    }
    start_seq = seq_info.prev_seq ?? 1;
  }
  // Doing this load triggers change events for all the patch info
  // that gets loaded.
  // TODO: right now we load everything, since the seq_info is wrong
  // from the NATS migration. Maybe this is fine since it is very efficient.
  // @ts-ignore
  await this.patches_table.dstream?.load({ start_seq: 0 });

  // Wait until patch update queue is empty, so all loaded patches have
  // been processed before we return.
  while (this.patch_update_queue.length > 0) {
    await once(this, "patch-update-queue-empty");
  }
  // if we didn't load from the very beginning, there may be more history
  return start_seq > 1;
};
// True if a legacy (pre-conat) history record exists for this document.
legacyHistoryExists = async () => {
  const { uuid } = await this.legacy.getInfo();
  return !!uuid;
};
// Whether legacy history has already been merged in (only ever done once).
private loadedLegacyHistory = false;

// Load the legacy (pre-conat) patch history and graft it onto the front of
// the current patch list: legacy patches get negative version numbers and a
// linear parent chain, and the current first patch is rewired to descend
// from the last legacy patch.  Requires full (current) history to be loaded
// first.  Wrapped in reuseInFlight so concurrent calls share one run.
loadLegacyHistory = reuseInFlight(async () => {
  if (this.loadedLegacyHistory) {
    return;
  }
  this.loadedLegacyHistory = true;
  if (!this.hasFullHistory()) {
    throw Error("must first load full history first");
  }
  const { patches, users } = await this.legacy.getPatches();
  if (this.patch_list == null) {
    // closed while awaiting getPatches above
    return;
  }
  // @ts-ignore - cheating here
  const first = this.patch_list.patches[0];
  if ((first?.parents ?? []).length > 0) {
    throw Error("first patch should have no parents");
  }
  for (const patch of patches) {
    // @ts-ignore
    patch.time = new Date(patch.time).valueOf();
  }
  patches.sort(field_cmp("time"));
  const v: Patch[] = [];
  // legacy patches get versions ..., -2, -1 so they sort before version 0
  let version = -patches.length;
  let i = 0;
  for (const patch of patches) {
    // @ts-ignore
    patch.version = version;
    version += 1;
    if (i > 0) {
      // chain each legacy patch to the previous one
      // @ts-ignore
      patch.parents = [patches[i - 1].time];
    } else {
      // @ts-ignore
      patch.parents = [];
    }

    // remap the user_id field from the legacy user list to ours,
    // appending any legacy users we don't know about.
    const account_id = users[patch.user_id];
    let user_id = this.users.indexOf(account_id);
    if (user_id == -1) {
      this.users.push(account_id);
      user_id = this.users.length - 1;
    }
    patch.user_id = user_id;

    const p = this.processPatch({ x: fromJS(patch) });
    i += 1;
    v.push(p);
  }
  if (first != null) {
    // rewire the current first patch to follow the legacy chain, and turn
    // it into a snapshot so older history isn't needed to compute values.
    // @ts-ignore
    first.parents = [patches[patches.length - 1].time];
    first.is_snapshot = true;
    first.snapshot = this.patch_list.value({ time: first.time }).to_str();
  }
  this.patch_list.add(v);
  this.emit("change");
});
// Print the edit history to the console (for interactive debugging);
// opts are passed through to the patch list.
show_history = (opts = {}): void => {
  assertDefined(this.patch_list);
  this.patch_list.show_history(opts);
};
set_snapshot_interval = async (n: number): Promise<void> => {
2613
await this.set_syncstring_table({
2614
snapshot_interval: n,
2615
});
2616
await this.syncstring_table.save();
2617
};
2618
2619
// Time at which this document was last saved to disk (as observed here).
get_last_save_to_disk_time = (): Date => this.last_save_to_disk_time;
// React to a change in the save-to-disk state recorded in the syncstring
// table: emit 'save-to-disk' when a save completes, and -- if we are the
// file server -- perform the actual save when one is requested.
private handle_syncstring_save_state = async (
  state: string,
  time: Date,
): Promise<void> => {
  // Called when the save state changes.

  /* this.syncstring_save_state is used to make it possible to emit a
     'save-to-disk' event, whenever the state changes
     to indicate a save completed.

     NOTE: it is intentional that this.syncstring_save_state is not defined
     the first time this function is called, so that save-to-disk
     with last save time gets emitted on initial load (which, e.g., triggers
     latex compilation properly in case of a .tex file).
  */
  if (state === "done" && this.syncstring_save_state !== "done") {
    this.last_save_to_disk_time = time;
    this.emit("save-to-disk", time);
  }
  const dbg = this.dbg("handle_syncstring_save_state");
  dbg(
    `state='${state}', this.syncstring_save_state='${this.syncstring_save_state}', this.state='${this.state}'`,
  );
  if (
    this.state === "ready" &&
    (await this.isFileServer()) &&
    this.syncstring_save_state !== "requested" &&
    state === "requested"
  ) {
    this.syncstring_save_state = state; // only used in the if above
    dbg("requesting save to disk -- calling save_to_disk");
    // state just changed to requesting a save to disk...
    // so we do it (unless of course syncstring is still
    // being initialized).
    try {
      // Uncomment the following to test simulating a
      // random failure in save_to_disk:
      // if (Math.random() < 0.5) throw Error("CHAOS MONKEY!"); // FOR TESTING ONLY.
      await this.save_to_disk();
    } catch (err) {
      // CRITICAL: we must unset this.syncstring_save_state (and set the save state);
      // otherwise, it stays as "requested" and this if statement would never get
      // run again, thus completely breaking saving this doc to disk.
      // It is normal behavior that *sometimes* this.save_to_disk might
      // throw an exception, e.g., if the file is temporarily deleted
      // or save it called before everything is initialized, or file
      // is temporarily set readonly, or maybe there is a file system error.
      // Of course, the finally below will also take care of this.  However,
      // it's nice to record the error here.
      this.syncstring_save_state = "done";
      await this.set_save({ state: "done", error: `${err}` });
      dbg(`ERROR saving to disk in handle_syncstring_save_state-- ${err}`);
    } finally {
      // No matter what, after the above code is run,
      // the save state in the table better be "done".
      // We triple check that here, though of course
      // we believe the logic in save_to_disk and above
      // should always accomplish this.
      dbg("had to set the state to done in finally block");
      if (
        this.state === "ready" &&
        (this.syncstring_save_state != "done" ||
          this.syncstring_table_get_one().getIn(["save", "state"]) != "done")
      ) {
        this.syncstring_save_state = "done";
        await this.set_save({ state: "done", error: "" });
      }
    }
  }
};
// Process a change to our row of the syncstrings table, dispatching to
// either the brand-new-document or existing-document handler.
private handle_syncstring_update = async (): Promise<void> => {
  if (this.state === "closed") {
    return;
  }
  const dbg = this.dbg("handle_syncstring_update");
  dbg();

  const data = this.syncstring_table_get_one();
  const x: any = data != null ? data.toJS() : undefined;

  if (x != null && x.save != null) {
    // NOTE(review): this async call is not awaited, so save-to-disk
    // handling proceeds concurrently with the rest of this update --
    // presumably intentional; confirm before changing.
    this.handle_syncstring_save_state(x.save.state, x.save.time);
  }

  dbg(JSON.stringify(x));
  // a row without users means this is a brand-new document
  if (x == null || x.users == null) {
    dbg("new_document");
    await this.handle_syncstring_update_new_document();
  } else {
    dbg("update_existing");
    await this.handle_syncstring_update_existing_document(x, data);
  }
};
// Initialize our syncstrings-table row for a document that has never been
// opened before: we become user 0 and record path/doctype/snapshot defaults.
private handle_syncstring_update_new_document = async (): Promise<void> => {
  // Brand new document
  this.emit("load-time-estimate", { type: "new", time: 1 });
  // no snapshot yet
  this.setLastSnapshot();
  this.last_seq = undefined;
  // snapshot interval from the schema default, falling back to our constant
  this.snapshot_interval =
    schema.SCHEMA.syncstrings.user_query?.get?.fields.snapshot_interval ??
    DEFAULT_SNAPSHOT_INTERVAL;

  // Brand new syncstring
  // TODO: worry about race condition with everybody making themselves
  // have user_id 0... and also setting doctype.
  this.my_user_id = 0;
  this.users = [this.client.client_id()];
  const obj = {
    string_id: this.string_id,
    project_id: this.project_id,
    path: this.path,
    last_snapshot: this.last_snapshot,
    users: this.users,
    doctype: JSON.stringify(this.doctype),
    last_active: this.client.server_time(),
  };
  this.syncstring_table.set(obj);
  await this.syncstring_table.save();
  this.settings = Map();
  this.emit("metadata-change");
  this.emit("settings-change", this.settings);
};
// Sync our local metadata (snapshot info, users, settings, path) with an
// existing syncstrings-table row, registering this client as a user if it
// is not already one.
private handle_syncstring_update_existing_document = async (
  x: any,
  data: Map<string, any>,
): Promise<void> => {
  if (this.state === "closed") {
    return;
  }
  // Existing document.

  if (this.path == null) {
    // We just opened the file -- emit a load time estimate.
    this.emit("load-time-estimate", { type: "ready", time: 1 });
  }
  // TODO: handle doctype change here (?)
  this.setLastSnapshot(x.last_snapshot);
  this.last_seq = x.last_seq;
  this.snapshot_interval = x.snapshot_interval ?? DEFAULT_SNAPSHOT_INTERVAL;
  this.users = x.users ?? [];
  // adopt authoritative project_id/path from the table if present
  if (x.project_id) {
    // @ts-ignore
    this.project_id = x.project_id;
  }
  if (x.path) {
    // @ts-ignore
    this.path = x.path;
  }

  const settings = data.get("settings", Map());
  if (settings !== this.settings) {
    this.settings = settings;
    this.emit("settings-change", settings);
  }

  if (this.client != null) {
    // Ensure that this client is in the list of clients
    const client_id: string = this.client_id();
    this.my_user_id = this.users.indexOf(client_id);
    if (this.my_user_id === -1) {
      // not known yet -- append ourselves and persist the new user list
      this.my_user_id = this.users.length;
      this.users.push(client_id);
      await this.set_syncstring_table({
        users: this.users,
      });
    }
  }
  this.emit("metadata-change");
};
// Set up (or tear down) file watching depending on whether we are the
// file server, then handle any save-to-disk that was requested while we
// were not watching.
private init_watch = async (): Promise<void> => {
  const isServer = await this.isFileServer();
  if (!isServer) {
    // ensures we are NOT watching anything
    await this.update_watch_path();
    return;
  }
  if (this.watch_path !== this.path) {
    // path isn't being properly watched yet -- make it so.
    await this.update_watch_path(this.path);
  }
  await this.pending_save_to_disk();
};
// If a save-to-disk was requested (and we are the file server), perform it
// now; errors are logged but not raised.
private pending_save_to_disk = async (): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  if (!(await this.isFileServer())) {
    return;
  }

  const row = this.syncstring_table.get_one();
  // Check if there is a pending save-to-disk that is needed.
  if (row?.getIn(["save", "state"]) !== "requested") {
    return;
  }
  try {
    await this.save_to_disk();
  } catch (err) {
    const dbg = this.dbg("pending_save_to_disk");
    dbg(`ERROR saving to disk in pending_save_to_disk -- ${err}`);
  }
};
// Point the file watcher at the given path (closing any existing watcher),
// writing the in-memory document to disk first if the file does not exist.
// Called with no path to stop watching entirely.
private update_watch_path = async (path?: string): Promise<void> => {
  const dbg = this.dbg("update_watch_path");
  if (this.file_watcher != null) {
    // clean up
    dbg("close");
    this.file_watcher.close();
    delete this.file_watcher;
    delete this.watch_path;
  }
  if (path != null && this.client.is_deleted(path, this.project_id)) {
    dbg(`not setting up watching since "${path}" is explicitly deleted`);
    return;
  }
  if (path == null) {
    dbg("not opening another watcher since path is null");
    this.watch_path = path;
    return;
  }
  if (this.watch_path != null) {
    // this case is impossible since we deleted it above if it was defined.
    dbg("watch_path already defined");
    return;
  }
  dbg("opening watcher...");
  if (this.state === "closed") {
    throw Error("must not be closed");
  }
  this.watch_path = path;
  try {
    if (!(await callback2(this.client.path_exists, { path }))) {
      // re-check deletion: the await above could have raced a delete
      if (this.client.is_deleted(path, this.project_id)) {
        dbg(`not setting up watching since "${path}" is explicitly deleted`);
        return;
      }
      // path does not exist
      dbg(
        `write '${path}' to disk from syncstring in-memory database version`,
      );
      const data = this.to_str();
      await callback2(this.client.write_file, { path, data });
      dbg(`wrote '${path}' to disk`);
    }
  } catch (err) {
    // This can happen, e.g, if path is read only.
    dbg(`could NOT write '${path}' to disk -- ${err}`);
    await this.update_if_file_is_read_only();
    // In this case, can't really setup a file watcher.
    return;
  }

  dbg("now requesting to watch file");
  this.file_watcher = this.client.watch_file({ path });
  this.file_watcher.on("change", this.handle_file_watcher_change);
  this.file_watcher.on("delete", this.handle_file_watcher_delete);
  this.setupReadOnlyTimer();
};
// (Re)start the periodic read-only check, clearing any previously
// running interval first.
private setupReadOnlyTimer = () => {
  if (this.read_only_timer) {
    clearInterval(this.read_only_timer as any);
    this.read_only_timer = 0;
  }
  const timer = setInterval(
    this.update_if_file_is_read_only,
    READ_ONLY_CHECK_INTERVAL_MS,
  );
  this.read_only_timer = <any>timer;
};
// React to a change event from the file watcher: reload the file from disk
// unless the change was (probably) caused by our own recent save-to-disk.
private handle_file_watcher_change = async (ctime: Date): Promise<void> => {
  const dbg = this.dbg("handle_file_watcher_change");
  const time: number = ctime.valueOf();
  dbg(
    `file_watcher: change, ctime=${time}, this.save_to_disk_start_ctime=${this.save_to_disk_start_ctime}, this.save_to_disk_end_ctime=${this.save_to_disk_end_ctime}`,
  );
  if (
    this.save_to_disk_start_ctime == null ||
    (this.save_to_disk_end_ctime != null &&
      time - this.save_to_disk_end_ctime >= RECENT_SAVE_TO_DISK_MS)
  ) {
    // Either we never saved to disk, or the last attempt
    // to save was at least RECENT_SAVE_TO_DISK_MS ago, and it finished,
    // so definitely this change event was not caused by it.
    dbg("load_from_disk since no recent save to disk");
    await this.load_from_disk();
    return;
  }
  // otherwise: change is likely an echo of our own save; ignore it.
};
// The watched file was deleted on disk: record the deletion with the
// client, then close this syncdoc.
private handle_file_watcher_delete = async (): Promise<void> => {
  this.assert_is_ready("handle_file_watcher_delete");
  const dbg = this.dbg("handle_file_watcher_delete");
  dbg("delete: set_deleted and closing");
  await this.client.set_deleted(this.path, this.project_id);
  this.close();
};
// Replace the live document with the contents of the file on disk (blank if
// the file no longer exists), record the on-disk hash, and save the new
// version to the backend.  Returns the size in characters of what was read.
private load_from_disk = async (): Promise<number> => {
  const path = this.path;
  const dbg = this.dbg("load_from_disk");
  dbg();
  const exists: boolean = await callback2(this.client.path_exists, { path });
  let size: number;
  if (!exists) {
    dbg("file no longer exists -- setting to blank");
    size = 0;
    this.from_str("");
  } else {
    dbg("file exists");
    await this.update_if_file_is_read_only();

    const data = await callback2<string>(this.client.path_read, {
      path,
      maxsize_MB: MAX_FILE_SIZE_MB,
    });

    size = data.length;
    dbg(`got it -- length=${size}`);
    this.from_str(data);
    this.commit();
    // we also know that this is the version on disk, so we update the hash
    await this.set_save({
      state: "done",
      error: "",
      hash: hash_string(data),
    });
  }
  // save new version to database, which we just set via from_str.
  await this.save();
  return size;
};
// Record the save-to-disk state in the syncstring table, skipping the write
// when nothing genuinely changed.  A timestamp is filled in when the caller
// does not supply one; this is useful for coordinating running code, etc.
private set_save = async (save: {
  state: string;
  error: string;
  hash?: number;
  expected_hash?: number;
  time?: number;
}): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  const cur = this.syncstring_table_get_one().toJS()?.save;
  if (cur != null) {
    if (
      cur.state == save.state &&
      cur.error == save.error &&
      cur.hash == (save.hash ?? cur.hash) &&
      cur.expected_hash == (save.expected_hash ?? cur.expected_hash) &&
      cur.time == (save.time ?? cur.time)
    ) {
      // no genuine change, so no point in wasting cycles on updating.
      return;
    }
  }
  // Set timestamp of when the save happened.  Build a new object rather
  // than mutating the caller's argument (previously save.time was assigned
  // in place, which leaked the timestamp back to the caller).
  const time = save.time ? save.time : Date.now();
  await this.set_syncstring_table({ save: { ...save, time } });
};
// Persist the read_only flag for this document to the syncstring table.
private set_read_only = async (read_only: boolean): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  await this.set_syncstring_table({ read_only });
};
// Whether the file is currently marked read only in the syncstring table.
is_read_only = (): boolean => {
  this.assert_table_is_ready("syncstring");
  const row = this.syncstring_table_get_one();
  return row.get("read_only");
};
// Resolve once the read_only field of our syncstring-table row has been
// determined (waits up to 5 minutes).
wait_until_read_only_known = async (): Promise<void> => {
  await this.wait_until_ready();
  const read_only_defined = (t: SyncTable): boolean => {
    const row = t.get_one();
    return row != null && row.get("read_only") != null;
  };
  await this.syncstring_table.wait(read_only_defined, 5 * 60);
};
/* Returns true if the current live version of this document has
3011
a different hash than the version mostly recently saved to disk.
3012
I.e., if there are changes that have not yet been **saved to
3013
disk**. See the other function has_uncommitted_changes below
3014
for determining whether there are changes that haven't been
3015
commited to the database yet. Returns *undefined* if
3016
initialization not even done yet. */
3017
has_unsaved_changes = (): boolean | undefined => {
3018
if (this.state !== "ready") {
3019
return;
3020
}
3021
const dbg = this.dbg("has_unsaved_changes");
3022
try {
3023
return this.hash_of_saved_version() !== this.hash_of_live_version();
3024
} catch (err) {
3025
dbg(
3026
"exception computing hash_of_saved_version and hash_of_live_version",
3027
err,
3028
);
3029
// This could happen, e.g. when syncstring_table isn't connected
3030
// in some edge case. Better to just say we don't know then crash
3031
// everything. See https://github.com/sagemathinc/cocalc/issues/3577
3032
return;
3033
}
3034
};
3035
3036
// Hash of the last version saved to disk (as far as we know), or
// undefined when the document isn't ready yet.
hash_of_saved_version = (): number | undefined => {
  if (this.state !== "ready") {
    return undefined;
  }
  const hash = this.syncstring_table_get_one().getIn(["save", "hash"]);
  return hash as number | undefined;
};
3045
3046
/* Hash of the live version of the document, or undefined if the
   document isn't loaded yet.
   (TODO: a faster version of this for syncdb could avoid the
   wasteful conversion to a string.) */
hash_of_live_version = (): number | undefined => {
  if (this.state !== "ready") {
    return undefined;
  }
  return hash_string(this.doc.to_str());
};
3056
3057
/* True if there are changes to this syncstring that have not been
   committed to the database (with the commit acknowledged). This does
   not mean the file has been written to disk; however, it does mean
   that it is safe for the user to close their browser. */
has_uncommitted_changes = (): boolean => {
  return this.state === "ready"
    ? this.patches_table.has_uncommitted_changes()
    : false;
};
3069
3070
// Commit any changes to the live document to
// history as a new patch. Returns true if there
// were changes and false otherwise. This works
// fine offline, and does not wait until anything
// is saved to the network, etc.
commit = (emitChangeImmediately = false): boolean => {
  // Nothing to commit when not initialized, or when the live doc is
  // identical to the last committed version.
  if (this.last == null || this.doc == null || this.last.is_equal(this.doc)) {
    return false;
  }
  // console.trace('commit');

  if (emitChangeImmediately) {
    // used for local clients. NOTE: don't do this without explicit
    // request, since it could in some cases cause serious trouble.
    // E.g., for the jupyter backend doing this by default causes
    // an infinite recurse. Having this as an option is important, e.g.,
    // to avoid flicker/delay in the UI.
    this.emit_change();
  }

  // Now save to backend as a new patch:
  this.emit("user-change");
  const patch = this.last.make_patch(this.doc); // must be nontrivial
  this.last = this.doc;
  // ... and save that to patches table
  const time = this.next_patch_time();
  this.commit_patch(time, patch);
  this.save(); // so eventually also gets sent out.
  // edit activity keeps the project from idle timing out
  this.touchProject();
  return true;
};
3101
3102
/* Initiates a save of file to disk, then waits for the
   state to change. */
save_to_disk = async (): Promise<void> => {
  if (this.state != "ready") {
    // We just make save_to_disk a successful
    // no operation, if the document is either
    // closed or hasn't finished opening, since
    // there's a lot of code that tries to save
    // on exit/close or automatically, and it
    // is difficult to ensure it all checks state
    // properly.
    return;
  }
  const dbg = this.dbg("save_to_disk");
  if (this.client.is_deleted(this.path, this.project_id)) {
    dbg("not saving to disk because deleted");
    await this.set_save({ state: "done", error: "" });
    return;
  }

  // Make sure to include changes to the live document.
  // A side effect of save if we didn't do this is potentially
  // discarding them, which is obviously not good.
  this.commit();

  dbg("initiating the save");
  if (!this.has_unsaved_changes()) {
    dbg("no unsaved changes, so don't save");
    // CRITICAL: this optimization is assumed by
    // autosave, etc.
    await this.set_save({ state: "done", error: "" });
    return;
  }

  if (this.is_read_only()) {
    dbg("read only, so can't save to disk");
    // save should fail if file is read only and there are changes
    throw Error("can't save readonly file with changes to disk");
  }

  // First make sure any changes are saved to the database.
  // One subtle case where this matters is that loading a file
  // with \r's into codemirror changes them to \n...
  if (!(await this.isFileServer())) {
    dbg("browser client -- sending any changes over network");
    await this.save();
    dbg("save done; now do actual save to the *disk*.");
    this.assert_is_ready("save_to_disk - after save");
  }

  try {
    await this.save_to_disk_aux();
  } catch (err) {
    // if we got closed while saving, just silently bail
    if (this.state != "ready") return;
    const error = `save to disk failed -- ${err}`;
    dbg(error);
    if (await this.isFileServer()) {
      this.set_save({ error, state: "done" });
    }
  }
  if (this.state != "ready") return;

  if (!(await this.isFileServer())) {
    // Browser client: the project does the actual write, so wait
    // for the save state in the syncstring table to become "done".
    dbg("now wait for the save to disk to finish");
    this.assert_is_ready("save_to_disk - waiting to finish");
    await this.wait_for_save_to_disk_done();
  }
  this.update_has_unsaved_changes();
};
3171
3172
/* Export the (currently loaded) editing history of this document
   as a simple JSON-able list of HistoryEntry objects. Requires the
   syncstring table with its users list to be initialized. */
export_history = (options: HistoryExportOptions = {}): HistoryEntry[] => {
  this.assert_is_ready("export_history");
  const info = this.syncstring_table.get_one();
  if (info == null || !info.has("users")) {
    throw Error("syncstring table must be defined and users initialized");
  }
  const account_ids = info.get("users").toJS() as string[];
  assertDefined(this.patch_list);
  return export_history(account_ids, this.patch_list, options);
};
3184
3185
// Recompute the unsaved-changes flag and emit "has-unsaved-changes"
// only when the value actually flips.
private update_has_unsaved_changes = (): void => {
  if (this.state != "ready") {
    // Can get invoked via a debounced call after close; make it a
    // no-op in that case.
    // See https://github.com/sagemathinc/cocalc/issues/3577
    return;
  }
  const now = this.has_unsaved_changes();
  if (now === this.last_has_unsaved_changes) {
    return;
  }
  this.emit("has-unsaved-changes", now);
  this.last_has_unsaved_changes = now;
};
3198
3199
// Wait for save.state in the syncstring table to change to "done",
// retrying up to 8 times, then log any save error via the client.
private wait_for_save_to_disk_done = async (): Promise<void> => {
  const dbg = this.dbg("wait_for_save_to_disk_done");
  dbg();
  function until(table): boolean {
    const done = table.get_one().getIn(["save", "state"]) === "done";
    dbg("checking... done=", done);
    return done;
  }

  let last_err: string | undefined = undefined;
  const f = async () => {
    dbg("f");
    if (
      this.state != "ready" ||
      this.client.is_deleted(this.path, this.project_id)
    ) {
      dbg("not ready or deleted - no longer trying to save.");
      return;
    }
    try {
      dbg("waiting until done...");
      await this.syncstring_table.wait(until, 15);
    } catch (err) {
      dbg("timed out after 15s");
      throw Error("timed out");
    }
    if (
      this.state != "ready" ||
      this.client.is_deleted(this.path, this.project_id)
    ) {
      dbg("not ready or deleted - no longer trying to save.");
      return;
    }
    const err = this.syncstring_table_get_one().getIn(["save", "error"]) as
      | string
      | undefined;
    if (err) {
      dbg("error", err);
      last_err = err;
      throw Error(err);
    }
    dbg("done, with no error.");
    last_err = undefined;
    return;
  };
  await retry_until_success({
    f,
    max_tries: 8,
    desc: "wait_for_save_to_disk_done",
  });
  if (
    this.state != "ready" ||
    this.client.is_deleted(this.path, this.project_id)
  ) {
    return;
  }
  // BUG FIX: this used to test `typeof this.client.log_error != null`,
  // which is always true (typeof yields a string, never null), so the
  // guard was vacuous; check for the function itself instead.
  if (last_err && this.client.log_error != null) {
    this.client.log_error({
      string_id: this.string_id,
      path: this.path,
      project_id: this.project_id,
      error: `Error saving file -- ${last_err}`,
    });
  }
};
3265
3266
/* Auxiliary function 2 for saving to disk:
   If this is associated with a project and has a filename:
   a user (web browser) sets the save state to requested;
   the project sets the state to saving, does the save to
   disk, then sets the state to done. */
private save_to_disk_aux = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_aux");
  const ownsFilesystem = await this.isFileServer();
  if (!ownsFilesystem) {
    // browser client -- only requests the save
    return await this.save_to_disk_non_filesystem_owner();
  }
  try {
    // project/file server -- performs the actual write
    return await this.save_to_disk_filesystem_owner();
  } catch (err) {
    this.emit("save_to_disk_filesystem_owner", err);
    throw err;
  }
};
3287
3288
/* Browser-client side of saving: record a save request in the
   syncstring table; the filesystem owner does the actual write. */
private save_to_disk_non_filesystem_owner = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_non_filesystem_owner");

  if (!this.has_unsaved_changes()) {
    // Browser client has no unsaved changes, so no need to save.
    // CRITICAL: this optimization is assumed by autosave.
    return;
  }
  const rec = this.syncstring_table.get_one();
  if (rec != null && rec.getIn(["save", "state"]) === "requested") {
    // A save was already requested, which is all the browser
    // client has to do.
    return;
  }

  // Record the expected hash of the document-as-string so the
  // filesystem owner can sanity-check what it writes.
  const data: string = this.to_str();
  const expected_hash = hash_string(data);
  await this.set_save({ state: "requested", error: "", expected_hash });
};
3310
3311
/* Filesystem-owner (project) side of saving: write the document to
   disk, record a ctime window (so the resulting file-watch events can
   be ignored), and update the save state/hash in the syncstring table. */
private save_to_disk_filesystem_owner = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_filesystem_owner");
  const dbg = this.dbg("save_to_disk_filesystem_owner");

  // check if on-disk version is same as in memory, in
  // which case no save is needed.
  const data = this.to_str(); // string version of this doc
  const hash = hash_string(data);
  dbg("hash = ", hash);

  /*
  // TODO: put this consistency check back in (?).
  const expected_hash = this.syncstring_table
    .get_one()
    .getIn(["save", "expected_hash"]);
  */

  if (hash === this.hash_of_saved_version()) {
    // No actual save to disk needed; still we better
    // record this fact in table in case it
    // isn't already recorded
    this.set_save({ state: "done", error: "", hash });
    return;
  }

  const path = this.path;
  if (!path) {
    const err = "cannot save without path";
    this.set_save({ state: "done", error: err });
    throw Error(err);
  }

  dbg("project - write to disk file", path);
  // set window to slightly earlier to account for clock
  // imprecision.
  // Over an sshfs mount, all stats info is **rounded down
  // to the nearest second**, which this also takes care of.
  this.save_to_disk_start_ctime = Date.now() - 1500;
  this.save_to_disk_end_ctime = undefined;
  try {
    await callback2(this.client.write_file, { path, data });
    this.assert_is_ready("save_to_disk_filesystem_owner -- after write_file");
    const stat = await callback2(this.client.path_stat, { path });
    this.assert_is_ready("save_to_disk_filesystem_owner -- after path_state");
    this.save_to_disk_end_ctime = stat.ctime.valueOf() + 1500;
    // Reuse the hash computed above instead of hashing the same
    // data a second time (previously this called hash_string(data)
    // again here).
    this.set_save({
      state: "done",
      error: "",
      hash,
    });
  } catch (err) {
    this.set_save({ state: "done", error: JSON.stringify(err) });
    throw err;
  }
};
3366
3367
/*
When the underlying synctable that defines the state
of the document changes due to new remote patches, this
function is called. It queues up the changed keys and
processes them in a later event loop, which updates our
live version as a result.
*/
private handle_patch_update = async (changed_keys): Promise<void> => {
  // console.log("handle_patch_update", { changed_keys });
  if (changed_keys == null || changed_keys.length === 0) {
    // this happens right now when we do a save.
    return;
  }

  const dbg = this.dbg("handle_patch_update");
  //dbg(changed_keys);
  if (this.patch_update_queue == null) {
    this.patch_update_queue = [];
  }
  this.patch_update_queue.push(...changed_keys);

  dbg("Clear patch update_queue in a later event loop...");
  await delay(1);
  await this.handle_patch_update_queue();
  dbg("done");
};
3395
3396
/*
Whenever new patches are added to this.patches_table,
their timestamp gets added to this.patch_update_queue.
This drains that queue: new remote patches are converted,
added to the patch list, and the live document is synced.
*/
private handle_patch_update_queue = async (): Promise<void> => {
  const dbg = this.dbg("handle_patch_update_queue");
  try {
    this.handle_patch_update_queue_running = true;
    while (this.state != "closed" && this.patch_update_queue.length > 0) {
      dbg("queue size = ", this.patch_update_queue.length);
      const v: Patch[] = [];
      for (const key of this.patch_update_queue) {
        let x = this.patches_table.get(key);
        if (x == null) {
          continue;
        }
        if (!Map.isMap(x)) {
          // TODO: my NATS synctable-stream doesn't convert to immutable on get.
          x = fromJS(x);
        }
        // may be null, e.g., when deleted.
        const t = x.get("time");
        // Optimization: only need to process patches that we didn't
        // create ourselves during this session.
        if (t && !this.my_patches[t.valueOf()]) {
          const p = this.processPatch({ x });
          //dbg(`patch=${JSON.stringify(p)}`);
          if (p != null) {
            v.push(p);
          }
        }
      }
      this.patch_update_queue = [];
      this.emit("patch-update-queue-empty");
      assertDefined(this.patch_list);
      this.patch_list.add(v);

      dbg("waiting for remote and doc to sync...");
      this.sync_remote_and_doc(v.length > 0);
      await this.patches_table.save();
      if (this.state === ("closed" as State)) return; // closed during await; nothing further to do
      dbg("remote and doc now synced");

      if (this.patch_update_queue.length > 0) {
        // It is very important that next loop happen in a later
        // event loop to avoid the this.sync_remote_and_doc call
        // in this.handle_patch_update_queue above from causing
        // sync_remote_and_doc to get called from within itself,
        // due to synctable changes being emitted on save.
        dbg("wait for next event loop");
        await delay(1);
      }
    }
  } finally {
    if (this.state == "closed") return; // got closed, so nothing further to do

    // OK, done and nothing in the queue
    // Notify save() to try again -- it may have
    // paused waiting for this to clear.
    dbg("done");
    this.handle_patch_update_queue_running = false;
    this.emit("handle_patch_update_queue_done");
  }
};
3460
3461
/* Disable and enable sync. When disabled we still
3462
collect patches from upstream (but do not apply them
3463
locally), and changes we make are broadcast into
3464
the patch stream. When we re-enable sync, all
3465
patches are put together in the stream and
3466
everything is synced as normal. This is useful, e.g.,
3467
to make it so a user **actively** editing a document is
3468
not interrupted by being forced to sync (in particular,
3469
by the 'before-change' event that they use to update
3470
the live document).
3471
3472
Also, delay_sync will delay syncing local with upstream
3473
for the given number of ms. Calling it regularly while
3474
user is actively editing to avoid them being bothered
3475
by upstream patches getting merged in.
3476
3477
IMPORTANT: I implemented this, but it is NOT used anywhere
3478
else in the codebase, so don't trust that it works.
3479
*/
3480
3481
// Stop merging upstream patches into the live document (patches are
// still collected; see the comment above for the full contract).
disable_sync = (): void => {
  this.sync_is_disabled = true;
};
3484
3485
// Re-enable syncing and immediately merge any pending upstream patches.
enable_sync = (): void => {
  this.sync_is_disabled = false;
  this.sync_remote_and_doc(true);
};
3489
3490
// Temporarily disable syncing, re-enabling it after timeout_ms.
// Calling this again resets the timer.
delay_sync = (timeout_ms = 2000): void => {
  clearTimeout(this.delay_sync_timer);
  this.disable_sync();
  this.delay_sync_timer = setTimeout(() => {
    this.enable_sync();
  }, timeout_ms);
};
3497
3498
/*
Merge remote patches and live version to create new live version,
which is equal to result of applying all patches.
*/
private sync_remote_and_doc = (upstreamPatches: boolean): void => {
  if (this.last == null || this.doc == null || this.sync_is_disabled) {
    return;
  }

  // Critical to save what we have now so it doesn't get overwritten during
  // before-change or setting this.doc below. This caused
  // https://github.com/sagemathinc/cocalc/issues/5871
  this.commit();

  if (upstreamPatches && this.state == "ready") {
    // First save any unsaved changes from the live document, which this
    // sync-doc doesn't actually know the state of. E.g., this is some
    // rapidly changing live editor with changes not yet saved here.
    this.emit("before-change");
    // As a result of the emit in the previous line, all kinds of
    // nontrivial listener code probably just ran, and it should
    // have updated this.doc. We commit this.doc, so that the
    // upstream patches get applied against the correct live this.doc.
    this.commit();
  }

  // Compute the global current state of the document,
  // which is got by applying all patches in order.
  // It is VERY important to do this, even if the
  // document is not yet ready, since it is critical
  // to properly set the state of this.doc to the value
  // of the patch list (e.g., not doing this 100% breaks
  // opening a file for the first time on cocalc-docker).
  assertDefined(this.patch_list);
  const new_remote = this.patch_list.value();
  if (!this.doc.is_equal(new_remote)) {
    // There is a possibility that live document changed, so
    // set to new version.
    this.last = this.doc = new_remote;
    if (this.state == "ready") {
      this.emit("after-change");
      this.emit_change();
    }
  }
};
3543
3544
// Immediately alert all watchers of all changes since last time,
// then record the current doc as the new comparison baseline.
private emit_change = (): void => {
  const changes = this.doc?.changes(this.before_change);
  this.emit("change", changes);
  this.before_change = this.doc;
};
3550
3551
// Alert to changes soon, but debounced in case there are a large
// number of calls in a group. This is called by default.
// The debounce param is 0, since the idea is that this just waits
// until the next "render loop" to avoid huge performance issues
// with a nested for loop of sets. Doing it this way, massively
// simplifies client code.
emit_change_debounced: typeof this.emit_change = debounce(
  this.emit_change,
  0,
);
3561
3562
/* Deep-merge obj into the current syncstring table record and write
   it back, optionally persisting with save(). No-op when the merge
   produces no actual change. */
private set_syncstring_table = async (obj, save = true) => {
  const before = this.syncstring_table_get_one();
  const merged = mergeDeep(before, fromJS(obj));
  if (before.equals(merged)) {
    // nothing changed, so skip the write entirely
    return;
  }
  this.syncstring_table.set(merged);
  if (save) {
    await this.syncstring_table.save();
  }
};
3573
3574
// this keeps the project from idle timing out -- it happens
// whenever there is an edit to the file by a browser, and
// keeps the project from stopping.
// Throttled: fires at most once per minute.
private touchProject = throttle(() => {
  if (this.client?.is_browser()) {
    this.client.touch_project?.(this.path);
  }
}, 60000);
3582
3583
// Browser clients periodically "touch" the open file so the backend
// knows an actual human is still interested in it; loops until the
// document is closed.
private initInterestLoop = async () => {
  if (!this.client.is_browser()) {
    // only browser clients -- so actual humans
    return;
  }
  const touch = async () => {
    if (this.state == "closed" || this.client?.touchOpenFile == null) {
      return;
    }
    await this.client.touchOpenFile({
      path: this.path,
      project_id: this.project_id,
      doctype: this.doctype,
    });
  };
  // repeat every CONAT_OPEN_FILE_TOUCH_INTERVAL (30 seconds).
  await until(
    async () => {
      if (this.state == "closed") {
        // stop looping once closed
        return true;
      }
      await touch();
      return false;
    },
    {
      start: CONAT_OPEN_FILE_TOUCH_INTERVAL,
      max: CONAT_OPEN_FILE_TOUCH_INTERVAL,
    },
  );
};
3611
}
3612
3613
// Heuristic check that dstream contains enough history to reconstruct
// the document: either it begins at the very first commit (no parents),
// or some later snapshot entry has the same time as the first patch.
function isCompletePatchStream(dstream) {
  if (dstream.length == 0) {
    return false;
  }
  const first = dstream[0];
  if (first.is_snapshot) {
    // a snapshot cannot be the starting patch
    return false;
  }
  if (first.parents == null) {
    // first ever commit
    return true;
  }
  // otherwise, search for a snapshot taken at the first patch's time
  for (let i = 1; i < dstream.length; i++) {
    const entry = dstream[i];
    if (entry.is_snapshot && entry.time == first.time) {
      return true;
    }
  }
  return false;
}
3632
3633