// Path: blob/master/src/packages/project/project-status/server.ts
// 1447 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Project status server, doing the heavy lifting of telling the client
what's going on in the project, especially if there is a problem.

Under the hood, it subscribes to the ProjectInfoServer, which updates
various statistics at a high frequency. This server filters that
information down to a low-frequency, low-volume stream of important
status updates.

In particular, information like cpu, memory and disk is smoothed out and throttled.
*/

import { getLogger } from "@cocalc/project/logger";
import { how_long_ago_m, round1 } from "@cocalc/util/misc";
import { version as smcVersion } from "@cocalc/util/smc-version";
import { delay } from "awaiting";
import { EventEmitter } from "events";
import { isEqual } from "lodash";
import { get_ProjectInfoServer, ProjectInfoServer } from "../project-info";
import { ProjectInfo } from "@cocalc/util/types/project-info/types";
import {
  ALERT_DISK_FREE,
  ALERT_HIGH_PCT /* ALERT_MEDIUM_PCT */,
  RAISE_ALERT_AFTER_MIN,
  STATUS_UPDATES_INTERVAL_S,
} from "@cocalc/comm/project-status/const";
import {
  Alert,
  AlertType,
  ComponentName,
  ProjectStatus,
} from "@cocalc/comm/project-status/types";
import { cgroup_stats } from "@cocalc/comm/project-status/utils";
import { createPublisher } from "@cocalc/conat/project/project-status";
import { compute_server_id, project_id } from "@cocalc/project/data";

// TODO: only return the "next" value, if it is significantly different from "prev"
//function threshold(prev?: number, next?: number): number | undefined {
//  return next;
//}

const logger = getLogger("project-status:server");

/**
 * Round `val` up to the next multiple of 10^order (order=1 → multiple of 10).
 * Used below to coarsen the memory numbers, so small fluctuations do not
 * produce a "changed" status.
 *
 * NOTE: `order` is expected to be >= 0. For negative orders the rounded step
 * becomes 0 and the result is NaN.
 */
function quantize(val: number, order: number): number {
  const q = Math.round(Math.pow(10, order));
  return Math.round(q * Math.ceil(val / q));
}

// For each metric, the timestamp (taken from ProjectInfo.timestamp) when we
// first saw it at an elevated level. It is reset to null once the metric is
// below its threshold again. The keys are the alert types which
// update_alerts() reports via do_alert().
interface Elevated {
  "cpu-cgroup": number | null; // timestamp
  memory: number | null; // timestamp
  disk: number | null; // timestamp
}

/**
 * Derives a low-frequency stream of project status updates (rounded usage
 * numbers plus alerts) from the high-frequency ProjectInfoServer, and emits
 * them as "status" events.
 */
export class ProjectStatusServer extends EventEmitter {
  private readonly dbg: (...args: unknown[]) => void;
  private running = false;
  private readonly testing: boolean;
  private readonly project_info: ProjectInfoServer;
  private info?: ProjectInfo;
  private status?: ProjectStatus;
  // status computed by the previous update() call (basis for fake data)
  private last?: ProjectStatus;
  // status most recently emitted via doEmit(); emitInfo() compares against it
  private lastEmitted?: ProjectStatus;
  private elevated: Elevated = {
    "cpu-cgroup": null,
    disk: null,
    memory: null,
  };
  // pid → timestamp when the process was first seen above ALERT_HIGH_PCT cpu
  private elevated_cpu_procs: { [pid: string]: number } = {};
  private disk_mb?: number;
  private cpu_pct?: number;
  private cpu_tot?: number; // total time in seconds
  private mem_pct?: number;
  private mem_rss?: number;
  private mem_tot?: number;
  // component name → timestamp when its problem was first reported
  private components: { [name in ComponentName]?: number | undefined } = {};
  private lastEmit: number = 0; // timestamp, when status was emitted last

  constructor(testing = false) {
    super();
    this.testing = testing;
    this.dbg = (...msg: unknown[]) => logger.debug(...msg);
    this.project_info = get_ProjectInfoServer();
  }

  // Listener for "info" events of the ProjectInfoServer. It is kept as a
  // field, so that stop() can unsubscribe exactly this function again.
  private readonly onInfo = (info: ProjectInfo): void => {
    //this.dbg(`got info timestamp=${info.timestamp}`);
    this.info = info;
    this.update();
    this.emitInfo();
  };

  private async init(): Promise<void> {
    this.project_info.start();
    this.project_info.on("info", this.onInfo);
  }

