// Path: blob/master/src/packages/frontend/conat/client.ts
// 1503 views
import { redux } from "@cocalc/frontend/app-framework";
import type { WebappClient } from "@cocalc/frontend/client/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import {
  type ConatSyncTable,
  ConatSyncTableFunction,
} from "@cocalc/conat/sync/synctable";
import { randomId, inboxPrefix } from "@cocalc/conat/names";
import { projectSubject } from "@cocalc/conat/names";
import { parseQueryWithOptions } from "@cocalc/sync/table/util";
import { type HubApi, initHubApi } from "@cocalc/conat/hub/api";
import { type ProjectApi, initProjectApi } from "@cocalc/conat/project/api";
import { isValidUUID } from "@cocalc/util/misc";
import { createOpenFiles, OpenFiles } from "@cocalc/conat/sync/open-files";
import { PubSub } from "@cocalc/conat/sync/pubsub";
import type { ChatOptions } from "@cocalc/util/types/llm";
import { dkv } from "@cocalc/conat/sync/dkv";
import { akv } from "@cocalc/conat/sync/akv";
import { astream } from "@cocalc/conat/sync/astream";
import { dko } from "@cocalc/conat/sync/dko";
import { dstream } from "@cocalc/conat/sync/dstream";
import { callConatService, createConatService } from "@cocalc/conat/service";
import type {
  CallConatServiceFunction,
  CreateConatServiceFunction,
} from "@cocalc/conat/service";
import { listingsClient } from "@cocalc/conat/service/listings";
import getTime, { getSkew, init as initTime } from "@cocalc/conat/time";
import { llm } from "@cocalc/conat/llm/client";
import { inventory } from "@cocalc/conat/sync/inventory";
import { EventEmitter } from "events";
import {
  getClient as getClientWithState,
  setConatClient,
  type ClientWithState,
} from "@cocalc/conat/client";
import Cookies from "js-cookie";
import { ACCOUNT_ID_COOKIE } from "@cocalc/frontend/client/client";
import { info as refCacheInfo } from "@cocalc/util/refcache";
import { connect as connectToConat } from "@cocalc/conat/core/client";
import type { ConnectionStats } from "@cocalc/conat/core/types";
import { appBasePath } from "@cocalc/frontend/customize/app-base-path";
import { until } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import {
  deleteRememberMe,
  setRememberMe,
} from "@cocalc/frontend/misc/remember-me";

/**
 * Connection status object that this client merges into the `conat` key of
 * the redux "page" store (see `setConnectionStatus`).
 */
export interface ConatConnectionStatus {
  state: "connected" | "disconnected";
  reason: string;
  details: any;
  stats: ConnectionStats;
}

// Default timeout (ms) for request/response calls made through this client.
const DEFAULT_TIMEOUT = 15000;

// When true, a console-backed logger is handed to setConatClient.
const DEBUG = false;

/**
 * Browser-side wrapper around a single Conat connection.
 *
 * It lazily creates the connection, mirrors connection state into the redux
 * "page" store and onto the owning WebappClient, manages reconnects itself
 * (socket.io reconnection is disabled), and exposes helpers for hub/project
 * RPC, synctables, pubsub, open-file tracking and the sync primitives
 * (dkv, akv, dko, dstream, astream).
 */
export class ConatClient extends EventEmitter {
  client: WebappClient;
  public hub: HubApi;
  public sessionId = randomId();
  // One OpenFiles tracker per project; an entry is removed when the tracker
  // emits "closed".
  private openFilesCache: { [project_id: string]: OpenFiles } = {};
  private clientWithState: ClientWithState;
  // Created on the first call to conat(); null until then.
  private _conatClient: null | ReturnType<typeof connectToConat> = null;
  public numConnectionAttempts = 0;
  // Set to true once connected and to false by standby(); the "disconnected"
  // handler only schedules a reconnect while this is true.
  private automaticallyReconnect = false;

  constructor(client: WebappClient) {
    super();
    this.setMaxListeners(100);
    this.client = client;
    this.hub = initHubApi(this.callHub);
    this.initConatClient();
    // Re-emit every "state" event under the state's own name.
    this.on("state", (state) => {
      this.emit(state);
    });
  }

