import EventEmitter from "node:events";
import fs from "node:fs";
import { join } from "node:path";
import { FileSystemClient } from "@cocalc/sync-client/lib/client-fs";
import { execute_code, uuidsha1 } from "@cocalc/backend/misc_node";
import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol";
import type { SyncDoc } from "@cocalc/sync/editor/generic/sync-doc";
import type { ProjectClient as ProjectClientInterface } from "@cocalc/sync/editor/generic/types";
import { SyncString } from "@cocalc/sync/editor/string/sync";
import * as synctable2 from "@cocalc/sync/table";
import { callback2 } from "@cocalc/util/async-utils";
import { PROJECT_HUB_HEARTBEAT_INTERVAL_S } from "@cocalc/util/heartbeat";
import * as message from "@cocalc/util/message";
import * as misc from "@cocalc/util/misc";
import type { CB } from "@cocalc/util/types/callback";
import type { ExecuteCodeOptionsWithCallback } from "@cocalc/util/types/execute-code";
import * as blobs from "./blobs";
import { json } from "./common";
import * as data from "./data";
import initJupyter from "./jupyter/init";
import * as kucalc from "./kucalc";
import { getLogger } from "./logger";
import * as sage_session from "./sage_session";
import synctable_conat from "@cocalc/project/conat/synctable";
import pubsub from "@cocalc/project/conat/pubsub";
import type { ConatSyncTableFunction } from "@cocalc/conat/sync/synctable";
import {
callConatService,
createConatService,
type CallConatServiceFunction,
type CreateConatServiceFunction,
} from "@cocalc/conat/service";
import { connectToConat } from "./conat/connection";
import { getSyncDoc } from "@cocalc/project/conat/open-files";
import { isDeleted } from "@cocalc/project/conat/listings";
// Logger shared by everything in this module.
const winston = getLogger("client");

// Home directory of the project; falls back to "/home/user" when $HOME is unset.
const HOME = process.env.HOME ?? "/home/user";

// Verbose-debugging switch. Starts out enabled exactly when kucalc.IN_KUCALC
// is truthy; initDEBUG() can enable it later.
let DEBUG = Boolean(kucalc.IN_KUCALC);
/**
 * Enable verbose debugging if the marker file `$HOME/.smc-DEBUG` is
 * accessible.
 *
 * Does nothing when DEBUG is already on. Otherwise the check runs
 * asynchronously via `fs.access`: when the file is accessible DEBUG is set to
 * true and the new value is logged; when it is not, a hint naming the file to
 * create is logged instead.
 */
export function initDEBUG() {
  if (!DEBUG) {
    const debugFlagPath = join(HOME, ".smc-DEBUG");
    fs.access(debugFlagPath, (accessError) => {
      if (!accessError) {
        // Marker file is present: switch on verbose logging and report it.
        DEBUG = true;
        winston.info(`DEBUG = ${DEBUG}`);
        return;
      }
      winston.info(
        "create this file to enable very verbose debugging:",
        debugFlagPath,
      );
    });
  }
}
// The single Client instance of this module; null until init() has run.
let client: Client | null = null;

/**
 * Create the module's Client on first use and return it; later calls return
 * the same instance.
 */
export function init() {
  if (client == null) {
    client = new Client();
  }
  return client;
}
/**
 * Return the module's Client, creating it through init() when it does not
 * exist yet.
 *
 * @throws Error if no client is available even after init().
 */
export function getClient(): Client {
  const instance = client ?? init();
  if (instance == null) {
    throw Error("BUG: Client not initialized!");
  }
  return instance;
}
// Set to true by the Client constructor, which throws if it is already true,
// so at most one Client can be constructed.
let ALREADY_CREATED = false;
// Callback type stored in Client's hub callback maps: a CB whose first
// argument may be an `{ event: "error" }` message.
type HubCB = CB<any, { event: "error"; error?: string }>;
/**
 * The project's client object.
 *
 * It keeps track of the hub sockets connected to this project, sends messages
 * to a hub and routes responses back to the caller (`call`, `query`,
 * `handle_mesg`), and exposes filesystem, conat, blob and sage-session helpers.
 *
 * Emits "connected" when a hub socket is registered and "disconnected" when
 * the last registered socket goes away.
 *
 * Only one instance may be constructed; the constructor throws otherwise.
 */
