Path: blob/master/src/packages/conat/service/service.ts
1452 views
/*1Simple to use UI to connect anything in cocalc via request/reply services.23- callConatService4- createConatService56The input is basically where the service is (account, project, public),7and either what message to send or how to handle messages.8Also if the handler throws an error, the caller will throw9an error too.10*/1112import { type Location } from "@cocalc/conat/types";13import { conat, getLogger } from "@cocalc/conat/client";14import { randomId } from "@cocalc/conat/names";15import { EventEmitter } from "events";16import { encodeBase64 } from "@cocalc/conat/util";17import { type Client } from "@cocalc/conat/core/client";18import { until } from "@cocalc/util/async-utils";1920const DEFAULT_TIMEOUT = 10 * 1000;2122const logger = getLogger("conat:service");2324export interface ServiceDescription extends Location {25service: string;2627description?: string;2829// if true and multiple servers are setup in same "location", then they ALL get to respond (sender gets first response).30all?: boolean;3132// DEFAULT: ENABLE_SERVICE_FRAMEWORK33enableServiceFramework?: boolean;3435subject?: string;36}3738export interface ServiceCall extends ServiceDescription {39mesg: any;40timeout?: number;4142// if it fails with error.code 503, we wait for service to be ready and try again,43// unless this is set -- e.g., when waiting for the service in the first44// place we set this to avoid an infinite loop.45// This now just uses the waitForInterest option to request.46noRetry?: boolean;4748client?: Client;49}5051export async function callConatService(opts: ServiceCall): Promise<any> {52// console.log("callConatService", opts);53const cn = opts.client ?? (await conat());54const subject = serviceSubject(opts);55let resp;56const timeout = opts.timeout ?? DEFAULT_TIMEOUT;57// ensure not undefined, since undefined can't be published.58const data = opts.mesg ?? null;5960const doRequest = async () => {61resp = await cn.request(subject, data, {62timeout,63waitForInterest: !opts.noRetry,64});65const result = resp.data;66if (result?.error) {67throw Error(result.error);68}69return result;70};71return await doRequest();72}7374export type CallConatServiceFunction = typeof callConatService;7576export interface Options extends ServiceDescription {77description?: string;78version?: string;79handler: (mesg) => Promise<any>;80client?: Client;81}8283export function createConatService(options: Options) {84return new ConatService(options);85}8687export type CreateConatServiceFunction = typeof createConatService;8889export function serviceSubject({90service,9192account_id,93browser_id,9495project_id,96compute_server_id,9798path,99100subject,101}: ServiceDescription): string {102if (subject) {103return subject;104}105let segments;106path = path ? encodeBase64(path) : "_";107if (!project_id && !account_id) {108segments = ["public", service];109} else if (account_id) {110segments = [111"services",112`account-${account_id}`,113browser_id ?? "_",114project_id ?? "_",115path ?? "_",116service,117];118} else if (project_id) {119segments = [120"services",121`project-${project_id}`,122compute_server_id ?? "_",123service,124path,125];126}127return segments.join(".");128}129130export function serviceName({131service,132133account_id,134browser_id,135136project_id,137compute_server_id,138}: ServiceDescription): string {139let segments;140if (!project_id && !account_id) {141segments = [service];142} else if (account_id) {143segments = [`account-${account_id}`, browser_id ?? "-", service];144} else if (project_id) {145segments = [`project-${project_id}`, compute_server_id ?? "-", service];146}147return segments.join("-");148}149150export function serviceDescription({151description,152path,153}: ServiceDescription): string {154return [description, path ? `\nPath: ${path}` : ""].join("");155}156157export class ConatService extends EventEmitter {158private options: Options;159public readonly subject: string;160public readonly name: string;161private sub?;162163constructor(options: Options) {164super();165this.options = options;166this.name = serviceName(this.options);167this.subject = serviceSubject(options);168this.runService();169}170171private log = (...args) => {172logger.debug(`service:subject='${this.subject}' -- `, ...args);173};174175// create and run the service until something goes wrong, when this176// willl return. It does not throw an error.177private runService = async () => {178this.emit("starting");179this.log("starting service", {180name: this.name,181description: this.options.description,182version: this.options.version,183});184const cn = this.options.client ?? (await conat());185const queue = this.options.all ? randomId() : "0";186// service=true so upon disconnect the socketio backend server187// immediately stops routing traffic to this.188this.sub = await cn.subscribe(this.subject, { queue });189this.emit("running");190await this.listen();191};192193private listen = async () => {194for await (const mesg of this.sub) {195const request = mesg.data ?? {};196197// console.logger.debug("handle conat service call", request);198let resp;199if (request == "ping") {200resp = "pong";201} else {202try {203resp = await this.options.handler(request);204} catch (err) {205resp = { error: `${err}` };206}207}208try {209await mesg.respond(resp);210} catch (err) {211const data = { error: `${err}` };212await mesg.respond(data);213}214}215};216217close = () => {218if (!this.subject) {219return;220}221this.emit("closed");222this.removeAllListeners();223this.sub?.stop();224delete this.sub;225// @ts-ignore226delete this.subject;227// @ts-ignore228delete this.options;229};230}231232interface ServiceClientOpts {233options: ServiceDescription;234maxWait?: number;235id?: string;236}237238export async function pingConatService({239options,240maxWait = 3000,241}: ServiceClientOpts): Promise<string[]> {242const pong = await callConatService({243...options,244mesg: "ping",245timeout: Math.max(3000, maxWait),246// set no-retry to avoid infinite loop247noRetry: true,248});249return [pong];250}251252// NOTE: anything that has to rely on waitForConatService should253// likely be rewritten differently...254export async function waitForConatService({255options,256maxWait = 60000,257}: {258options: ServiceDescription;259maxWait?: number;260}) {261let ping: string[] = [];262let pingMaxWait = 250;263await until(264async () => {265pingMaxWait = Math.min(3000, pingMaxWait * 1.4);266try {267ping = await pingConatService({ options, maxWait: pingMaxWait });268return ping.length > 0;269} catch {270return false;271}272},273{274start: 1000,275max: 10000,276decay: 1.3,277timeout: maxWait,278},279);280return ping;281}282283284