  /**
   * Merge the given fields into the `conat` entry of the redux "page" store.
   * Does nothing if the page actions or store are not available.
   */
  private setConnectionStatus = (status: Partial<ConatConnectionStatus>) => {
    const actions = redux?.getActions("page");
    const store = redux?.getStore("page");
    if (actions == null || store == null) {
      return;
    }
    const cur = store.get("conat")?.toJS();
    actions.setState({ conat: { ...cur, ...status } } as any);
  };

  /**
   * Return the underlying Conat client, creating it on first use.
   *
   * On creation this starts the stats reporter, connects to
   * `location.origin + appBasePath`, and wires "connected", "disconnected"
   * and "reconnect_attempt" handlers that update the page store and emit the
   * corresponding events on the WebappClient.
   */
  conat = () => {
    if (this._conatClient == null) {
      this.startStatsReporter();
      const address = location.origin + appBasePath;
      this._conatClient = connectToConat({
        address,
        inboxPrefix: inboxPrefix({ account_id: this.client.account_id }),
        // it is necessary to manage reconnects manually due to a bug
        // in socket.io that has stumped their devs
        //  -- https://github.com/socketio/socket.io/issues/5197
        reconnection: false,
      });
      this._conatClient.on("connected", () => {
        this.setConnectionStatus({
          state: "connected",
          reason: "",
          details: "",
          stats: this._conatClient?.stats,
        });
        this.client.emit("connected");
        this.automaticallyReconnect = true;
      });
      this._conatClient.on("disconnected", (reason, details) => {
        this.setConnectionStatus({
          state: "disconnected",
          reason,
          details,
          stats: this._conatClient?.stats,
        });
        this.client.emit("disconnected", "offline");
        if (this.automaticallyReconnect) {
          setTimeout(this.connect, 1000);
        }
      });
      this._conatClient.conn.io.on("reconnect_attempt", (attempt) => {
        this.numConnectionAttempts = attempt;
        this.client.emit("connecting");
      });
    }
    return this._conatClient!;
  };

  private permanentlyDisconnected = false;

  /**
   * Put the connection in standby and mark this client so that `connect`
   * refuses to connect again.
   */
  permanentlyDisconnect = () => {
    this.permanentlyDisconnected = true;
    this.standby();
  };

  /** True if the client's info reports a signed-in user account_id. */
  is_signed_in = (): boolean => {
    return !!this._conatClient?.info?.user?.account_id;
  };

  /** True if the underlying connection currently reports `connected`. */
  is_connected = (): boolean => {
    return !!this._conatClient?.conn?.connected;
  };

  /**
   * Loop forever, every 5 seconds copying the client's current stats into
   * the page store (when a client exists). The loop has no exit condition.
   */
  private startStatsReporter = async () => {
    while (true) {
      if (this._conatClient != null) {
        this.setConnectionStatus({ stats: this._conatClient?.stats });
      }
      await delay(5000);
    }
  };

  /**
   * Register this client with @cocalc/conat/client, forward its state
   * events, initialize the time module, create the connection, and handle
   * the "info" event that reports whether sign-in succeeded.
   */
  private initConatClient = async () => {
    setConatClient({
      account_id: this.client.account_id,
      conat: this.conat,
      reconnect: async () => this.reconnect(),
      getLogger: DEBUG
        ? (name) => {
            return {
              info: (...args) => console.info(name, ...args),
              debug: (...args) => console.log(name, ...args),
              warn: (...args) => console.warn(name, ...args),
              silly: (...args) => console.log(name, ...args),
            };
          }
        : undefined,
    });
    this.clientWithState = getClientWithState();
    // Forward every state except "closed" as an event of the same name.
    this.clientWithState.on("state", (state) => {
      if (state != "closed") {
        this.emit(state);
      }
    });
    initTime();
    const client = this.conat();
    client.on("info", (info) => {
      if (client.info?.user?.account_id) {
        console.log("Connected as ", JSON.stringify(client.info?.user));
        this.signedIn({
          account_id: info.user.account_id,
          hub: info.id ?? "",
        });
        const cookie = Cookies.get(ACCOUNT_ID_COOKIE);
        if (cookie && cookie != client.info.user.account_id) {
          // make sure account_id cookie is set to the actual account we're
          // signed in as, then refresh since some things are going to be
          // broken otherwise. To test this use dev tools and just change the account_id
          // cookies value to something random.
          Cookies.set(ACCOUNT_ID_COOKIE, client.info.user.account_id);
          // and we're out of here:
          location.reload();
        }
      } else {
        console.log("Sign in failed -- ", client.info);
        this.signInFailed(client.info?.user?.error ?? "Failed to sign in.");
        this.client.alert_message({
          type: "error",
          message: "You must sign in.",
          block: true,
        });
        this.standby();
      }
    });
  };