export class Client extends EventEmitter implements ProjectClientInterface {
  /** Project id, copied from `data.project_id` in the constructor. */
  public readonly project_id: string;
  /** True while at least one hub socket is registered. */
  private _connected: boolean;
  /** Response handlers for messages sent via `call`, keyed by message id. */
  private _hub_callbacks: {
    [key: string]: HubCB;
  };
  /**
   * Registered hub sockets keyed by socket id, each with the response
   * handlers sent over that socket and the time of its last activity.
   */
  private _hub_client_sockets: {
    [id: string]: {
      socket: CoCalcSocket;
      callbacks?: { [id: string]: HubCB | CB<any, string> };
      activity: Date;
    };
  };
  /** Socket used for each changefeed query, keyed by the query message id. */
  private _changefeed_sockets: any;
  /** Open syncstrings that `close()` shuts down; not populated in this class. */
  private _open_syncstrings?: { [key: string]: SyncString };

  /**
   * Return a logging function that prefixes its output with `Client.<f>`.
   * When DEBUG is off (or there is no logger) the returned function is a no-op.
   */
  dbg = (f: string) => {
    if (DEBUG && winston) {
      return (...m) => {
        return winston.debug(`Client.${f}`, ...m);
      };
    } else {
      return function (..._) {};
    }
  };

  // Filesystem helpers, taken from a FileSystemClient instance.
  private filesystemClient = new FileSystemClient();
  write_file = this.filesystemClient.write_file;
  path_read = this.filesystemClient.path_read;
  path_stat = this.filesystemClient.path_stat;
  file_size_async = this.filesystemClient.file_size_async;
  file_stat_async = this.filesystemClient.file_stat_async;
  watch_file = this.filesystemClient.watch_file;

  /**
   * Create the client: change the working directory to $HOME (when set),
   * initialize the socket/callback bookkeeping, run the kucalc
   * initialization when in kucalc, bind methods to this instance and
   * initialize Jupyter.
   *
   * @throws Error if a Client was already constructed.
   */
  constructor() {
    super();
    if (ALREADY_CREATED) {
      throw Error("BUG: Client already created!");
    }
    ALREADY_CREATED = true;
    if (process.env.HOME != null) {
      process.chdir(process.env.HOME);
    }
    this.project_id = data.project_id;
    this.dbg("constructor")();
    this.setMaxListeners(300);
    this._hub_callbacks = {};
    this._hub_client_sockets = {};
    this._changefeed_sockets = {};
    this._connected = false;
    if (kucalc.IN_KUCALC) {
      kucalc.init(this);
    }
    // Methods such as _query_cancel are passed around unbound (see
    // query_cancel), so they are bound to this instance here.
    misc.bind_methods(this);
    initJupyter();
  }

  /** Log an alert (type, title, message) to the debug log; nothing else happens. */
  public alert_message({
    type = "default",
    title,
    message,
  }: {
    type?: "default";
    title?: string;
    message: string;
    block?: boolean;
    timeout?: number;
  }): void {
    this.dbg("alert_message")(type, title, message);
  }

  /** Close every open syncstring and forget them. No-op when none are tracked. */
  public close(): void {
    if (this._open_syncstrings == null) {
      return;
    }
    // Iterate the syncstring objects themselves. Looping with `for...in` over
    // the array of keys would yield array indices, not the keys.
    for (const syncstring of Object.values(this._open_syncstrings)) {
      syncstring.close();
    }
    delete this._open_syncstrings;
  }

  /** The client id of a project is its project id. */
  public client_id(): string {
    return this.project_id;
  }

  public get_project_id(): string {
    return this.project_id;
  }

  public is_project(): boolean {
    return true;
  }

  public is_browser(): boolean {
    return false;
  }

  public is_compute_server(): boolean {
    return false;
  }

  public is_user(): boolean {
    return false;
  }

  public is_signed_in(): boolean {
    return true;
  }

  /** True while at least one hub socket is registered. */
  public is_connected(): boolean {
    return this._connected;
  }

  /** Current time according to the local clock. */
  public server_time(): Date {
    return new Date();
  }

