Path: blob/master/src/packages/conat/persist/client.ts
1452 views
import {
  type Message as ConatMessage,
  type Client,
  type MessageData,
  ConatError,
} from "@cocalc/conat/core/client";
import { type ConatSocketClient } from "@cocalc/conat/socket";
import { EventIterator } from "@cocalc/util/event-iterator";
import type {
  StorageOptions,
  Configuration,
  SetOperation,
  DeleteOperation,
  StoredMessage,
  PartialInventory,
} from "./storage";
export { StoredMessage, StorageOptions };
import { persistSubject, type User } from "./util";
import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";
import { refCacheSync } from "@cocalc/util/refcache";
import { EventEmitter } from "events";
import { getLogger } from "@cocalc/conat/client";
import { delay } from "awaiting";

const logger = getLogger("persist:client");

// How long to wait before trying to open a new socket after the current
// one reports "disconnected" or "closed".
const RECONNECT_DELAY_MS = 1000;
// Per-attempt timeout passed to socket.waitUntilReady() while resuming.
const READY_TIMEOUT_MS = 90000;
// Timeout / maxWait used for the "getAll" request that fetches missed updates.
const GET_MISSED_TIMEOUT_MS = 15000;

/** A batch of set/delete operations delivered by the changefeed. */
export type ChangefeedEvent = (SetOperation | DeleteOperation)[];

/** Async iterator over the batches emitted as "changefeed" events. */
export type Changefeed = EventIterator<ChangefeedEvent>;

// const paths = new Set<string>();

export { type PersistStreamClient };

/**
 * Client for one persistent stream, talking to the persist service over a
 * conat socket whose subject is `persistSubject(user)`.
 *
 * Events emitted by this object:
 *  - "changefeed": a non-empty `ChangefeedEvent` batch
 *  - "error": a `ConatError`, when the server pushes an error (data == null
 *    with headers set); the client is closed right after
 *  - "closed": when `close()` runs
 *
 * The underlying socket is created with `reconnection: false`; instead this
 * class opens a new socket itself after a disconnect and, if any changefeed
 * is active, requests everything after the last seen sequence number.
 */
class PersistStreamClient extends EventEmitter {
  public socket: ConatSocketClient;
  private changefeeds: any[] = [];
  private state: "ready" | "closed" = "ready";
  // Largest seq of any non-delete update emitted so far.
  private lastSeq?: number;
  // True once the socket has disconnected/closed at least once, so that the
  // next init() knows it is resuming rather than connecting for the first time.
  private reconnecting = false;
  // While true, live updates are buffered instead of emitted (see getMissed).
  private gettingMissed = false;
  private changesWhenGettingMissed: ChangefeedEvent[] = [];

  constructor(
    private client: Client,
    private storage: StorageOptions,
    private user: User,
  ) {
    super();
    // paths.add(this.storage.path);
    logger.debug("constructor", this.storage);
    this.init();
  }

