Path: blob/master/src/packages/database/postgres/project-and-user-tracker.ts
1503 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

import { EventEmitter } from "events";
import { callback } from "awaiting";
import { callback2 } from "@cocalc/util/async-utils";
import { close, len } from "@cocalc/util/misc";
import { PostgreSQL, QueryOptions, QueryResult } from "./types";
import { getPoolClient } from "@cocalc/database/pool";
import { ChangeEvent, Changes } from "./changefeed";

const { all_results } = require("../postgres-base");

type SetOfAccounts = { [account_id: string]: boolean };
type SetOfProjects = { [project_id: string]: boolean };

type State = "init" | "ready" | "closed";

/**
 * Keeps an in-memory view of which accounts are users of which projects,
 * kept up to date by a changefeed on the `users` column of the `projects`
 * table.
 *
 * Events emitted (all via EventEmitter):
 *   - `ready`, `closed`                         -- state transitions
 *   - `error`                                   -- fatal; the tracker closes itself
 *   - `add_user_to_project-${account_id}`       (project_id)
 *   - `remove_user_from_project-${account_id}`  (project_id)
 *   - `add_collaborator-${account_id}`          (other_account_id)
 *   - `remove_collaborator-${account_id}`       (other_account_id)
 */
export class ProjectAndUserTracker extends EventEmitter {
  private state: State = "init";

  private db: PostgreSQL;

  private feed: Changes;

  // by a "set" we mean map to boolean...
  // set of accounts we care about
  private accounts: SetOfAccounts = {};

  // map from project_id to set of users of a given project
  private users: { [project_id: string]: SetOfAccounts } = {};

  // map from account_id to set of projects of a given user
  private projects: { [account_id: string]: SetOfProjects } = {};

  // map from account_id to map from account_ids to *number* of
  // projects the two users have in common.
  private collabs: {
    [account_id: string]: { [account_id: string]: number };
  } = {};

  // Pending registrations: account_id -> callbacks to invoke once that
  // account has been registered (or with an error if we close first).
  private register_todo: { [account_id: string]: Function[] } = {};

  // used for a runtime sanity check
  private do_register_lock: boolean = false;

  constructor(db: PostgreSQL) {
    super();
    this.db = db;
  }

  /** Throw unless the tracker is currently in the given state. */
  private assert_state(state: State, f: string): void {
    if (this.state != state) {
      throw Error(`${f}: state must be ${state} but it is ${this.state}`);
    }
  }

  /**
   * Create the changefeed on the projects table and switch to "ready".
   * If creating the changefeed fails, the error goes through handle_error
   * (which emits "error" and closes the tracker) and init returns normally.
   */
  async init(): Promise<void> {
    this.assert_state("init", "init");
    const dbg = this.dbg("init");
    dbg("Initializing Project and user tracker...");

    // every changefeed for a user will result in a listener
    // on an event on this one object.
    this.setMaxListeners(1000);

    try {
      // create changefeed listening on changes to projects table
      this.feed = await callback2(this.db.changefeed, {
        table: "projects",
        select: { project_id: "UUID" },
        watch: ["users"],
        where: {},
      });
      dbg("Success");
    } catch (err) {
      this.handle_error(err);
      return;
    }
    this.feed.on("change", this.handle_change.bind(this));
    this.feed.on("error", this.handle_error.bind(this));
    this.feed.on("close", () => this.handle_error("changefeed closed"));
    this.set_state("ready");
  }

  /** Return a debug logger from the database object, labelled `Tracker.${f}`. */
  private dbg(f) {
    return this.db._dbg(`Tracker.${f}`);
  }

  /** Log the error, emit "error", and close. No-op once closed. */
  private handle_error(err) {
    if (this.state == "closed") return;
    // There was an error in the changefeed.
    // Error is totally fatal, so we close up shop.
    const dbg = this.dbg("handle_error");
    dbg(`err='${err}'`);
    this.emit("error", err);
    this.close();
  }

