// GitHub Repository: sagemathinc/cocalc
// Path: src/packages/database/postgres/project-and-user-tracker.ts
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */
import { EventEmitter } from "events";
7
import { callback } from "awaiting";
8
import { callback2 } from "@cocalc/util/async-utils";
9
import { close, len } from "@cocalc/util/misc";
10
import { PostgreSQL, QueryOptions, QueryResult } from "./types";
11
import { getPoolClient } from "@cocalc/database/pool";
12
import { ChangeEvent, Changes } from "./changefeed";
13
14
const { all_results } = require("../postgres-base");
15
16
// A "set" of account ids: a map from account_id to a boolean flag.
type SetOfAccounts = Record<string, boolean>;

// A "set" of project ids: a map from project_id to a boolean flag.
type SetOfProjects = Record<string, boolean>;

// Lifecycle of the tracker: starts in "init", becomes "ready" once the
// changefeed is running, and ends in "closed".
type State = "init" | "ready" | "closed";
/**
 * Tracks, for a set of registered accounts, every project each account is a
 * user of and every collaborator it shares a project with.  The data is kept
 * current via a changefeed on the `users` field of the `projects` table.
 *
 * Per-account events (the event name is suffixed with `-${account_id}`):
 *   - `add_user_to_project` / `remove_user_from_project` (payload: project_id)
 *   - `add_collaborator` / `remove_collaborator` (payload: other account_id)
 *
 * It also emits "ready" and "closed" on state changes.  It emits "error" on
 * a fatal error, after which the tracker closes itself.
 */
export class ProjectAndUserTracker extends EventEmitter {
  private state: State = "init";

  private db: PostgreSQL;

  private feed: Changes;

  // by a "set" we mean map to boolean...
  // set of accounts we care about
  private accounts: SetOfAccounts = {};

  // map from project_id to set of users of a given project
  private users: { [project_id: string]: SetOfAccounts } = {};

  // map from account_id to set of projects of a given user
  private projects: { [account_id: string]: SetOfProjects } = {};

  // map from account_id to map from account_ids to *number* of
  // projects the two users have in common.
  private collabs: {
    [account_id: string]: { [account_id: string]: number };
  } = {};

  // account_id -> callbacks waiting for that account's registration to finish.
  // Non-empty exactly while do_register is working through the queue.
  private register_todo: { [account_id: string]: Function[] } = {};

  // used for a runtime sanity check
  private do_register_lock: boolean = false;

  constructor(db: PostgreSQL) {
    super();
    this.db = db;
  }

  // Throw if the tracker is not in the given state; `f` names the caller.
  private assert_state(state: State, f: string): void {
    if (this.state != state) {
      throw Error(`${f}: state must be ${state} but it is ${this.state}`);
    }
  }

  /**
   * Create the changefeed on the projects table and move to state "ready".
   * Must be called exactly once, while in state "init".  A changefeed
   * creation failure is routed through handle_error, which emits "error"
   * and closes the tracker.
   */
  async init(): Promise<void> {
    this.assert_state("init", "init");
    const dbg = this.dbg("init");
    dbg("Initializing Project and user tracker...");

    // every changefeed for a user will result in a listener
    // on an event on this one object.
    this.setMaxListeners(1000);

    let feed: Changes;
    try {
      // create changefeed listening on changes to projects table
      feed = await callback2(this.db.changefeed, {
        table: "projects",
        select: { project_id: "UUID" },
        watch: ["users"],
        where: {},
      });
      dbg("Success");
    } catch (err) {
      this.handle_error(err);
      return;
    }
    if (this.state == "closed") {
      // close() was called while we were waiting for the changefeed.
      // Do not resurrect a closed tracker; just release the new feed.
      feed.close();
      return;
    }
    this.feed = feed;
    this.feed.on("change", this.handle_change.bind(this));
    this.feed.on("error", this.handle_error.bind(this));
    this.feed.on("close", () => this.handle_error("changefeed closed"));
    this.set_state("ready");
    // Registrations requested before we were ready were queued, but
    // do_register refuses to run until "ready".  Start working on them now.
    if (len(this.register_todo) > 0) {
      void this.do_register();
    }
  }