  // The most recent successful sign-in message, if any.
  public signedInMessage?: { account_id: string; hub: string };

  /**
   * Record a successful sign-in: remember the message, update the
   * WebappClient's account_id, set the remember-me state, and emit
   * "signed_in" on the WebappClient.
   */
  private signedIn = (mesg: { account_id: string; hub: string }) => {
    this.signedInMessage = mesg;
    this.client.account_id = mesg.account_id;
    setRememberMe(appBasePath);
    this.client.emit("signed_in", mesg);
  };

  /**
   * Handle a failed sign-in: clear the remember-me state and emit
   * "remember_me_failed" with the error on the WebappClient.
   */
  private signInFailed = (error) => {
    deleteRememberMe(appBasePath);
    this.client.emit("remember_me_failed", { error });
  };

  /** Close the current engine (if there is a client), then resume. */
  reconnect = () => {
    this._conatClient?.conn.io.engine.close();
    this.resume();
  };

  // if there is a connection, put it in standby
  standby = () => {
    // Turn off auto-reconnect first so that the "disconnected" handler does
    // not immediately schedule a new connection attempt.
    this.automaticallyReconnect = false;
    this._conatClient?.disconnect();
  };

  // if there is a connection, resume it
  resume = () => {
    this.connect();
  };

  // keep trying until connected.
  // Wrapped in reuseInFlight -- presumably so overlapping calls share one
  // in-progress attempt; confirm against @cocalc/util/reuse-in-flight.
  connect = reuseInFlight(async () => {
    let attempts = 0;
    // The callback returns true when no further attempt should be made and
    // false after kicking off an attempt -- presumably `until` re-invokes it
    // with a delay between `min` and `max` ms; confirm against
    // @cocalc/util/async-utils.
    await until(
      async () => {
        if (this.permanentlyDisconnected) {
          console.log(
            "Not connecting -- client is permanently disconnected and must refresh their browser",
          );
          return true;
        }
        if (this._conatClient == null) {
          this.conat();
        }
        if (this._conatClient?.conn?.connected) {
          return true;
        }
        // Disconnect before explicitly connecting again.
        this._conatClient?.disconnect();
        await delay(750);
        await waitForOnline();
        attempts += 1;
        console.log(
          `Connecting to ${this._conatClient?.options.address}: attempts ${attempts}`,
        );
        this._conatClient?.conn.io.connect();
        return false;
      },
      { min: 3000, max: 15000 },
    );
  });

  /** Thin wrapper around `callConatService`. */
  callConatService: CallConatServiceFunction = async (options) => {
    return await callConatService(options);
  };

  /** Thin wrapper around `createConatService`. */
  createConatService: CreateConatServiceFunction = (options) => {
    return createConatService(options);
  };

  /**
   * Send `mesg` to the "browser-api" service of the given project (and
   * compute server) and return the `data` field of the response.
   */
  projectWebsocketApi = async ({
    project_id,
    compute_server_id,
    mesg,
    timeout = DEFAULT_TIMEOUT,
  }) => {
    const cn = this.conat();
    const subject = projectSubject({
      project_id,
      compute_server_id,
      service: "browser-api",
    });
    const resp = await cn.request(subject, mesg, {
      timeout,
      waitForInterest: true,
    });
    return resp.data;
  };