  /**
   * Record activity on a hub socket.
   *
   * The first time a socket is seen it is registered, "end"/"error" handlers
   * and a periodic heartbeat check are installed, and "connected" is emitted.
   * For an already registered socket only its `activity` time is refreshed.
   */
  public active_socket(socket: CoCalcSocket): void {
    const dbg = this.dbg(
      `active_socket(id=${socket.id},ip='${socket.remoteAddress}')`,
    );
    let x = this._hub_client_sockets[socket.id];
    if (x == null) {
      dbg();
      x = this._hub_client_sockets[socket.id] = {
        socket,
        callbacks: {},
        activity: new Date(),
      };
      let heartbeat_interval: ReturnType<typeof setInterval> | undefined =
        undefined;
      const socket_end = (): void => {
        // The interval handle doubles as an "already ended" flag, so the
        // teardown below runs at most once per socket.
        if (heartbeat_interval == null) {
          return;
        }
        dbg("ending socket");
        clearInterval(heartbeat_interval);
        heartbeat_interval = undefined;
        if (x.callbacks != null) {
          // NOTE(review): the handlers registered by `call` treat their first
          // argument as a response message, so "socket closed" reaches the
          // caller as a result rather than as an error -- confirm intended.
          for (const id in x.callbacks) {
            const cb = x.callbacks[id] as CB<any, string>;
            cb?.("socket closed");
          }
          delete x.callbacks;
        }
        delete this._hub_client_sockets[socket.id];
        dbg(
          `number of active sockets now equals ${misc.len(
            this._hub_client_sockets,
          )}`,
        );
        if (misc.len(this._hub_client_sockets) === 0) {
          this._connected = false;
          dbg("lost all active sockets");
          this.emit("disconnected");
        }
        socket.end();
        socket.destroy();
      };
      socket.on("end", socket_end);
      socket.on("error", socket_end);
      const check_heartbeat = (): void => {
        // The socket counts as dead when it never had a heartbeat or the last
        // one is older than 1.5 heartbeat intervals.
        if (
          socket.heartbeat == null ||
          Date.now() - socket.heartbeat.getTime() >=
            1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000
        ) {
          dbg("heartbeat failed");
          socket_end();
        } else {
          dbg("heartbeat -- socket is working");
        }
      };
      heartbeat_interval = setInterval(
        check_heartbeat,
        1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000,
      );
      if (misc.len(this._hub_client_sockets) >= 1) {
        dbg("CONNECTED!");
        this._connected = true;
        this.emit("connected");
      }
    } else {
      x.activity = new Date();
    }
  }

  /**
   * Route a message received from a hub to the handler registered for
   * `mesg.id` by `call`.
   *
   * Unless the message is marked `multi_response`, the handler is removed
   * before it is invoked. Exceptions thrown by the handler are logged, not
   * propagated.
   *
   * @returns true if a handler was found for the message, false otherwise.
   */
  public handle_mesg(mesg, socket) {
    const dbg = this.dbg(`handle_mesg(${misc.trunc_middle(json(mesg), 512)})`);
    const f = this._hub_callbacks[mesg.id];
    if (f != null) {
      dbg("calling callback");
      if (!mesg.multi_response) {
        delete this._hub_callbacks[mesg.id];
        // The socket may no longer be registered (it is removed when it
        // ends), so the lookup must not assume the entry exists.
        delete this._hub_client_sockets[socket.id]?.callbacks?.[mesg.id];
      }
      try {
        f(mesg);
      } catch (err) {
        dbg(`WARNING: error handling message from client. -- ${err}`);
      }
      return true;
    } else {
      dbg("no callback");
      return false;
    }
  }

  /**
   * Pick one of the registered hub sockets at random.
   *
   * @returns the socket, or undefined when no hub socket is registered.
   */
  public get_hub_socket() {
    const socket_ids = misc.keys(this._hub_client_sockets);
    this.dbg("get_hub_socket")(
      `there are ${socket_ids.length} sockets -- ${JSON.stringify(socket_ids)}`,
    );
    if (socket_ids.length === 0) {
      return;
    }
    return this._hub_client_sockets[misc.random_choice(socket_ids)].socket;
  }