  private dbg(f: string) {
    return this.db._dbg(`Tracker.${f}`);
  }

  // Any error is treated as fatal: log it, emit "error", and close.
  // This is a no-op once closed.
  private handle_error(err: unknown): void {
    if (this.state == "closed") return;
    // There was an error in the changefeed.
    // Error is totally fatal, so we close up shop.
    const dbg = this.dbg("handle_error");
    dbg(`err='${err}'`);
    this.emit("error", err);
    this.close();
  }

  // Record the new state and emit an event named after it.
  private set_state(state: State): void {
    this.state = state;
    this.emit(state);
  }

  /**
   * Shut the tracker down.  This emits "closed", drops all listeners,
   * closes the changefeed, and fails any pending register() calls with
   * "closed - project-and-user-tracker".  It then clears the object's own
   * properties via `close(this)` from @cocalc/util/misc.  Idempotent.
   */
  close(): void {
    if (this.state == "closed") {
      return;
    }
    this.set_state("closed");
    this.removeAllListeners();
    if (this.feed != null) {
      this.feed.close();
    }
    if (this.register_todo != null) {
      // clear any outstanding callbacks
      for (const account_id in this.register_todo) {
        const callbacks = this.register_todo[account_id];
        if (callbacks != null) {
          for (const cb of callbacks) {
            cb("closed - project-and-user-tracker");
          }
        }
      }
    }
    close(this);
    // close(this) clears our fields; restore the state so later calls
    // see "closed" and bail out early.
    this.state = "closed";
  }

  // A project was deleted: remove all of its users from our data structures.
  private handle_change_delete(old_val): void {
    this.assert_state("ready", "handle_change_delete");
    const { project_id } = old_val;
    if (this.users[project_id] == null) {
      // no users, so nothing to worry about.
      return;
    }
    for (const account_id in this.users[project_id]) {
      this.remove_user_from_project(account_id, project_id);
    }
    // Drop the now-empty entry.  Otherwise it leaks, and a later update for
    // this project_id would be treated as "already watching".
    delete this.users[project_id];
  }

  // Dispatch a changefeed event to the delete or insert/update handler.
  private handle_change(x: ChangeEvent): void {
    this.assert_state("ready", "handle_change");
    if (x.action === "delete") {
      if (x.old_val == null) return; // should never happen
      this.handle_change_delete(x.old_val);
    } else {
      if (x.new_val == null) return; // should never happen
      // handle_change_update is async and nothing awaits it.  Route any
      // failure through handle_error instead of an unhandled rejection.
      this.handle_change_update(x.new_val).catch((err) =>
        this.handle_error(err),
      );
    }
  }

  /**
   * The users of a project changed, or a project was created.  Re-read the
   * project's current users from the database.  If we watch this project, or
   * any tracked account is on it, apply the difference between the users we
   * knew about and the current users.
   */
  private async handle_change_update(new_val): Promise<void> {
    this.assert_state("ready", "handle_change_update");
    const dbg = this.dbg("handle_change_update");
    dbg(new_val);
    // users on a project changed or project created
    const { project_id } = new_val;
    let users: QueryResult<{ account_id: string }>[];
    try {
      users = await query<{ account_id: string }>(this.db, {
        query: "SELECT jsonb_object_keys(users) AS account_id FROM projects",
        where: { "project_id = $::UUID": project_id },
      });
    } catch (err) {
      this.handle_error(err);
      return;
    }
    // The tracker may have been closed while the query was running.  Its
    // data structures are then gone, so there is nothing left to update.
    if (this.state != "ready") return;

    if (this.users[project_id] == null) {
      // we are not already watching this project
      const anyTracked = users.some(({ account_id }) => this.accounts[account_id]);
      if (!anyTracked) {
        // *and* none of our tracked users are on this project... so don't care
        return;
      }
    }

    // first add any users who got added, and record which accounts are relevant
    const users_now: SetOfAccounts = {};
    for (const { account_id } of users) {
      users_now[account_id] = true;
    }
    const users_before: SetOfAccounts = this.users[project_id] ?? {};
    for (const account_id in users_now) {
      if (!users_before[account_id]) {
        this.add_user_to_project(account_id, project_id);
      }
    }
    for (const account_id in users_before) {
      if (!users_now[account_id]) {
        this.remove_user_from_project(account_id, project_id);
      }
    }
  }