  /**
   * (Re)create the socket, send the initial storage/changefeed handshake and
   * wire up the reconnect and data handlers.  Does nothing once this client
   * is closed; closes this client if the conat client itself is closed.
   */
  private init = () => {
    if (this.client.state == "closed") {
      this.close();
      return;
    }
    if (this.state == "closed") {
      return;
    }
    this.socket?.close();
    const subject = persistSubject(this.user);
    // console.log("making a socket connection to ", subject);
    const socket: ConatSocketClient = this.client.socket.connect(subject, {
      desc: `persist: ${this.storage.path}`,
      reconnection: false,
    });
    this.socket = socket;
    logger.debug("init", this.storage.path, "connecting to ", subject);
    // console.log(
    //   "persist -- create",
    //   this.storage.path,
    //   paths,
    //   "with id=",
    //   this.socket.id,
    // );
    socket.write({
      storage: this.storage,
      changefeed: this.changefeeds.length > 0,
    });

    // get any messages from the stream that we missed while offline.
    if (this.reconnecting) {
      // getMissed is deliberately not awaited; attach a handler so a failed
      // request is logged instead of becoming an unhandled promise rejection.
      this.getMissed().catch((err) => {
        logger.debug("getMissed failed", this.storage.path, err);
      });
    }

    // Either event means this socket is finished.  Both are registered with
    // once(), and removeAllListeners() ensures only the first one to fire
    // schedules a new init().
    const reconnect = () => {
      this.reconnecting = true;
      socket.removeAllListeners();
      if (this.state == "closed") {
        // we were closed on purpose -- nothing to reconnect.
        return;
      }
      setTimeout(this.init, RECONNECT_DELAY_MS);
    };
    socket.once("disconnected", reconnect);
    socket.once("closed", reconnect);

    socket.on("data", (updates, headers) => {
      if (updates == null) {
        if (headers != null) {
          // has to be an error
          // NOTE: Node's EventEmitter throws if "error" is emitted while no
          // "error" listener is registered.
          this.emit(
            "error",
            new ConatError(headers?.error, { code: headers?.code }),
          );
          this.close();
        }
        // There is no payload, so there is nothing to emit or buffer.
        return;
      }
      if (this.gettingMissed) {
        this.changesWhenGettingMissed.push(updates);
      } else {
        this.changefeedEmit(updates);
      }
    });
  };

  /**
   * After a reconnect: wait for the socket to be ready, then (if any
   * changefeed exists) request everything starting at `lastSeq` via
   * "getAll" and emit it.  Live updates that arrive meanwhile are buffered
   * in `changesWhenGettingMissed` and flushed in the `finally` block;
   * `changefeedEmit` drops anything whose seq was already emitted.
   */
  private getMissed = async () => {
    try {
      this.gettingMissed = true;
      this.changesWhenGettingMissed.length = 0;
      while (this.state == "ready") {
        try {
          await this.socket.waitUntilReady(READY_TIMEOUT_MS);
          break;
        } catch {
          // timeout
          await delay(1000);
        }
      }
      // console.log("getMissed", {
      //   path: this.storage.path,
      //   lastSeq: this.lastSeq,
      //   changefeeds: this.changefeeds.length,
      // });
      if (this.changefeeds.length == 0) {
        return;
      }
      // we are resuming after a disconnect when we had some data up to lastSeq.
      // let's grab anything we missed.
      const sub = await this.socket.requestMany(null, {
        headers: {
          cmd: "getAll",
          start_seq: this.lastSeq,
          timeout: GET_MISSED_TIMEOUT_MS,
        } as any,
        timeout: GET_MISSED_TIMEOUT_MS,
        maxWait: GET_MISSED_TIMEOUT_MS,
      });
      for await (const { data: updates, headers } of sub) {
        if (headers?.error) {
          // give up
          return;
        }
        if (updates == null || this.socket.state == "closed") {
          // done
          return;
        }
        this.changefeedEmit(updates);
      }
    } finally {
      this.gettingMissed = false;
      for (const updates of this.changesWhenGettingMissed) {
        this.changefeedEmit(updates);
      }
      this.changesWhenGettingMissed.length = 0;
    }
  };

  /**
   * Emit a "changefeed" event for the given batch.  Delete operations are
   * always kept; any other operation is kept only if its seq is larger than
   * `lastSeq` (which is then advanced).  Nothing is emitted if the filtered
   * batch is empty.
   */
  private changefeedEmit = (updates: ChangefeedEvent) => {
    if (updates == null) {
      return;
    }
    const fresh: ChangefeedEvent = [];
    for (const update of updates) {
      if (update.op == "delete") {
        fresh.push(update);
      } else if (update.seq > (this.lastSeq ?? 0)) {
        this.lastSeq = update.seq;
        fresh.push(update);
      }
    }
    if (fresh.length == 0) {
      return;
    }
    this.emit("changefeed", fresh);
  };

