import type { RawMsg } from "./core-stream";

/**
 * Throttle interval, in milliseconds, associated with enforcing limits:
 * 100 when the COCALC_TEST_MODE environment variable is set, otherwise 45000.
 * NOTE(review): the consumer of this constant is outside this file.
 */
export const ENFORCE_LIMITS_THROTTLE_MS = process.env.COCALC_TEST_MODE
  ? 100
  : 45000;

/**
 * Error thrown by enforceRateLimits when a publish would exceed a rate limit.
 * The extra fields are assigned by the code that creates the error.
 */
class PublishRejectError extends Error {
  // machine readable code; enforceRateLimits sets this to "REJECT"
  code: string;
  mesg: any;
  // subject of the rejected publish, if the caller provided one
  subject?: string;
  // name of the limit that was exceeded, e.g. "max_bytes_per_second"
  limit?: string;
}

export interface FilteredStreamLimitOptions {
  // How many messages may be in a Stream. The oldest messages will be removed
  // if the Stream exceeds this size. -1 for unlimited.
  max_msgs: number;
  // Maximum age of any message in the stream matching the filter,
  // expressed in milliseconds. 0 for unlimited.
  // **Note that max_age is in milliseconds.**
  max_age: number;
  // How big the Stream may be. When the combined size of the messages matching
  // the filter exceeds this, old messages are removed. -1 for unlimited.
  // This is enforced only on write, so if you change it, it only applies
  // to future messages.
  max_bytes: number;
  // The largest message that will be accepted by the Stream. -1 for unlimited.
  max_msg_size: number;

  // Attempting to publish a message that causes this to be exceeded
  // throws an exception instead. -1 (or 0) for unlimited.
  // For dstream, the messages are explicitly rejected and the client
  // gets a "reject" event emitted. E.g., the terminal running in the project
  // writes [...] when it gets these rejects, indicating that data was
  // dropped.
  max_bytes_per_second: number;
  max_msgs_per_second: number;
}

export interface KVLimits {
  // How many keys may be in the KV store. The oldest keys will be removed
  // if the key-value store exceeds this size. -1 for unlimited.
  max_msgs: number;

  // Maximum age of any key, expressed in milliseconds. 0 for unlimited.
  // Age is updated whenever the value of the key is changed.
  max_age: number;

  // The maximum number of bytes to store in this KV, which means
  // the total of the bytes used to store everything. Since we store
  // the key with each value (to have arbitrary keys), this includes
  // the size of the keys.
  max_bytes: number;

  // The maximum size of any single value, including the key.
  max_msg_size: number;
}

/**
 * Determine how many of the oldest messages must be dropped so that the
 * stream satisfies the max_msgs, max_age and max_bytes limits.
 *
 * Both arrays are scanned assuming index 0 is the oldest message and the last
 * entry is the newest.
 *
 * @param messages - the messages; only the length is used (for max_msgs)
 * @param raw - raw messages; `timestamp` is used for max_age and
 *   `data.length` for max_bytes
 * @param limits - the limits to enforce; max_msgs <= -1, max_age <= 0 and
 *   max_bytes < 0 each disable the corresponding check
 * @returns the largest index that must be deleted (everything at or before
 *   that index should be removed), or -1 if nothing needs to be deleted
 */