  /**
   * Record that account_id is a user of project_id, and update collaborator
   * counts with every user already on the project.  An account counts itself
   * twice, once from each side of the pair.  Emits add_user_to_project, plus
   * add_collaborator for each newly-shared pair.  No-op if already recorded.
   */
  private add_user_to_project(account_id: string, project_id: string): void {
    this.assert_state("ready", "add_user_to_project");
    if (this.projects[account_id]?.[project_id]) {
      // already added
      return;
    }
    this.emit(`add_user_to_project-${account_id}`, project_id);
    if (this.users[project_id] == null) {
      this.users[project_id] = {};
    }
    const users = this.users[project_id];
    users[account_id] = true;

    if (this.projects[account_id] == null) {
      this.projects[account_id] = {};
    }
    const projects = this.projects[account_id];
    projects[project_id] = true;

    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    const collabs = this.collabs[account_id];

    // `users` already includes account_id, so the loop also counts the
    // account as its own collaborator (incremented from both sides).
    for (const other_account_id in users) {
      if (collabs[other_account_id] != null) {
        collabs[other_account_id] += 1;
      } else {
        collabs[other_account_id] = 1;
        this.emit(`add_collaborator-${account_id}`, other_account_id);
      }
      const other_collabs = this.collabs[other_account_id];
      if (other_collabs[account_id] != null) {
        other_collabs[account_id] += 1;
      } else {
        other_collabs[account_id] = 1;
        this.emit(`add_collaborator-${other_account_id}`, account_id);
      }
    }
  }