  /**
   * Mark this client closed, emit "closed", close every changefeed iterator
   * and close the socket.  Safe to call more than once, and safe to call
   * before a socket has been created.
   */
  close = () => {
    if (this.state == "closed") {
      return;
    }
    logger.debug("close", this.storage);
    // paths.delete(this.storage.path);
    // console.log("persist -- close", this.storage.path, paths);
    this.state = "closed";
    this.emit("closed");
    // Detach the list first so that EVERY iterator gets closed; truncating
    // the array while iterating over it would stop after the first one.
    const iters = this.changefeeds.splice(0);
    for (const iter of iters) {
      iter.close();
    }
    // init() calls close() before creating a socket when the conat client
    // is already closed, so the socket may not exist yet.
    this.socket?.close();
  };

  // The changefeed is *guaranteed* to deliver every message
  // in the stream **exactly once and in order**, even if there
  // are disconnects, failovers, etc.  Dealing with dropped messages,
  // duplicates, etc., is NOT the responsibility of clients.
  /**
   * Send the "changefeed" command and return an iterator over subsequent
   * "changefeed" events of this object.
   *
   * @throws ConatError if the response headers contain an error.
   */
  changefeed = async (): Promise<Changefeed> => {
    // activate changefeed mode (so server publishes updates -- this is idempotent)
    const resp = await this.socket.request(null, {
      headers: {
        cmd: "changefeed",
      },
    });
    if (resp.headers?.error) {
      throw new ConatError(`${resp.headers?.error}`, {
        code: resp.headers?.code,
      });
    }
    // an iterator over any updates that are published.
    const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {
      map: (args) => args[0],
    });
    this.changefeeds.push(iter);
    return iter;
  };

  /**
   * Send a "set" command carrying `messageData` (raw bytes, encoding and
   * headers) plus the given options, and return the response data.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  set = async ({
    key,
    ttl,
    previousSeq,
    msgID,
    messageData,
    timeout,
  }: SetOptions & { timeout?: number }): Promise<{
    seq: number;
    time: number;
  }> => {
    return this.checkForError(
      await this.socket.request(null, {
        raw: messageData.raw,
        encoding: messageData.encoding,
        headers: {
          headers: messageData.headers,
          cmd: "set",
          key,
          ttl,
          previousSeq,
          msgID,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "setMany" command with `ops` as the request payload and return
   * the response data (one entry per op, per the declared return type).
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  setMany = async (
    ops: SetOptions[],
    { timeout }: { timeout?: number } = {},
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    return this.checkForError(
      await this.socket.request(ops, {
        headers: {
          cmd: "setMany",
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "delete" command with the given selectors (`seq`, `last_seq`,
   * `all`) and return the response data.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  delete = async ({
    timeout,
    seq,
    last_seq,
    all,
  }: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "delete",
          seq,
          last_seq,
          all,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "config" command, optionally with a partial configuration, and
   * return the response data.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  config = async ({
    config,
    timeout,
  }: {
    config?: Partial<Configuration>;
    timeout?: number;
  } = {}): Promise<Configuration> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "config",
          config,
          timeout,
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Send an "inventory" command and return the response data.
   *
   * @param timeout - request timeout passed to the socket request
   * @throws ConatError if the response headers contain an error or code.
   */
  inventory = async (timeout?: number): Promise<PartialInventory> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "inventory",
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "get" command for either a `seq` or a `key` and return the whole
   * response message, or `undefined` when the response has no headers.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  get = async ({
    seq,
    key,
    timeout,
  }: {
    timeout?: number;
  } & (
    | { seq: number; key?: undefined }
    | { key: string; seq?: undefined }
  )): Promise<ConatMessage | undefined> => {
    const resp = await this.socket.request(null, {
      headers: { cmd: "get", seq, key, timeout } as any,
      timeout,
    });
    this.checkForError(resp, true);
    if (resp.headers == null) {
      return undefined;
    }
    return resp;
  };

  /**
   * Send a "getAll" command and yield each non-null response payload.
   * Iteration ends at the first null payload or once the socket is closed.
   *
   * @returns async iterator over arrays of stored messages
   * @throws ConatError if any response carries an error header.
   */
  async *getAll({
    start_seq,
    end_seq,
    timeout,
    maxWait,
  }: {
    start_seq?: number;
    end_seq?: number;
    timeout?: number;
    maxWait?: number;
  } = {}): AsyncGenerator<StoredMessage[], void, unknown> {
    const sub = await this.socket.requestMany(null, {
      headers: {
        cmd: "getAll",
        start_seq,
        end_seq,
        timeout,
      } as any,
      timeout,
      maxWait,
    });
    for await (const { data, headers } of sub) {
      if (headers?.error) {
        throw new ConatError(`${headers.error}`, { code: headers.code });
      }
      if (data == null || this.socket.state == "closed") {
        // done
        return;
      }
      yield data;
    }
  }

  /**
   * Send a "keys" command and return the response data.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "keys", timeout } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "sqlite" command with a statement and optional params and return
   * the response data.
   *
   * @throws ConatError if the response headers contain an error or code.
   */
  sqlite = async ({
    timeout,
    statement,
    params,
  }: {
    timeout?: number;
    statement: string;
    params?: any[];
  }): Promise<any[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "sqlite",
          statement,
          params,
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Throw a ConatError if `mesg.headers` has a truthy `error` or `code`;
   * otherwise return `mesg.data` (or nothing when `noReturn` is true).
   */
  private checkForError = (mesg, noReturn = false) => {
    if (mesg.headers != null) {
      const { error, code } = mesg.headers;
      if (error || code) {
        throw new ConatError(error ?? "error", { code });
      }
    }
    if (!noReturn) {
      return mesg.data;
    }
  };

  // id of the remote server we're connected to
  serverId = async () => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "serverId" },
      }),
    );
  };
}