export function enforceLimits<T>({
  messages,
  raw,
  limits,
}: {
  messages: T[];
  raw: RawMsg[];
  limits: FilteredStreamLimitOptions;
}): number {
  const { max_msgs, max_age, max_bytes } = limits;
  // Each enabled limit may require dropping a prefix of old messages.
  // We track the furthest index any limit asks for; -1 means "drop nothing".
  let index = -1;

  // max_msgs: keep at most max_msgs messages by deleting the oldest ones.
  if (max_msgs > -1 && messages.length > max_msgs) {
    index = Math.max(index, messages.length - max_msgs - 1);
  }

  // max_age: expire messages older than max_age milliseconds.
  if (max_age > 0) {
    const recent = raw[raw.length - 1];
    // To avoid potential clock skew, *now* is defined as the time of the most
    // recent message. This is fine since limits are only imposed when writing
    // new messages, and none of these limits are guaranteed.
    const now = recent?.timestamp;
    if (now) {
      const cutoff = now - max_age;
      // Scan from newest to oldest; the first message that is too old marks
      // the point at or before which everything must be deleted.
      for (let i = raw.length - 1; i >= 0; i--) {
        if (raw[i].timestamp < cutoff) {
          index = Math.max(index, i);
          break;
        }
      }
    }
  }

  // max_bytes: keep the newest messages whose combined size fits the budget.
  if (max_bytes >= 0) {
    let total = 0;
    for (let i = raw.length - 1; i >= 0; i--) {
      total += raw[i].data.length;
      if (total > max_bytes) {
        // Just went over the limit: everything before and including the
        // i-th message must be deleted.
        index = Math.max(index, i);
        break;
      }
    }
  }

  return index;
}

// Build the error that is thrown when a publish is rejected by a rate limit.
function rejectError(
  message: string,
  subject: string | undefined,
  limit: string,
): PublishRejectError {
  const err = new PublishRejectError(message);
  err.code = "REJECT";
  err.subject = subject;
  err.limit = limit;
  return err;
}

/**
 * Enforce the per-second rate limits for a message that is about to be
 * published, and record the message if it is accepted.
 *
 * `bytesSent` is mutated: entries older than one second are deleted, and when
 * the message is accepted its size is added under the current time.  If
 * neither limit is positive this returns immediately without touching
 * `bytesSent`.
 *
 * The message count is the number of distinct millisecond keys in the last
 * second, so several messages recorded in the same millisecond count as one
 * message (their bytes are still all counted).
 *
 * @param limits - max_bytes_per_second / max_msgs_per_second; a value that is
 *   not > 0 disables that limit
 * @param bytesSent - map from time (ms since epoch) to bytes recorded then
 * @param subject - optional subject, attached to the thrown error
 * @param bytes - size of the message about to be published
 * @throws PublishRejectError with code "REJECT" and `limit` set to
 *   "max_bytes_per_second" or "max_msgs_per_second" if accepting this message
 *   would exceed that limit; nothing is recorded in that case
 */
export function enforceRateLimits({
  limits,
  bytesSent,
  subject,
  bytes,
}: {
  limits: { max_bytes_per_second: number; max_msgs_per_second: number };
  bytesSent: { [time: number]: number };
  subject?: string;
  bytes: number;
}): void {
  const { max_bytes_per_second, max_msgs_per_second } = limits;
  if (!(max_bytes_per_second > 0) && !(max_msgs_per_second > 0)) {
    return;
  }

  const now = Date.now();
  const cutoff = now - 1000;
  let totalBytes = 0;
  let msgs = 0;
  // Drop entries that fell out of the one second window; tally the rest.
  for (const key of Object.keys(bytesSent)) {
    const time = Number(key);
    if (time < cutoff) {
      delete bytesSent[time];
    } else {
      totalBytes += bytesSent[time];
      msgs += 1;
    }
  }

  if (max_bytes_per_second > 0 && totalBytes + bytes > max_bytes_per_second) {
    throw rejectError(
      `bytes per second limit of ${max_bytes_per_second} exceeded`,
      subject,
      "max_bytes_per_second",
    );
  }
  // Count the message being published too (just like the byte check above),
  // so that at most max_msgs_per_second messages are accepted per second.
  if (max_msgs_per_second > 0 && msgs + 1 > max_msgs_per_second) {
    throw rejectError(
      `messages per second limit of ${max_msgs_per_second} exceeded`,
      subject,
      "max_msgs_per_second",
    );
  }

  // Accumulate rather than overwrite, so bytes from another message recorded
  // in the same millisecond are not lost.
  bytesSent[now] = (bytesSent[now] ?? 0) + bytes;
}