  /** Record the new state and emit an event named after it. */
  private set_state(state: State): void {
    this.state = state;
    this.emit(state);
  }

  /**
   * Shut down: emit "closed", drop all listeners, close the changefeed,
   * fail every pending registration callback, and clear this object's
   * properties.  Safe to call more than once.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.set_state("closed");
    this.removeAllListeners();
    if (this.feed != null) {
      this.feed.close();
    }
    if (this.register_todo != null) {
      // clear any outstanding callbacks
      for (const account_id in this.register_todo) {
        const callbacks = this.register_todo[account_id];
        if (callbacks != null) {
          for (const cb of callbacks) {
            cb("closed - project-and-user-tracker");
          }
        }
      }
    }
    // `close` here is the helper imported from @cocalc/util/misc, not this method.
    close(this);
    // Set the state again after the helper has run, so that later
    // `this.state == "closed"` checks keep working.
    this.state = "closed";
  }

  /** A project row was deleted: remove every user we track on it. */
  private handle_change_delete(old_val): void {
    this.assert_state("ready", "handle_change_delete");
    const { project_id } = old_val;
    if (this.users[project_id] == null) {
      // no users, so nothing to worry about.
      return;
    }
    for (const account_id in this.users[project_id]) {
      this.remove_user_from_project(account_id, project_id);
    }
    return;
  }

  /** Changefeed "change" handler: dispatch on delete vs. insert/update. */
  private handle_change(x: ChangeEvent): void {
    this.assert_state("ready", "handle_change");
    if (x.action === "delete") {
      if (x.old_val == null) return; // should never happen
      this.handle_change_delete(x.old_val);
    } else {
      if (x.new_val == null) return; // should never happen
      this.handle_change_update(x.new_val);
    }
  }

  /**
   * The users of a project changed (or the project was created): re-read
   * the project's user list from the database and apply the difference
   * to our data structures.
   */
  private async handle_change_update(new_val): Promise<void> {
    this.assert_state("ready", "handle_change_update");
    const dbg = this.dbg("handle_change_update");
    dbg(new_val);
    // users on a project changed or project created
    const { project_id } = new_val;
    let users: QueryResult<{ account_id: string }>[];
    try {
      users = await query<{ account_id: string }>(this.db, {
        query: "SELECT jsonb_object_keys(users) AS account_id FROM projects",
        where: { "project_id = $::UUID": project_id },
      });
    } catch (err) {
      this.handle_error(err);
      return;
    }
    // The tracker may have been closed while the query was in flight.
    // This function runs as a detached promise (handle_change does not
    // await it), so touching the torn-down maps below would surface as an
    // unhandled rejection.  Nothing is left to update once closed.
    if (this.state != "ready") return;

    if (this.users[project_id] == null) {
      // we are not already watching this project
      let any = false;
      for (const { account_id } of users) {
        if (this.accounts[account_id]) {
          any = true;
          break;
        }
      }
      if (!any) {
        // *and* none of our tracked users are on this project... so don't care
        return;
      }
    }

    // first add any users who got added, and record which accounts are relevant
    const users_now: SetOfAccounts = {};
    for (const { account_id } of users) {
      users_now[account_id] = true;
    }
    const users_before: SetOfAccounts =
      this.users[project_id] != null ? this.users[project_id] : {};
    for (const account_id in users_now) {
      if (!users_before[account_id]) {
        this.add_user_to_project(account_id, project_id);
      }
    }
    for (const account_id in users_before) {
      if (!users_now[account_id]) {
        this.remove_user_from_project(account_id, project_id);
      }
    }
  }