  /**
   * Call the hub function `name` with `args` on the subject
   * `hub.account.<account_id>.<service>` and return the response data.
   * On failure, the thrown error's message is extended with the subject and
   * function name before it is rethrown.
   */
  private callHub = async ({
    service = "api",
    name,
    args = [],
    timeout = DEFAULT_TIMEOUT,
  }: {
    service?: string;
    name: string;
    args: any[];
    timeout?: number;
  }) => {
    const cn = this.conat();
    const subject = `hub.account.${this.client.account_id}.${service}`;
    try {
      const data = { name, args };
      const resp = await cn.request(subject, data, { timeout });
      return resp.data;
    } catch (err) {
      // Only objects can carry a message. Assigning a property on a thrown
      // primitive would itself throw (module code is strict) and hide the
      // original failure, so primitives are rethrown untouched.
      if (err != null && typeof err === "object") {
        err.message = `${err.message} - callHub: subject='${subject}', name='${name}', `;
      }
      throw err;
    }
  };

  // Returns api for RPC calls to the project with typescript support!
  // if compute_server_id is NOT given then:
  //    if path is given use compute server id for path (assuming mapping is loaded)
  //    if path is not given, use current project default
  // Throws an Error if project_id is not a valid uuid.
  projectApi = ({
    project_id,
    compute_server_id,
    path,
    timeout = DEFAULT_TIMEOUT,
  }: {
    project_id: string;
    path?: string;
    compute_server_id?: number;
    // IMPORTANT: this timeout is only AFTER user is connected.
    timeout?: number;
  }): ProjectApi => {
    if (!isValidUUID(project_id)) {
      throw Error(`project_id = '${project_id}' must be a valid uuid`);
    }
    if (compute_server_id == null) {
      const actions = redux.getProjectActions(project_id);
      if (path != null) {
        compute_server_id =
          actions.getComputeServerIdForFile({ path }) ??
          actions.getComputeServerId();
      } else {
        compute_server_id = actions.getComputeServerId();
      }
    }
    const callProjectApi = async ({ name, args }) => {
      return await this.callProject({
        project_id,
        compute_server_id,
        timeout,
        service: "api",
        name,
        args,
      });
    };
    return initProjectApi(callProjectApi);
  };

  /**
   * Call the function `name` with `args` on the given project service and
   * return the `data` field of the response.
   */
  private callProject = async ({
    service = "api",
    project_id,
    compute_server_id,
    name,
    args = [],
    timeout = DEFAULT_TIMEOUT,
  }: {
    service?: string;
    project_id: string;
    compute_server_id?: number;
    name: string;
    args: any[];
    timeout?: number;
  }) => {
    const cn = this.conat();
    const subject = projectSubject({ project_id, compute_server_id, service });
    const resp = await cn.request(
      subject,
      { name, args },
      // we use waitForInterest because often the project hasn't
      // quite fully started.
      { timeout, waitForInterest: true },
    );
    return resp.data;
  };

  /**
   * Create a synctable for the given query. If `options.project_id` is set
   * and the parsed query has `project_id: null`, the null is replaced by
   * that project id. The table is created for this client's account.
   */
  synctable: ConatSyncTableFunction = async (
    query0,
    options?,
  ): Promise<ConatSyncTable> => {
    const { query, table } = parseQueryWithOptions(query0, options);
    if (options?.project_id != null && query[table][0]["project_id"] === null) {
      query[table][0]["project_id"] = options.project_id;
    }
    return await this.conat().sync.synctable({
      ...options,
      query,
      account_id: this.client.account_id,
    });
  };

  /**
   * Open a socket to the project's "primus" service; when `channel` is
   * given it is appended to the subject as an extra dot-separated segment.
   */
  primus = ({
    project_id,
    compute_server_id = 0,
    channel,
  }: {
    project_id: string;
    compute_server_id?: number;
    channel?: string;
  }) => {
    let subject = projectSubject({
      project_id,
      compute_server_id,
      service: "primus",
    });
    if (channel) {
      subject += "." + channel;
    }
    return this.conat().socket.connect(subject, {
      desc: `primus-${channel ?? ""}`,
    });
  };