/** Options for a single `set` operation (also the element type of `setMany`). */
export interface SetOptions {
  messageData: MessageData;
  key?: string;
  ttl?: number;
  previousSeq?: number;
  msgID?: string;
  timeout?: number;
}

interface Options {
  client: Client;
  // who is accessing persistent storage
  user: User;
  // what storage they are accessing
  storage: StorageOptions;
  noCache?: boolean;
}

/**
 * Factory for PersistStreamClient objects, created through refCacheSync and
 * keyed by the JSON of [user, storage, client.id].  Write permission is
 * checked client side before a new client object is constructed.
 */
export const stream = refCacheSync<Options, PersistStreamClient>({
  name: "persistent-stream-client",
  createKey: ({ user, storage, client }: Options) => {
    return JSON.stringify([user, storage, client.id]);
  },
  createObject: ({ client, user, storage }: Options) => {
    // avoid wasting server resources, etc., by always checking permissions client side first
    assertHasWritePermission({ user, storage });
    return new PersistStreamClient(client, storage, user);
  },
});

let permissionChecks = true;

/**
 * Turn off the client-side permission check.
 *
 * @throws Error unless the COCALC_TEST_MODE environment variable is set.
 */
export function disablePermissionCheck() {
  if (!process.env.COCALC_TEST_MODE) {
    throw Error("disabling permission check only allowed in test mode");
  }
  permissionChecks = false;
}

/**
 * Client-side check that `user` may write to `storage.path`; delegates to
 * the check in ./auth using the user's persist subject.  Does nothing when
 * permission checks have been disabled.
 */
const assertHasWritePermission = ({
  user,
  storage,
}: Pick<Options, "user" | "storage">) => {
  if (!permissionChecks) {
    // should only be used for unit testing, since otherwise would
    // make clients slower and possibly increase server load.
    return;
  }
  const subject = persistSubject(user);
  assertHasWritePermission0({ subject, path: storage.path });
};