  // add and remove user from a project, maintaining our data structures
  private add_user_to_project(account_id: string, project_id: string): void {
    this.assert_state("ready", "add_user_to_project");
    if (
      this.projects[account_id] != null &&
      this.projects[account_id][project_id]
    ) {
      // already added
      return;
    }
    this.emit(`add_user_to_project-${account_id}`, project_id);
    if (this.users[project_id] == null) {
      this.users[project_id] = {};
    }
    const users = this.users[project_id];
    users[account_id] = true;

    if (this.projects[account_id] == null) {
      this.projects[account_id] = {};
    }
    const projects = this.projects[account_id];
    projects[project_id] = true;

    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    const collabs = this.collabs[account_id];

    // NOTE: `users` already contains account_id itself, so the pair
    // (account_id, account_id) is incremented by both halves of the loop
    // body, i.e., a user is counted twice as their own collaborator.
    for (const other_account_id in users) {
      if (collabs[other_account_id] != null) {
        collabs[other_account_id] += 1;
      } else {
        collabs[other_account_id] = 1;
        this.emit(`add_collaborator-${account_id}`, other_account_id);
      }
      const other_collabs = this.collabs[other_account_id];
      if (other_collabs[account_id] != null) {
        other_collabs[account_id] += 1;
      } else {
        other_collabs[account_id] = 1;
        this.emit(`add_collaborator-${other_account_id}`, account_id);
      }
    }
  }