  /**
   * Return the OpenFiles tracker for a project, creating and caching it on
   * first use. On creation, "change" events are wired to the project's
   * recently-deleted state and the project store is seeded with the paths
   * currently marked deleted.
   */
  openFiles = reuseInFlight(async (project_id: string) => {
    if (this.openFilesCache[project_id] == null) {
      const openFiles = await createOpenFiles({
        project_id,
      });
      this.openFilesCache[project_id] = openFiles;
      openFiles.on("closed", () => {
        delete this.openFilesCache[project_id];
      });
      openFiles.on("change", (entry) => {
        if (entry.deleted?.deleted) {
          setDeleted({
            project_id,
            path: entry.path,
            deleted: entry.deleted.time,
          });
        } else {
          setNotDeleted({ project_id, path: entry.path });
        }
      });
      const recentlyDeletedPaths: any = {};
      for (const { path, deleted } of openFiles.getAll()) {
        if (deleted?.deleted) {
          recentlyDeletedPaths[path] = deleted.time;
        }
      }
      const store = redux.getProjectStore(project_id);
      store.setState({ recentlyDeletedPaths });
    }
    return this.openFilesCache[project_id]!;
  });

  /** Close the cached OpenFiles tracker for the project, if there is one. */
  closeOpenFiles = (project_id) => {
    this.openFilesCache[project_id]?.close();
  };

  /** Create a PubSub bound to this client's connection. */
  pubsub = async ({
    project_id,
    path,
    name,
  }: {
    project_id: string;
    path?: string;
    name: string;
  }) => {
    return new PubSub({ client: this.conat(), project_id, path, name });
  };

  // Evaluate an llm.  This streams the result if stream is given an option,
  // AND it also always returns the result.
  llm = async (opts: ChatOptions): Promise<string> => {
    return await llm({ account_id: this.client.account_id, ...opts });
  };

  // Sync primitives re-exported as members for convenience.
  dstream = dstream;
  astream = astream;
  dkv = dkv;
  akv = akv;
  dko = dko;

  /** Thin wrapper around `listingsClient`. */
  listings = async (opts: {
    project_id: string;
    compute_server_id?: number;
  }) => {
    return await listingsClient(opts);
  };

  /** Current time as reported by the @cocalc/conat/time module. */
  getTime = (): number => {
    return getTime();
  };

  /** Clock skew as reported by the @cocalc/conat/time module. */
  getSkew = async (): Promise<number> => {
    return await getSkew();
  };

  /**
   * Return the inventory for an account or project. If
   * `console.log_original` exists, `ls` is wrapped so that it logs through
   * that function.
   */
  inventory = async (location: {
    account_id?: string;
    project_id?: string;
  }) => {
    const inv = await inventory(location);
    // @ts-ignore
    if (console.log_original != null) {
      const ls_orig = inv.ls;
      // @ts-ignore
      inv.ls = (opts) => ls_orig({ ...opts, log: console.log_original });
    }
    return inv;
  };

  /** Thin wrapper around the refcache `info` function. */
  refCacheInfo = () => refCacheInfo();
}

/**
 * Record `path` as recently deleted (at time `deleted`) in the project's
 * actions. Does nothing when the project has no store.
 */
function setDeleted({ project_id, path, deleted }) {
  if (!redux.hasProjectStore(project_id)) {
    return;
  }
  const actions = redux.getProjectActions(project_id);
  actions.setRecentlyDeleted(path, deleted);
}

/**
 * Clear the recently-deleted marker for `path` by setting it to 0.
 * Does nothing when the project has no store.
 */
function setNotDeleted({ project_id, path }) {
  if (!redux.hasProjectStore(project_id)) {
    return;
  }
  const actions = redux.getProjectActions(project_id);
  actions?.setRecentlyDeleted(path, 0);
}

/**
 * Resolve immediately if the browser reports being online; otherwise
 * resolve on the next "online" event on `window`.
 */
async function waitForOnline(): Promise<void> {
  if (navigator.onLine) return;
  await new Promise<void>((resolve) => {
    // `once: true` removes the listener automatically after it fires.
    window.addEventListener("online", () => resolve(), { once: true });
  });
}