  /**
   * Checks whether the state computed by the last update() should be emitted:
   * - always, if nothing was emitted via doEmit() yet;
   * - immediately, if the alerts changed;
   * - otherwise at most once every STATUS_UPDATES_INTERVAL_S seconds, and
   *   only if the status differs from the one emitted last.
   *
   * Comparing with the last *emitted* status (and not the previously computed
   * one) ensures that a change happening inside the rate-limit window is
   * still sent out later, even if the value stays constant afterwards.
   */
  private emitInfo(): void {
    if (this.lastEmit === 0) {
      this.dbg("emitInfo[last=0]", this.status);
      this.doEmit();
      return;
    }

    // if alert changed, emit immediately
    if (!isEqual(this.lastEmitted?.alerts, this.status?.alerts)) {
      this.dbg("emitInfo[alert]", this.status);
      this.doEmit();
      return;
    }

    // deep comparison check via lodash and we rate limit
    const recent =
      this.lastEmit + 1000 * STATUS_UPDATES_INTERVAL_S > Date.now();
    const changed = !isEqual(this.status, this.lastEmitted);
    if (!recent && changed) {
      this.dbg("emitInfo[changed]", this.status);
      this.doEmit();
    }
  }

  // emit the current status and remember what and when we emitted
  private doEmit(): void {
    this.emit("status", this.status);
    this.lastEmit = Date.now();
    this.lastEmitted = this.status;
  }

  /**
   * Marks a component as having a problem. The alert is included (without a
   * delay) in every status until clearComponentAlert() is called.
   */
  public setComponentAlert(name: ComponentName) {
    // we set this to the time when we first got notified about the problem
    if (this.components[name] == null) {
      this.components[name] = Date.now();
    }
  }

  /** Removes the alert previously set via setComponentAlert(). */
  public clearComponentAlert(name: ComponentName) {
    delete this.components[name];
  }

  // Derives the elevated-level timestamps and the rounded usage numbers
  // from the most recent ProjectInfo object.
  private update_alerts(): void {
    if (this.info == null) return;
    const du = this.info.disk_usage.project;
    const ts = this.info.timestamp;

    const do_alert = (type: keyof Elevated, is_bad: boolean) => {
      if (is_bad) {
        // if it isn't fine, set it once to the timestamp (and let it age)
        if (this.elevated[type] == null) {
          this.elevated[type] = ts;
        }
      } else {
        // unless it's fine again, then remove the timestamp
        this.elevated[type] = null;
      }
    };

    do_alert("disk", du.free < ALERT_DISK_FREE);
    this.disk_mb = du.usage;

    const cg = this.info.cgroup;
    const du_tmp = this.info.disk_usage.tmp;
    if (cg != null) {
      // we round/quantize values to reduce the number of updates
      // and also send less data with each update
      const cgStats = cgroup_stats(cg, du_tmp);
      this.mem_pct = Math.round(cgStats.mem_pct);
      this.cpu_pct = Math.round(cgStats.cpu_pct);
      this.cpu_tot = Math.round(cgStats.cpu_tot);
      this.mem_tot = quantize(cgStats.mem_tot, 1);
      this.mem_rss = quantize(cgStats.mem_rss, 1);
      do_alert("memory", cgStats.mem_pct > ALERT_HIGH_PCT);
      do_alert("cpu-cgroup", cgStats.cpu_pct > ALERT_HIGH_PCT);
    }
  }

