Path: blob/master/src/packages/conat/core/cluster.ts
1542 views
import { type Client, connect } from "./client";1import { Patterns } from "./patterns";2import {3updateInterest,4updateSticky,5type InterestUpdate,6type StickyUpdate,7} from "@cocalc/conat/core/server";8import type { DStream } from "@cocalc/conat/sync/dstream";9import { once } from "@cocalc/util/async-utils";10import { server as createPersistServer } from "@cocalc/conat/persist/server";11import { getLogger } from "@cocalc/conat/client";12import { hash_string } from "@cocalc/util/misc";13const CREATE_LINK_TIMEOUT = 45_000;1415const logger = getLogger("conat:core:cluster");1617export async function clusterLink(18address: string,19systemAccountPassword: string,20timeout = CREATE_LINK_TIMEOUT,21) {22const client = connect({ address, systemAccountPassword });23if (client.info == null) {24try {25await client.waitUntilSignedIn({26timeout: timeout ?? CREATE_LINK_TIMEOUT,27});28} catch (err) {29client.close();30throw err;31}32if (client.info == null) {33// this is impossible34throw Error("BUG -- failed to sign in");35}36}37const { id, clusterName } = client.info;38if (!id) {39throw Error("id must be specified");40}41if (!clusterName) {42throw Error("clusterName must be specified");43}44const link = new ClusterLink(client, id, clusterName, address);45await link.init();46return link;47}4849export type Sticky = { [pattern: string]: { [subject: string]: string } };50export type Interest = Patterns<{ [queue: string]: Set<string> }>;5152export { type ClusterLink };5354class ClusterLink {55public interest: Interest = new Patterns();56public sticky: Sticky = {};57private streams: ClusterStreams;58private state: "init" | "ready" | "closed" = "init";59private clientStateChanged = Date.now(); // when client status last changed6061constructor(62public readonly client: Client,63public readonly id: string,64public readonly clusterName: string,65public readonly address: string,66) {67if (!client) {68throw Error("client must be specified");69}70if (!clusterName) {71throw Error("clusterName must be specified");72}73if (!id) {74throw Error("id must be specified");75}76}7778init = async () => {79this.client.on("connected", this.handleClientStateChanged);80this.client.on("disconnected", this.handleClientStateChanged);81this.streams = await clusterStreams({82client: this.client,83id: this.id,84clusterName: this.clusterName,85});86for (const update of this.streams.interest.getAll()) {87updateInterest(update, this.interest, this.sticky);88}89for (const update of this.streams.sticky.getAll()) {90updateSticky(update, this.sticky);91}92// I have a slight concern about this because updates might not93// arrive in order during automatic failover. That said, maybe94// automatic failover doesn't matter with these streams, since95// it shouldn't really happen -- each stream is served from the server96// it is about, and when that server goes down none of this state97// matters anymore.98this.streams.interest.on("change", this.handleInterestUpdate);99this.streams.sticky.on("change", this.handleStickyUpdate);100this.state = "ready";101};102103isConnected = () => {104return this.client.state == "connected";105};106107handleInterestUpdate = (update: InterestUpdate) => {108updateInterest(update, this.interest, this.sticky);109};110111handleStickyUpdate = (update: StickyUpdate) => {112updateSticky(update, this.sticky);113};114115private handleClientStateChanged = () => {116this.clientStateChanged = Date.now();117};118119howLongDisconnected = () => {120if (this.isConnected()) {121return 0;122}123return Date.now() - this.clientStateChanged;124};125126close = () => {127if (this.state == "closed") {128return;129}130this.state = "closed";131this.client.removeListener("connected", this.handleClientStateChanged);132this.client.removeListener("disconnected", this.handleClientStateChanged);133if (this.streams != null) {134this.streams.interest.removeListener("change", this.handleInterestUpdate);135this.streams.interest.close();136this.streams.sticky.close();137// @ts-ignore138delete this.streams;139}140this.client.close();141// @ts-ignore142delete this.client;143};144145hasInterest = (subject) => {146return this.interest.hasMatch(subject);147};148149waitForInterest = async (150subject: string,151timeout: number,152signal?: AbortSignal,153) => {154const hasMatch = this.interest.hasMatch(subject);155156if (hasMatch || !timeout) {157// NOTE: we never return the actual matches, since this is a158// potential security vulnerability.159// it could make it very easy to figure out private inboxes, etc.160return hasMatch;161}162const start = Date.now();163while (this.state != "closed" && !signal?.aborted) {164if (Date.now() - start >= timeout) {165throw Error("timeout");166}167await once(this.interest, "change");168if ((this.state as any) == "closed" || signal?.aborted) {169return false;170}171const hasMatch = this.interest.hasMatch(subject);172if (hasMatch) {173return true;174}175}176177return false;178};179180hash = (): { interest: number; sticky: number } => {181return {182interest: hashInterest(this.interest),183sticky: hashSticky(this.sticky),184};185};186}187188function clusterStreamNames({189clusterName,190id,191}: {192clusterName: string;193id: string;194}) {195return {196interest: `cluster/${clusterName}/${id}/interest`,197sticky: `cluster/${clusterName}/${id}/sticky`,198};199}200201export function clusterService({202id,203clusterName,204}: {205id: string;206clusterName: string;207}) {208return `persist:${clusterName}:${id}`;209}210211export async function createClusterPersistServer({212client,213id,214clusterName,215}: {216client: Client;217id: string;218clusterName: string;219}) {220const service = clusterService({ clusterName, id });221logger.debug("createClusterPersistServer: ", { service });222return await createPersistServer({ client, service });223}224225export interface ClusterStreams {226interest: DStream<InterestUpdate>;227sticky: DStream<StickyUpdate>;228}229230export async function clusterStreams({231client,232clusterName,233id,234}: {235client: Client;236clusterName: string;237id: string;238}): Promise<ClusterStreams> {239logger.debug("clusterStream: ", { clusterName, id });240if (!clusterName) {241throw Error("clusterName must be set");242}243const names = clusterStreamNames({ clusterName, id });244const opts = {245service: clusterService({ clusterName, id }),246noCache: true,247ephemeral: true,248};249const interest = await client.sync.dstream<InterestUpdate>({250noInventory: true,251name: names.interest,252...opts,253});254const sticky = await client.sync.dstream<StickyUpdate>({255noInventory: true,256name: names.sticky,257...opts,258});259logger.debug("clusterStreams: got them", { clusterName });260return { interest, sticky };261}262263// Periodically delete not-necessary updates from the interest stream264export async function trimClusterStreams(265streams: ClusterStreams,266data: {267interest: Patterns<{ [queue: string]: Set<string> }>;268sticky: { [pattern: string]: { [subject: string]: string } };269links: { interest: Patterns<{ [queue: string]: Set<string> }> }[];270},271// don't delete anything that isn't at lest minAge ms old.272minAge: number,273): Promise<{ seqsInterest: number[]; seqsSticky: number[] }> {274const { interest, sticky } = streams;275// First deal with interst276// we iterate over the interest stream checking for subjects277// with no current interest at all; in such cases it is safe278// to purge them entirely from the stream.279const seqs: number[] = [];280const now = Date.now();281for (let n = 0; n < interest.length; n++) {282const time = interest.time(n);283if (time == null) continue;284if (now - time.valueOf() <= minAge) {285break;286}287const update = interest.get(n) as InterestUpdate;288if (!data.interest.hasPattern(update.subject)) {289const seq = interest.seq(n);290if (seq != null) {291seqs.push(seq);292}293}294}295if (seqs.length > 0) {296// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers297logger.debug("trimClusterStream: trimming interest", { seqs });298await interest.delete({ seqs });299logger.debug("trimClusterStream: successfully trimmed interest", { seqs });300}301302// Next deal with sticky -- trim ones where the pattern is no longer of interest.303// There could be other reasons to trim but it gets much trickier. This one is more304// obvious, except we have to check for any interest in the whole cluster, not305// just this node.306const seqs2: number[] = [];307function noInterest(pattern: string) {308if (data.interest.hasPattern(pattern)) {309return false;310}311for (const link of data.links) {312if (link.interest.hasPattern(pattern)) {313return false;314}315}316// nobody cares317return true;318}319for (let n = 0; n < sticky.length; n++) {320const time = sticky.time(n);321if (time == null) continue;322if (now - time.valueOf() <= minAge) {323break;324}325const update = sticky.get(n) as StickyUpdate;326if (noInterest(update.pattern)) {327const seq = sticky.seq(n);328if (seq != null) {329seqs2.push(seq);330}331}332}333if (seqs2.length > 0) {334// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers335logger.debug("trimClusterStream: trimming sticky", { seqs2 });336await sticky.delete({ seqs: seqs2 });337logger.debug("trimClusterStream: successfully trimmed sticky", { seqs2 });338}339340return { seqsInterest: seqs, seqsSticky: seqs2 };341}342343function hashSet(X: Set<string>): number {344let h = 0;345for (const a of X) {346h += hash_string(a); // integers, and not too many, so should commute347}348return h;349}350351function hashInterestValue(X: { [queue: string]: Set<string> }): number {352let h = 0;353for (const queue in X) {354h += hashSet(X[queue]); // integers, and not too many, so should commute355}356return h;357}358359export function hashInterest(360interest: Patterns<{ [queue: string]: Set<string> }>,361): number {362return interest.hash(hashInterestValue);363}364365export function hashSticky(sticky: Sticky): number {366let h = 0;367for (const pattern in sticky) {368h += hash_string(pattern);369const x = sticky[pattern];370for (const subject in x) {371h += hash_string(x[subject]);372}373}374return h;375}376377378