  /**
   * Send a JSON message to a hub.
   *
   * @param opts.message the message; it is given a fresh uuid `id` when it has
   *   none and a callback is supplied.
   * @param opts.timeout seconds to wait for the first response before calling
   *   `cb` with a timeout error; 0/undefined means wait indefinitely.
   * @param opts.socket socket to use; defaults to a random registered hub
   *   socket.
   * @param opts.cb called with `(undefined, resp)` on a response, or with an
   *   error string when the response has `event === "error"`, on timeout, or
   *   when no hub is connected.
   */
  public call(opts: {
    message: any;
    timeout?: number;
    socket?: CoCalcSocket;
    cb?: CB<any, string>;
  }) {
    const dbg = this.dbg(`call(message=${json(opts.message)})`);
    dbg();
    const socket =
      opts.socket != null ? opts.socket : (opts.socket = this.get_hub_socket());
    if (socket == null) {
      dbg("no sockets");
      opts.cb?.("no hubs currently connected to this project");
      return;
    }
    if (opts.cb != null) {
      let timer;
      if (opts.timeout) {
        dbg("configure timeout");
        const fail = () => {
          dbg("failed");
          // Drop the handler from both maps so that a timed-out call does not
          // stay referenced by the socket entry until the socket closes.
          delete this._hub_callbacks[opts.message.id];
          delete this._hub_client_sockets[socket.id]?.callbacks?.[
            opts.message.id
          ];
          opts.cb?.(`timeout after ${opts.timeout}s`);
          // Make sure a late response cannot invoke the callback again.
          delete opts.cb;
        };
        timer = setTimeout(fail, opts.timeout * 1000);
      }
      if (opts.message.id == null) {
        opts.message.id = misc.uuid();
      }
      const cb = (this._hub_callbacks[opts.message.id] = (resp) => {
        if (timer != null) {
          clearTimeout(timer);
          timer = undefined;
        }
        if (resp?.event === "error") {
          opts.cb?.(resp.error ? resp.error : "error");
        } else {
          opts.cb?.(undefined, resp);
        }
      });
      // A caller-supplied socket may already have been unregistered (e.g. a
      // changefeed socket that ended), so the entry may be missing.
      const callbacks = this._hub_client_sockets[socket.id]?.callbacks;
      if (callbacks != null) {
        callbacks[opts.message.id] = cb;
      }
    }
    return socket.write_mesg("json", opts.message);
  }

  /**
   * Send a database query to a hub.
   *
   * @param query the query object.
   * @param options optional array of option objects.
   * @param changes when true the query is a changefeed: the message is marked
   *   `multi_response`, the socket used is remembered for `query_cancel`, and
   *   `cb` is additionally called with "socket-end" when that socket ends or
   *   errors.
   * @param timeout seconds to wait for the first response (default 30).
   * @param cb receives the response(s) or an error string.
   * @throws Error if `options` is given but is not an array.
   */
  public query({
    query,
    options,
    changes,
    timeout = 30,
    cb,
  }: {
    query: any;
    options?: { [key: string]: any }[];
    changes?: boolean;
    timeout?: number;
    cb: CB<any, string>;
  }) {
    if (options != null && !misc.is_array(options)) {
      throw Error("options must be an array");
    }
    const mesg = message.query({
      id: misc.uuid(),
      query,
      options,
      changes,
      multi_response: changes,
    });
    const socket = this.get_hub_socket();
    if (socket == null) {
      cb("no hub socket available");
      return;
    }
    if (changes) {
      this._changefeed_sockets[mesg.id] = socket;
      // NOTE(review): these listeners are never removed here, so every
      // changefeed adds two listeners to the socket -- confirm acceptable.
      socket.on("error", () => {
        cb("socket-end");
      });
      socket.on("end", () => {
        cb("socket-end");
      });
    }
    return this.call({
      message: mesg,
      timeout,
      socket,
      cb,
    });
  }

  /**
   * Callback-style cancel of a changefeed: sends `query_cancel` over the
   * socket the changefeed was created on. Calls `cb` immediately when no
   * socket is recorded for `opts.id`.
   */
  private _query_cancel(opts: { id: string; cb?: CB }) {
    const socket = this._changefeed_sockets[opts.id];
    if (socket == null) {
      return opts.cb?.();
    } else {
      return this.call({
        message: message.query_cancel({ id: opts.id }),
        timeout: 30,
        socket,
        cb: opts.cb,
      });
    }
  }

  /** Promise-returning wrapper around `_query_cancel`. */
  public async query_cancel(id) {
    return await callback2(this._query_cancel, { id });
  }

  /** Create a synctable for `query` that uses this client. */
  public sync_table(query, options?: any, throttle_changes = undefined) {
    return synctable2.synctable(query, options, this, throttle_changes);
  }

  // Thin wrappers around the project's conat helpers.
  conat = () => connectToConat();

  synctable_conat: ConatSyncTableFunction = async (query, options?) => {
    return await synctable_conat(query, options);
  };

  pubsub_conat = async ({ path, name }: { path?: string; name: string }) => {
    return await pubsub({ path, name });
  };

  callConatService: CallConatServiceFunction = async (options) => {
    return await callConatService(options);
  };

  /** Like the imported `createConatService`, with this project's id filled in. */
  createConatService: CreateConatServiceFunction = (options) => {
    return createConatService({
      ...options,
      project_id: this.project_id,
    });
  };

