Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/synctable-stream.ts
1452 views
1
/*
2
Conat implementation of the idea of a "SyncTable", but
3
for streaming data.
4
5
**This is ONLY for the scope of patches in a single
6
project and IS NOT USED IN ANY WAY WITH POSTGRESQL.**
7
8
It uses a conat persistent stream to store the elements
9
in a well defined order.
10
*/
11
12
import jsonStableStringify from "json-stable-stringify";
13
import { keys } from "lodash";
14
import { cmp_Date, is_array, isValidUUID } from "@cocalc/util/misc";
15
import { client_db } from "@cocalc/util/db-schema/client-db";
16
import { EventEmitter } from "events";
17
import { dstream, DStream } from "./dstream";
18
import { fromJS, Map } from "immutable";
19
import type { Configuration } from "@cocalc/conat/sync/core-stream";
20
import type { Client } from "@cocalc/conat/core/client";
21
22
export type State = "disconnected" | "connected" | "closed";
23
24
function toKey(x): string | undefined {
25
if (x === undefined) {
26
return undefined;
27
} else if (typeof x === "object") {
28
return jsonStableStringify(x);
29
} else {
30
return `${x}`;
31
}
32
}
33
34
export class SyncTableStream extends EventEmitter {
35
public readonly table;
36
private primaryKeys: string[];
37
private project_id?: string;
38
private path: string;
39
private string_id: string;
40
private data: any = {};
41
private state: State = "disconnected";
42
private dstream?: DStream;
43
private client: Client;
44
private getHook: Function;
45
private config?: Partial<Configuration>;
46
private start_seq?: number;
47
private noInventory?: boolean;
48
private ephemeral?: boolean;
49
50
constructor({
51
query,
52
client,
53
account_id: _account_id,
54
project_id,
55
immutable,
56
config,
57
start_seq,
58
noInventory,
59
ephemeral,
60
}: {
61
query;
62
client: Client;
63
account_id?: string;
64
project_id?: string;
65
immutable?: boolean;
66
config?: Partial<Configuration>;
67
start_seq?: number;
68
noInventory?: boolean;
69
ephemeral?: boolean;
70
}) {
71
super();
72
this.client = client;
73
this.noInventory = noInventory;
74
this.ephemeral = ephemeral;
75
this.setMaxListeners(1000);
76
this.getHook = immutable ? fromJS : (x) => x;
77
this.config = config;
78
this.start_seq = start_seq;
79
const table = keys(query)[0];
80
this.table = table;
81
if (table != "patches") {
82
throw Error("only the patches table is supported");
83
}
84
this.project_id = project_id ?? query[table][0].project_id;
85
if (!isValidUUID(this.project_id)) {
86
throw Error("query MUST specify a valid project_id");
87
}
88
this.path = query[table][0].path;
89
if (!this.path) {
90
throw Error("path MUST be specified");
91
}
92
query[table][0].string_id = this.string_id = client_db.sha1(
93
this.project_id,
94
this.path,
95
);
96
this.primaryKeys = client_db.primary_keys(table);
97
}
98
99
init = async () => {
100
const name = patchesStreamName({ string_id: this.string_id });
101
this.dstream = await dstream({
102
name,
103
client: this.client,
104
project_id: this.project_id,
105
config: this.config,
106
desc: { path: this.path },
107
start_seq: this.start_seq,
108
noInventory: this.noInventory,
109
ephemeral: this.ephemeral,
110
});
111
this.dstream.on("change", (mesg) => {
112
this.handle(mesg, true);
113
});
114
this.dstream.on("reject", (err) => {
115
console.warn("synctable-stream: rejected - ", err);
116
});
117
for (const mesg of this.dstream.getAll()) {
118
this.handle(mesg, false);
119
}
120
this.setState("connected");
121
};
122
123
private setState = (state: State): void => {
124
this.state = state;
125
this.emit(state);
126
};
127
128
get_state = () => {
129
return this.state;
130
};
131
132
private primaryString = (obj): string => {
133
const obj2 = { ...obj, string_id: this.string_id };
134
return toKey(this.primaryKeys.map((pk) => obj2[pk]))!;
135
};
136
137
getKey = this.primaryString;
138
139
set = (obj) => {
140
if (Map.isMap(obj)) {
141
obj = obj.toJS();
142
}
143
// console.log("set", obj);
144
// delete string_id since it is redundant info
145
const key = this.primaryString(obj);
146
const { string_id, ...obj2 } = obj;
147
if (this.data[key] != null) {
148
throw Error(
149
`object with key ${key} was already written to the stream -- written data cannot be modified`,
150
);
151
return;
152
}
153
// console.log("set - publish", obj);
154
if (this.dstream == null) {
155
throw Error("closed");
156
}
157
this.dstream.publish(obj2);
158
};
159
160
private handle = (obj, changeEvent: boolean) => {
161
if (this.state == "closed") {
162
return true;
163
}
164
const key = this.primaryString(obj);
165
this.data[key] = { ...obj };
166
if (changeEvent) {
167
this.emit("change", [key]);
168
}
169
};
170
171
get = (obj?) => {
172
if (obj == null) {
173
return this.getHook(this.data);
174
}
175
if (typeof obj == "string") {
176
return this.getHook(this.data[obj]);
177
}
178
if (is_array(obj)) {
179
const x: any = {};
180
for (const key of obj) {
181
x[this.primaryString(key)] = this.get(key);
182
}
183
return this.getHook(x);
184
}
185
let key;
186
if (typeof obj == "object") {
187
key = this.primaryString(obj);
188
} else {
189
key = `${key}`;
190
}
191
return this.getHook(this.data[key]);
192
};
193
194
getSortedTimes = () => {
195
return Object.values(this.data)
196
.map(({ time }) => time)
197
.sort(cmp_Date);
198
};
199
200
close = () => {
201
if (this.state === "closed") {
202
// already closed
203
return;
204
}
205
this.setState("closed");
206
this.removeAllListeners();
207
this.dstream?.close();
208
delete this.dstream;
209
// @ts-ignore
210
delete this.client;
211
};
212
213
delete = async (_obj) => {
214
throw Error("delete: not implemented for stream synctable");
215
};
216
217
save = () => {
218
this.dstream?.save();
219
};
220
221
has_uncommitted_changes = () => {
222
return this.dstream?.hasUnsavedChanges();
223
};
224
}
225
226
export function patchesStreamName({
227
project_id,
228
path,
229
string_id,
230
}: {
231
project_id?: string;
232
path?: string;
233
string_id?: string;
234
}): string {
235
if (!string_id) {
236
if (!project_id || !path) {
237
throw Error("one of string_id or both project_id and path must be given");
238
}
239
string_id = client_db.sha1(project_id, path);
240
}
241
if (!string_id) {
242
throw Error("bug");
243
}
244
return `patches:${string_id}`;
245
}
246
247