  /**
   * Inverse of add_user_to_project.  With no_emit=true the data structures
   * are updated but no remove_* events are emitted.
   * Throws if either id does not have length 36.
   */
  private remove_user_from_project(
    account_id: string,
    project_id: string,
    no_emit: boolean = false,
  ): void {
    this.assert_state("ready", "remove_user_from_project");
    if (
      (account_id != null ? account_id.length : undefined) !== 36 ||
      (project_id != null ? project_id.length : undefined) !== 36
    ) {
      throw Error("invalid account_id or project_id");
    }
    if (
      !(this.projects[account_id] != null
        ? this.projects[account_id][project_id]
        : undefined)
    ) {
      // user is not on this project, as far as we know
      return;
    }
    if (!no_emit) {
      this.emit(`remove_user_from_project-${account_id}`, project_id);
    }
    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    for (const other_account_id in this.users[project_id]) {
      this.collabs[account_id][other_account_id] -= 1;
      if (this.collabs[account_id][other_account_id] === 0) {
        delete this.collabs[account_id][other_account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${account_id}`, other_account_id);
        }
      }
      this.collabs[other_account_id][account_id] -= 1;
      if (this.collabs[other_account_id][account_id] === 0) {
        delete this.collabs[other_account_id][account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${other_account_id}`, account_id);
        }
      }
    }
    delete this.users[project_id][account_id];
    delete this.projects[account_id][project_id];
  }

  // Register the given account so that this client watches the database
  // in order to be aware of all projects and collaborators of the
  // given account.
  // Rejects with "closed - project-and-user-tracker" if the tracker is
  // closed while the registration is still pending.
  public async register(account_id: string): Promise<void> {
    await callback(this.register_cb.bind(this), account_id);
  }

  /**
   * Callback-style implementation of register: either answer immediately
   * (already registered), start the registration queue, or append to it.
   */
  private register_cb(account_id: string, cb: Function): void {
    // NOTE(review): cb is never invoked on this path, so register() on an
    // already closed tracker never settles.  Left as is, since rejecting
    // here would change what existing callers observe.
    if (this.state == "closed") return;
    const dbg = this.dbg(`register(account_id="${account_id}"`);
    if (this.accounts[account_id] != null) {
      dbg(
        `already registered -- listener counts ${JSON.stringify(
          this.listener_counts(account_id),
        )}`,
      );
      cb();
      return;
    }
    if (len(this.register_todo) === 0) {
      // no registration is currently happening
      this.register_todo[account_id] = [cb];
      // kick things off -- this will keep registering accounts
      // until everything is done, then this.register_todo will have length 0.
      this.do_register();
    } else {
      // Accounts are being registered right now.  Add to the todo list.
      const v = this.register_todo[account_id];
      if (v != null) {
        v.push(cb);
      } else {
        this.register_todo[account_id] = [cb];
      }
    }
  }

  /**
   * Fetch every project that has account_id as a user, each with the list
   * of all account_ids on that project.  Rows have the shape
   * {project_id, users} as produced by the SELECT below.
   *
   * 2021-05-10: it's possible that a single user has a really large number
   * of projects, so we get the projects in batches (via a cursor) to reduce
   * the load on the database.
   *
   * Throws the original error on any failure; the pool client is always
   * released.
   */
  private async fetch_projects_of_account(
    account_id: string,
  ): Promise<QueryResult[]> {
    const client = await getPoolClient();
    const projects: QueryResult[] = [];
    const batchSize = 2000;
    try {
      // Start a transaction
      await client.query("BEGIN");
      // Declare a cursor
      await client.query(
        `
DECLARE project_cursor CURSOR FOR SELECT project_id, json_agg(o) as users
FROM (SELECT project_id, jsonb_object_keys(users) AS o FROM projects
WHERE users ? $1::TEXT) AS s group by s.project_id`,
        [account_id],
      );
      // Fetch rows in batches
      while (true) {
        const batchResult = await client.query(
          `FETCH ${batchSize} FROM project_cursor`,
        );
        // append in place (at most batchSize rows) instead of re-copying
        // the whole accumulated array on every batch
        projects.push(...batchResult.rows);
        if (batchResult.rows.length < batchSize) {
          break; // No more rows to fetch
        }
      }
      // Close the cursor and end the transaction
      await client.query("CLOSE project_cursor");
      await client.query("COMMIT");
      return projects;
    } catch (err) {
      // If an error occurs, roll back the transaction.  The rollback itself
      // can fail (e.g., the connection died); that must not replace the
      // error that actually caused the failure.
      try {
        await client.query("ROLLBACK");
      } catch (_rollback_err) {
        // ignore -- the original error is rethrown below
      }
      throw err;
    } finally {
      client.release();
    }
  }

  // Call do_register_work to completely clear the work
  // this.register_todo work queue.
  // NOTE: do_register_work does each account, *one after another*,
  // rather than doing everything in parallel.  WARNING: DO NOT
  // rewrite this to do everything in parallel, unless you think you
  // thoroughly understand the algorithm, since I think
  // doing things in parallel would horribly break!
  private async do_register(): Promise<void> {
    if (this.state != "ready") return; // maybe shutting down.

    // This gets a single account_id, if there are any:
    const account_id: string | undefined = Object.keys(this.register_todo)[0];
    if (account_id == null) return; // nothing to do.

    const dbg = this.dbg(`do_register(account_id="${account_id}")`);
    dbg("registering account");
    if (this.do_register_lock)
      throw Error("do_register MUST NOT be called twice at once!");
    this.do_register_lock = true;
    try {
      // Register this account, which starts by getting ALL of their projects.
      // We must have *all* projects, since this is used frequently in
      //    database/postgres-user-queries.coffee
      // when deciding how to route listen/notify events to users.  Search for
      //    "# Check that this is a project we have read access to"
      // E.g., without all projects, changefeeds would just fail to update,
      // which, e.g., makes it so projects appear to not start.
      let projects: QueryResult[];
      try {
        projects = await this.fetch_projects_of_account(account_id);
      } catch (err) {
        const e = `error registering '${account_id}' -- err=${err}`;
        dbg(e);
        this.handle_error(e); // it is game over.
        return;
      }
      // We may have been closed while waiting on the database.  In that
      // case close() already failed every pending callback and cleared our
      // maps, so there is nothing to update.
      if (this.state != "ready") return;

      // we care about this account_id
      this.accounts[account_id] = true;

      dbg("now adding all users to project tracker -- start");
      for (const project of projects) {
        if (this.users[project.project_id] != null) {
          // already have data about this project
          continue;
        } else {
          for (const collab_account_id of project.users) {
            if (collab_account_id == null) {
              continue; // just skip; evidently rarely this isn't defined, maybe due to db error?
            }
            this.add_user_to_project(collab_account_id, project.project_id);
          }
        }
      }
      dbg("successfully registered -- stop");

      // call the callbacks
      const callbacks = this.register_todo[account_id];
      if (callbacks != null) {
        for (const cb of callbacks) {
          cb();
        }
        // We are done (trying to) register account_id.
        delete this.register_todo[account_id];
      }
    } finally {
      this.do_register_lock = false;
    }
    if (len(this.register_todo) > 0) {
      // Deal with next account that needs to be registered
      this.do_register();
    }
  }

  // TODO: not actually used by any client yet... but obviously it should
  // be since this would be a work/memory leak, right?
  public unregister(account_id: string): void {
    if (this.state == "closed") return;
    if (!this.accounts[account_id]) return; // nothing to do

    const v: string[] = [];
    for (const project_id in this.projects[account_id]) {
      v.push(project_id);
    }
    delete this.accounts[account_id];

    // Forget about any projects that account_id is on that are no longer
    // necessary to watch...
    for (const project_id of v) {
      let need: boolean = false;
      for (const other_account_id in this.users[project_id]) {
        if (this.accounts[other_account_id] != null) {
          need = true;
          break;
        }
      }
      if (!need) {
        // no registered account is on this project anymore; drop it silently
        for (const other_account_id in this.users[project_id]) {
          this.remove_user_from_project(other_account_id, project_id, true);
        }
        delete this.users[project_id];
      }
    }
  }

  // Return *set* of projects that this user is a collaborator on
  public get_projects(account_id: string): { [project_id: string]: boolean } {
    if (this.state == "closed") return {};
    if (!this.accounts[account_id]) {
      // This should never happen, but very rarely it DOES.  I do not know why, having studied the
      // code.  But when it does, just raising an exception blows up the server really badly.
      // So for now we just async register the account, return that it is not a collaborator
      // on anything.  Then some query will fail, get tried again, and work since registration will
      // have finished.
      //throw Error("account (='#{account_id}') must be registered")

      // Get the logger now: if the registration fails it is because the
      // tracker was closed, and by then this.db is no longer available.
      const dbg = this.dbg("get_projects");
      // Nobody awaits this registration, so a rejection must be handled
      // here; otherwise it becomes an unhandled promise rejection.
      this.register(account_id).catch((err) => {
        dbg(`background register of '${account_id}' failed -- err=${err}`);
      });
      return {};
    }
    return this.projects[account_id] != null ? this.projects[account_id] : {};
  }

  // map from collabs of account_id to number of projects they collab
  // on (account_id itself counted twice)
  public get_collabs(account_id: string): { [account_id: string]: number } {
    if (this.state == "closed") return {};
    return this.collabs[account_id] != null ? this.collabs[account_id] : {};
  }

  /** Number of listeners on each of the four per-account events (for logging). */
  private listener_counts(account_id: string): object {
    const x: any = {};
    for (const e of [
      "add_user_to_project",
      "remove_user_from_project",
      "add_collaborator",
      "remove_collaborator",
    ]) {
      const event = e + "-" + account_id;
      x[event] = this.listenerCount(event);
    }
    return x;
  }
}

/**
 * Callback-style query: wraps cb with all_results (from ../postgres-base),
 * stores it on opts.cb (opts is mutated), and runs db._query(opts).
 * Throws if opts is null.
 */
function all_query(db: PostgreSQL, opts: QueryOptions, cb: Function): void {
  if (opts == null) {
    throw Error("opts must not be null");
  }
  opts.cb = all_results(cb);
  db._query(opts);
}

/** Promise version of all_query. */
async function query<T>(
  db: PostgreSQL,
  opts: QueryOptions,
): Promise<QueryResult<T>[]> {
  return await callback(all_query, db, opts);
}