Path: blob/master/src/packages/conat/hub/changefeeds/server.ts
1453 views
import { type Client, type ConatSocketServer } from "@cocalc/conat/core/client";
import { uuid } from "@cocalc/util/misc";
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
import { getLogger } from "@cocalc/conat/client";
import { isValidUUID } from "@cocalc/util/misc";
import {
  SUBJECT,
  MAX_PER_ACCOUNT,
  MAX_GLOBAL,
  SERVER_KEEPALIVE,
  KEEPALIVE_TIMEOUT,
  RESOURCE,
} from "./util";
export { type ConatSocketServer };

const logger = getLogger("hub:changefeeds:server");

/**
 * Create the socket server that serves changefeeds to account users.
 *
 * Each incoming connection is handled as follows:
 *
 * 1. The second dot-separated segment of `socket.subject` must have the form
 *    `account-<uuid>`; otherwise an error is written and the socket is closed.
 * 2. The account is registered with a `UsageMonitor` configured with
 *    `MAX_PER_ACCOUNT` / `MAX_GLOBAL`; if `usage.add` throws, the error (and
 *    its `code`) is written to the client and the socket is closed.
 * 3. The first message received on the socket is treated as `{ query, options }`
 *    and passed to `userQuery` together with a freshly generated `changes` id.
 *    Every callback invocation is forwarded to the client as
 *    `{ error, update }`. A second message on the same socket is rejected and
 *    closes the socket.
 * 4. When the socket closes, the usage entry is released and
 *    `cancelQuery(changes)` is called.
 *
 * @param client - conat client whose `socket.listen` is used to create the server.
 * @param userQuery - starts the query; invoked at most once per connection.
 *   Its `cb` is called with `(error, update)`.
 * @param cancelQuery - called with the connection's `changes` id when the
 *   socket closes (including when no query was ever started).
 * @returns the listening socket server; when it emits "closed" the usage
 *   monitor is closed as well.
 */
export function changefeedServer({
  client,
  userQuery,
  cancelQuery,
}: {
  client: Client;

  userQuery: (opts: {
    query: object;
    options?: object[];
    account_id: string;
    changes: string;
    cb: Function;
  }) => void;

  cancelQuery: (uuid: string) => void;
}): ConatSocketServer {
  logger.debug("creating changefeed server");

  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_ACCOUNT,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });

  const server = client.socket.listen(SUBJECT, {
    keepAlive: SERVER_KEEPALIVE,
    keepAliveTimeout: KEEPALIVE_TIMEOUT,
  });

  server.on("connection", (socket) => {
    // The second segment of the subject identifies who is connecting; only
    // segments of the form "account-<uuid>" are accepted.
    const v = socket.subject.split(".")[1];
    if (!v?.startsWith("account-")) {
      socket.write({ error: "only account users can create changefeeds" });
      logger.debug(
        "socket.close: due to changefeed request from non-account subject",
        socket.subject,
      );
      socket.close();
      return;
    }
    const account_id = v.slice("account-".length);
    if (!isValidUUID(account_id)) {
      logger.debug(
        "socket.close: due to invalid uuid",
        socket.subject,
        account_id,
      );
      socket.write({
        error: `invalid account_id -- '${account_id}', subject=${socket.subject}`,
      });
      socket.close();
      return;
    }

    // Register this connection with the usage monitor. `added` records that
    // the registration succeeded so that it is released exactly once on close.
    let added = false;
    try {
      usage.add(account_id);
      added = true;
    } catch (err) {
      socket.write({ error: `${err}`, code: err.code });
      logger.debug(
        "socket.close: due to usage error (limit exceeded?)",
        socket.subject,
        err,
      );
      socket.close();
      return;
    }

    // Id passed to userQuery as `changes` and later to cancelQuery.
    const changes = uuid();

    socket.on("closed", () => {
      logger.debug(
        "socket.close: cleaning up since socket closed for some external reason (timeout?)",
        socket.subject,
      );
      if (added) {
        usage.delete(account_id);
      }
      cancelQuery(changes);
    });

    // Each connection carries exactly one query; `running` flips on the first
    // message and any later message is rejected.
    let running = false;
    socket.on("data", (data) => {
      if (running) {
        socket.write({ error: "exactly one query per connection" });
        logger.debug(
          "socket.close: due to attempt to run more than one query",
          socket.subject,
        );
        socket.close();
        return;
      }
      running = true;
      try {
        // Destructure inside the try: `data` is supplied by the client, and a
        // null/undefined payload makes the destructuring itself throw. Doing
        // it here reports the problem to the client and closes the socket
        // instead of letting the exception escape this listener.
        const { query, options } = data;
        userQuery({
          query,
          options,
          changes,
          account_id,
          cb: (error, update) => {
            // logger.debug("got: ", { error, update });
            try {
              socket.write({ error, update });
            } catch (err) {
              // happens if buffer is full or socket is closed. in both cases, might as well
              // just close the socket.
              error = `${err}`;
            }
            if (error) {
              logger.debug(
                "socket.close: due to error from postgres changefeed",
                socket.subject,
                error,
              );
              socket.close();
            }
          },
        });
      } catch (err) {
        logger.debug(
          "socket.close: due to error creating query",
          socket.subject,
          err,
        );
        // Best effort: the socket may already be unusable, in which case the
        // write fails and we simply proceed to close it.
        try {
          socket.write({ error: `${err}` });
        } catch {}
        socket.close();
      }
    });
  });

  server.on("closed", () => {
    logger.debug("shutting down changefeed server");
    usage.close();
  });

  return server;
}