  /**
   * Undo add_user_to_project.  Decrement collaborator counts, deleting and
   * emitting remove_collaborator when a count reaches 0.  Events are
   * suppressed when no_emit is true.
   * Throws if either id is not a 36-character string.
   */
  private remove_user_from_project(
    account_id: string,
    project_id: string,
    no_emit: boolean = false,
  ): void {
    this.assert_state("ready", "remove_user_from_project");
    if (account_id?.length !== 36 || project_id?.length !== 36) {
      throw Error("invalid account_id or project_id");
    }
    if (!this.projects[account_id]?.[project_id]) {
      return;
    }
    if (!no_emit) {
      this.emit(`remove_user_from_project-${account_id}`, project_id);
    }
    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    for (const other_account_id in this.users[project_id]) {
      this.collabs[account_id][other_account_id] -= 1;
      if (this.collabs[account_id][other_account_id] === 0) {
        delete this.collabs[account_id][other_account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${account_id}`, other_account_id);
        }
      }
      this.collabs[other_account_id][account_id] -= 1;
      if (this.collabs[other_account_id][account_id] === 0) {
        delete this.collabs[other_account_id][account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${other_account_id}`, account_id);
        }
      }
    }
    delete this.users[project_id][account_id];
    delete this.projects[account_id][project_id];
  }

  // Register the given account so that this client watches the database
  // in order to be aware of all projects and collaborators of the
  // given account.  Rejects if the tracker is (or becomes) closed first.
  public async register(account_id: string): Promise<void> {
    await callback(this.register_cb.bind(this), account_id);
  }

  // Callback-style implementation of register(): either answer immediately
  // (already registered / closed) or queue cb until do_register finishes
  // this account.
  private register_cb(account_id: string, cb: Function): void {
    if (this.state == "closed") {
      // Fail fast instead of leaving the caller's promise pending forever.
      cb("closed - project-and-user-tracker");
      return;
    }
    const dbg = this.dbg(`register(account_id="${account_id}")`);
    if (this.accounts[account_id] != null) {
      dbg(
        `already registered -- listener counts ${JSON.stringify(
          this.listener_counts(account_id),
        )}`,
      );
      cb();
      return;
    }
    if (len(this.register_todo) === 0) {
      // no registration is currently happening
      this.register_todo[account_id] = [cb];
      // kick things off -- this will keep registering accounts
      // until everything is done, then this.register_todo will have length 0.
      void this.do_register();
    } else {
      // Accounts are being registered right now. Add to the todo list.
      const v = this.register_todo[account_id];
      if (v != null) {
        v.push(cb);
      } else {
        this.register_todo[account_id] = [cb];
      }
    }
  }

  /**
   * Fetch every project whose `users` JSONB object has account_id as a key.
   * Each row has `project_id` and `users`, a JSON array of all user
   * account_ids of that project.  Rows are read in batches through a cursor
   * inside a transaction, to limit database load for users with very many
   * projects.  Throws on any database error, after attempting a ROLLBACK.
   */
  private async fetch_projects_with_users(
    account_id: string,
  ): Promise<QueryResult[]> {
    const client = await getPoolClient();
    const projects: QueryResult[] = [];
    const batchSize = 2000;
    try {
      // Start a transaction
      await client.query("BEGIN");
      // Declare a cursor
      await client.query(
        `
      DECLARE project_cursor CURSOR FOR SELECT project_id, json_agg(o) as users
      FROM (SELECT project_id, jsonb_object_keys(users) AS o FROM projects
                 WHERE users ? $1::TEXT) AS s group by s.project_id`,
        [account_id],
      );
      // Fetch rows in batches
      while (true) {
        const batchResult = await client.query(
          `FETCH ${batchSize} FROM project_cursor`,
        );
        projects.push(...batchResult.rows);
        if (batchResult.rows.length < batchSize) {
          break; // No more rows to fetch
        }
      }
      // Close the cursor and end the transaction
      await client.query("CLOSE project_cursor");
      await client.query("COMMIT");
      return projects;
    } catch (err) {
      // If an error occurs, roll back the transaction.  A failing ROLLBACK
      // must not mask the original error, which is the one we report.
      try {
        await client.query("ROLLBACK");
      } catch {
        // ignored deliberately: `err` is rethrown below
      }
      throw err;
    } finally {
      client.release();
    }
  }

  // Completely clear the this.register_todo work queue, one account at a time.
  // NOTE: this does each account, *one after another*,
  // rather than doing everything in parallel. WARNING: DO NOT
  // rewrite this to do everything in parallel, unless you think you
  // thoroughly understand the algorithm, since I think
  // doing things in parallel would horribly break!
  private async do_register(): Promise<void> {
    if (this.state != "ready") return; // maybe shutting down.

    // This gets a single account_id, if there are any:
    let account_id: string | undefined = undefined;
    for (account_id in this.register_todo) break;
    if (account_id == null) return; // nothing to do.

    const dbg = this.dbg(`do_register(account_id="${account_id}")`);
    dbg("registering account");
    if (this.do_register_lock)
      throw Error("do_register MUST NOT be called twice at once!");
    this.do_register_lock = true;
    try {
      // Register this account, which starts by getting ALL of their projects.
      // We must have *all* projects, since this is used frequently in
      // database/postgres-user-queries.coffee
      // when deciding how to route listen/notify events to users. Search for
      // "# Check that this is a project we have read access to"
      // E.g., without all projects, changefeeds would just fail to update,
      // which, e.g., makes it so projects appear to not start.
      let projects: QueryResult[];
      try {
        projects = await this.fetch_projects_with_users(account_id);
      } catch (err) {
        const e = `error registering '${account_id}' -- err=${err}`;
        dbg(e);
        this.handle_error(e); // it is game over.
        return;
      }
      if (this.state != "ready") {
        // Closed while we were querying.  close() already failed the waiting
        // callbacks and cleared our data structures.
        return;
      }

      // we care about this account_id
      this.accounts[account_id] = true;

      dbg("now adding all users to project tracker -- start");
      for (const project of projects) {
        if (this.users[project.project_id] != null) {
          // already have data about this project
          continue;
        }
        for (const collab_account_id of project.users) {
          if (collab_account_id == null) {
            continue; // just skip; evidently rarely this isn't defined, maybe due to db error?
          }
          this.add_user_to_project(collab_account_id, project.project_id);
        }
      }
      dbg("successfully registered -- stop");

      // Call the callbacks *before* removing the entry.  That way a
      // callback that synchronously registers another account queues it
      // (the queue is non-empty) instead of starting a second concurrent
      // do_register.
      const callbacks = this.register_todo[account_id];
      if (callbacks != null) {
        for (const cb of callbacks) {
          cb();
        }
        // We are done (trying to) register account_id.
        delete this.register_todo[account_id];
      }
    } finally {
      this.do_register_lock = false;
    }
    if (this.state == "ready" && len(this.register_todo) > 0) {
      // Deal with next account that needs to be registered
      void this.do_register();
    }
  }

  // TODO: not actually used by any client yet... but obviously it should
  // be since this would be a work/memory leak, right?
  // Stop tracking account_id.  Forget its projects that no other tracked
  // account is on (silently, without emitting events).
  public unregister(account_id: string): void {
    if (this.state == "closed") return;
    if (!this.accounts[account_id]) return; // nothing to do

    const v: string[] = Object.keys(this.projects[account_id] ?? {});
    delete this.accounts[account_id];

    // Forget about any projects the account_id is on that are no longer
    // necessary to watch...
    for (const project_id of v) {
      let need: boolean = false;
      for (const other_account_id in this.users[project_id]) {
        if (this.accounts[other_account_id] != null) {
          need = true;
          break;
        }
      }
      if (!need) {
        for (const other_account_id in this.users[project_id]) {
          this.remove_user_from_project(other_account_id, project_id, true);
        }
        delete this.users[project_id];
      }
    }
  }

  // Return *set* of projects that this user is a collaborator on
  public get_projects(account_id: string): { [project_id: string]: boolean } {
    if (this.state == "closed") return {};
    if (!this.accounts[account_id]) {
      // This should never happen, but very rarely it DOES. I do not know why, having studied the
      // code. But when it does, just raising an exception blows up the server really badly.
      // So for now we just async register the account, return that it is not a collaborator
      // on anything. Then some query will fail, get tried again, and work since registration will
      // have finished.
      //throw Error("account (='#{account_id}') must be registered")
      // Capture dbg now: if registration fails because we got closed,
      // this.db is gone by the time the rejection handler runs.
      const dbg = this.dbg("get_projects");
      this.register(account_id).catch((err) => {
        dbg(`failed to register '${account_id}' -- ${err}`);
      });
      return {};
    }
    return this.projects[account_id] ?? {};
  }

  // map from collabs of account_id to number of projects they collab
  // on (account_id itself counted twice)
  public get_collabs(account_id: string): { [account_id: string]: number } {
    if (this.state == "closed") return {};
    return this.collabs[account_id] ?? {};
  }

  // Number of listeners on each per-account event for account_id (debugging).
  private listener_counts(account_id: string): object {
    const x: Record<string, number> = {};
    for (const e of [
      "add_user_to_project",
      "remove_user_from_project",
      "add_collaborator",
      "remove_collaborator",
    ]) {
      const event = e + "-" + account_id;
      x[event] = this.listenerCount(event);
    }
    return x;
  }
}
/**
 * Run the query described by `opts` via `db._query`.  The result is
 * delivered to `cb` through `all_results` from postgres-base (presumably it
 * collects all result rows; confirm there).
 *
 * The caller's `opts` object is not modified: a shallow copy carrying the
 * wrapped callback is passed to `db._query`.
 *
 * @throws Error if `opts` is null/undefined.
 */
function all_query(db: PostgreSQL, opts: QueryOptions, cb: Function): void {
  if (opts == null) {
    throw Error("opts must not be null");
  }
  db._query({ ...opts, cb: all_results(cb) });
}
/**
 * Promise-returning wrapper around `all_query`.
 *
 * Resolves with the rows produced by running `opts` against `db`.  Rejects
 * if `all_query`'s callback is invoked with an error.
 */
async function query<T>(
  db: PostgreSQL,
  opts: QueryOptions,
): Promise<QueryResult<T>[]> {
  const rows: QueryResult<T>[] = await callback(all_query, db, opts);
  return rows;
}