  // Tracks processes above ALERT_HIGH_PCT cpu and returns the sorted pids of
  // those which stayed elevated for more than RAISE_ALERT_AFTER_MIN minutes.
  private alert_cpu_processes(): string[] {
    if (this.info == null) return [];
    const ts = this.info.timestamp;
    const ecp = this.elevated_cpu_procs;
    // we have to check if there aren't any processes left which no longer exist
    const leftovers = new Set(Object.keys(ecp));
    // bookkeeping of elevated process PIDS
    for (const [pid, proc] of Object.entries(this.info.processes ?? {})) {
      leftovers.delete(pid);
      if (proc.cpu.pct > ALERT_HIGH_PCT) {
        if (ecp[pid] == null) {
          ecp[pid] = ts;
        }
      } else {
        delete ecp[pid];
      }
    }
    for (const pid of leftovers) {
      delete ecp[pid];
    }
    // to actually fire alert when necessary
    const pids: string[] = [];
    for (const [pid, since] of Object.entries(ecp)) {
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        pids.push(pid);
      }
    }
    pids.sort(); // to make this stable across iterations
    //this.dbg("alert_cpu_processes", pids, ecp);
    return pids;
  }

  // update alert levels and set alert states if they persist to be active
  private alerts(): Alert[] {
    this.update_alerts();
    const alerts: Alert[] = [];
    const alert_keys: (keyof Elevated)[] = ["cpu-cgroup", "disk", "memory"];
    for (const k of alert_keys) {
      const ts = this.elevated[k];
      if (ts != null && how_long_ago_m(ts) > RAISE_ALERT_AFTER_MIN) {
        alerts.push({ type: k } as Alert);
      }
    }
    const pids: string[] = this.alert_cpu_processes();
    if (pids.length > 0) alerts.push({ type: "cpu-process", pids });

    const componentNames: ComponentName[] = [];
    for (const [k, ts] of Object.entries(this.components)) {
      if (ts == null) continue;
      // we alert without a delay
      componentNames.push(k as ComponentName);
    }
    // only send any alert if there is actually a problem!
    if (componentNames.length > 0) {
      alerts.push({ type: "component", names: componentNames });
    }
    return alerts;
  }

  // Random-walk usage numbers for development, based on the previous status.
  private fake_data(): ProjectStatus["usage"] {
    const lastUsage = this.last?.["usage"];

    // move the previous value of `key` by a small random step, within [0, max]
    const next = (
      key: "disk_mb" | "mem_pct" | "cpu_pct",
      max: number,
    ): number => {
      const last = lastUsage?.[key] ?? max / 2;
      const dx = max / 50;
      const val = last + dx * Math.random() - dx / 2;
      return Math.round(Math.min(max, Math.max(0, val)));
    };

    const mem_tot = 3000;
    const mem_pct = next("mem_pct", 100);
    const mem_rss = Math.round((mem_tot * mem_pct) / 100);
    const cpu_tot = round1((lastUsage?.["cpu_tot"] ?? 0) + Math.random() / 10);

    return {
      disk_mb: next("disk_mb", 3000),
      mem_tot,
      mem_pct,
      cpu_pct: next("cpu_pct", 100),
      cpu_tot,
      mem_rss,
    };
  }

  // this function takes the "info" we have (+ more maybe?)
  // and derives various states from it.
  // It shouldn't really matter how often it is being called,
  // but still only emit new objects if it is either really necessary (new alert)
  // or after some time. This must be a low-frequency and low-volume stream of data.
  private update(): void {
    this.last = this.status;

    // alerts must come first, it updates usage status fields
    const alerts = this.alerts();

    // set this to true if you're developing (otherwise you don't get any data)
    const fake_data = false;

    // collect status fields in usage object
    const usage = fake_data
      ? this.fake_data()
      : {
          disk_mb: this.disk_mb,
          mem_pct: this.mem_pct,
          cpu_pct: this.cpu_pct,
          cpu_tot: this.cpu_tot,
          mem_rss: this.mem_rss,
          mem_tot: this.mem_tot,
        };

    this.status = { alerts, usage, version: smcVersion };
  }

  private async get_status(): Promise<ProjectStatus | undefined> {
    this.update();
    return this.status;
  }

  /**
   * Stops this server: it unsubscribes from the ProjectInfoServer's "info"
   * events (so no further "status" events are emitted from them) and ends the
   * testing loop. The ProjectInfoServer itself is not stopped here.
   */
  public stop(): void {
    this.running = false;
    this.project_info.removeListener("info", this.onInfo);
  }

  /** Starts the server, unless it is already running. */
  public async start(): Promise<void> {
    if (!this.running) {
      await this._start();
    }
  }

  private async _start(): Promise<void> {
    this.dbg("start");
    if (this.running) {
      throw Error("Cannot start ProjectStatusServer twice");
    }
    this.running = true;
    await this.init();

    const status = await this.get_status();
    this.emit("status", status);

    // testing mode: emit the full status every 5 seconds until stopped
    while (this.testing && this.running) {
      await delay(5000);
      if (!this.running) break;
      const status = await this.get_status();
      this.emit("status", status);
    }
  }
}

// singleton, we instantiate it when we need it
let status: ProjectStatusServer | undefined = undefined;

/**
 * Creates the singleton ProjectStatusServer (if necessary), wires it up to
 * the conat status publisher for this project, and starts it.
 */
export function init() {
  logger.debug("initializing project status server, and enabling publishing");
  if (status == null) {
    status = new ProjectStatusServer();
  }
  createPublisher({
    projectStatusServer: status,
    compute_server_id,
    project_id,
  });
  status.start();
}

// testing: $ ts-node server.ts
if (require.main === module) {
  const pss = new ProjectStatusServer(true);
  pss.start();
  let cnt = 0;
  pss.on("status", (status) => {
    console.log(JSON.stringify(status, null, 2));
    cnt += 1;
    if (cnt >= 2) process.exit();
  });
}