  /** Look up the sync document for `path` via `getSyncDoc`. */
  syncdoc = ({ path }: { path: string }): SyncDoc | undefined => {
    return getSyncDoc(path);
  };

  /**
   * Check access to a path.
   *
   * @param opts.mode string of access letters; each letter `c` selects the
   *   `fs.constants` flag named `<C>_OK` (e.g. "rw" checks R_OK | W_OK).
   *   Letters without a matching flag are ignored.
   * @param opts.cb passed straight to `fs.access`.
   */
  public path_access(opts: { path: string; mode: string; cb: CB }): void {
    // Read the flags from fs.constants; the copies directly on `fs`
    // (fs.R_OK etc.) are deprecated by Node.
    const flags = fs.constants as Record<string, number>;
    let access = 0;
    for (const s of opts.mode) {
      access |= flags[`${s.toUpperCase()}_OK`] ?? 0;
    }
    return fs.access(opts.path, access, opts.cb);
  }

  /** Call `opts.cb(undefined, exists)` with whether `opts.path` exists. */
  public path_exists(opts: { path: string; cb: CB }) {
    const dbg = this.dbg(`checking if path (='${opts.path}') exists`);
    dbg();
    return fs.exists(opts.path, (exists) => {
      dbg(`returned ${exists}`);
      opts.cb(undefined, exists);
    });
  }

  /** Call `opts.cb(err, size)` with the `size` reported by `path_stat`. */
  public file_size(opts: { filename: string; cb: CB }): void {
    this.path_stat({
      path: opts.filename,
      cb: (err, stat) => {
        opts.cb(err, stat?.size);
      },
    });
  }

  /** Run a command via `execute_code`; see its options type for details. */
  public shell(opts: ExecuteCodeOptionsWithCallback): void {
    execute_code(opts);
  }

  /** Create a sage session for `path` that uses this client. */
  public sage_session({
    path,
  }: {
    path: string;
  }): sage_session.SageSessionType {
    return sage_session.sage_session({ path, client: this });
  }

  /**
   * Send a blob to a hub and wait for the save confirmation.
   *
   * @param blob the data to save.
   * @param sha1 optional precomputed sha1, passed to `uuidsha1`.
   * @param uuid optional explicit uuid; computed with `uuidsha1` when omitted.
   * @param cb called with `(resp.error, resp)` on failure and
   *   `(undefined, resp)` on success; called with an error string right away
   *   when no hub is connected.
   */
  public save_blob({
    blob,
    sha1,
    uuid: optsUUID,
    cb,
  }: {
    blob: Buffer;
    sha1?: string;
    uuid?: string;
    cb?: (err: string | undefined, resp?: any) => void;
  }) {
    const uuid = optsUUID ?? uuidsha1(blob, sha1);
    const dbg = this.dbg(`save_blob(uuid='${uuid}')`);
    const hub = this.get_hub_socket();
    if (hub == null) {
      dbg("fail -- no global hubs");
      cb?.(
        "no global hubs are connected to the local hub, so nowhere to send file",
      );
      return;
    }
    dbg("sending blob mesg");
    hub.write_mesg("blob", { uuid, blob });
    dbg("waiting for response");
    blobs.receive_save_blob_message({
      sha1: uuid,
      cb: (resp): void => {
        if (resp?.error) {
          dbg(`fail -- '${resp.error}'`);
          cb?.(resp.error, resp);
        } else {
          dbg("success");
          cb?.(undefined, resp);
        }
      },
    });
  }

  /** Not implemented: always reports "get_blob: not implemented" to `cb`. */
  public get_blob(opts: {
    blob: Buffer;
    sha1?: string;
    uuid?: string;
    cb?: (err: string) => void;
  }) {
    const dbg = this.dbg("get_blob");
    dbg(opts.sha1);
    opts.cb?.("get_blob: not implemented");
  }

  /** Intentionally a no-op in the project. */
  touch_project(_project_id: string, _compute_server_id?: number) {}

  /** Delegates to `isDeleted(filename)`; the project id argument is ignored. */
  public is_deleted(
    filename: string,
    _project_id: string,
  ): boolean | undefined {
    return isDeleted(filename);
  }

  /** Deprecated no-op; only logs that it was called. */
  public async set_deleted(
    _filename: string,
    _project_id?: string,
  ): Promise<void> {
    // dbg() returns the logging function; it has to be invoked to log.
    this.dbg("set_deleted")("DEPRECATED");
  }
}