GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/limits.ts

import type { RawMsg } from "./core-stream";

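// Throttle (in milliseconds) for how often limit enforcement runs; much
// shorter under COCALC_TEST_MODE so tests do not have to wait ~45 seconds.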
export const ENFORCE_LIMITS_THROTTLE_MS = process.env.COCALC_TEST_MODE
  ? 100
  : 45000;

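// Error used to reject a publish, e.g., when enforceRateLimits determines a
// message would exceed a rate limit.  code is set to "REJECT" and limit names
// the limit that was exceeded.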
class PublishRejectError extends Error {
  code: string;
  mesg: any;
  subject?: string;
  limit?: string;
}

export interface FilteredStreamLimitOptions {
  // How many messages may be in the Stream; the oldest messages are removed
  // if the Stream exceeds this size. -1 for unlimited.
  max_msgs: number;
  // Maximum age of any message in the stream matching the filter,
  // expressed in milliseconds (not nanoseconds). 0 for unlimited.
  max_age: number;
  // How big the Stream may be: when the combined size of messages matching
  // the filter exceeds this, old messages are removed. -1 for unlimited.
  // This is enforced only on write, so if you change it, it only applies
  // to future messages.
  max_bytes: number;
  // The largest message that will be accepted by the Stream. -1 for unlimited.
  max_msg_size: number;

  // Attempting to publish a message that would cause this rate to be exceeded
  // throws an exception instead. -1 (or 0) for unlimited.
  // For dstream, such messages are explicitly rejected and the client
  // gets a "reject" event emitted. E.g., the terminal running in the project
  // writes [...] when it gets these rejects, indicating that data was
  // dropped.
  max_bytes_per_second: number;
  max_msgs_per_second: number;
}

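// Illustrative example (assumed values, not used in this module): a limits
// object for a small stream might look like
//
//   const limits: FilteredStreamLimitOptions = {
//     max_msgs: 1000,
//     max_age: 1000 * 60 * 60, // one hour, in milliseconds
//     max_bytes: 5000000,
//     max_msg_size: 100000,
//     max_bytes_per_second: -1, // no rate limiting
//     max_msgs_per_second: -1,
//   };
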
export interface KVLimits {
  // How many keys may be in the KV store. The oldest keys will be removed
  // if the key-value store exceeds this size. -1 for unlimited.
  max_msgs: number;

  // Maximum age of any key, expressed in milliseconds. 0 for unlimited.
  // The age is updated whenever the value of the key is changed.
  max_age: number;

  // The maximum number of bytes to store in this KV store, i.e., the total
  // of the bytes used to store everything. Since we store the key with each
  // value (to support arbitrary keys), this includes the size of the keys.
  max_bytes: number;

  // The maximum size of any single value, including the key.
  max_msg_size: number;
}

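// Given the decoded messages and the corresponding raw messages (same order,
// oldest first), determine how many of the oldest messages must be dropped to
// satisfy the given limits.  Returns the index of the last message that
// should be deleted, or -1 if nothing needs to be dropped; the caller deletes
// everything up to and including that index, e.g. (illustrative sketch, not
// the actual caller):
//
//   const index = enforceLimits({ messages, raw, limits });
//   if (index > -1) {
//     messages.splice(0, index + 1);
//     raw.splice(0, index + 1);
//   }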
export function enforceLimits<T>({
  messages,
  raw,
  limits,
}: {
  messages: T[];
  raw: RawMsg[];
  limits: FilteredStreamLimitOptions;
}) {
  const { max_msgs, max_age, max_bytes } = limits;
  // We check each defined limit to see whether some old messages should be
  // dropped, and if so move index forward.  If it is above -1 at the end,
  // we do the drop.
  let index = -1;
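  // setIndex records the largest (newest) index that must be dropped across
  // all of the limit checks below.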
  const setIndex = (i: number, _limit: string) => {
    // console.log("setIndex", { i, _limit });
    index = Math.max(i, index);
  };
  // max_msgs
  // console.log({ max_msgs, l: messages.length, messages });
  if (max_msgs > -1 && messages.length > max_msgs) {
    // Ensure there are at most limits.max_msgs messages by deleting the
    // oldest ones up to a specified point.
    const i = messages.length - max_msgs;
    if (i > 0) {
      setIndex(i - 1, "max_msgs");
    }
  }

  // max_age
  if (max_age > 0) {
    // Expire messages older than max_age milliseconds.
    const recent = raw[raw.length - 1];
    if (recent != null) {
      // To avoid potential clock skew, we define *now* as the time of the most
      // recent message.  For us, this should be fine, since we only impose
      // limits when writing new messages, and none of these limits are
      // guaranteed.
      const now = recent.timestamp;
      if (now) {
        const cutoff = now - max_age;
        for (let i = raw.length - 1; i >= 0; i--) {
          const t = raw[i].timestamp;
          if (t < cutoff) {
            // This message just went over the limit.  Everything before and
            // including the i-th message must be deleted.
            setIndex(i, "max_age");
            break;
          }
        }
      }
    }
  }

  // max_bytes
  if (max_bytes >= 0) {
    let t = 0;
    for (let i = raw.length - 1; i >= 0; i--) {
      t += raw[i].data.length;
      if (t > max_bytes) {
        // This message just pushed the total over the limit.  Everything
        // before and including the i-th message must be deleted.
        setIndex(i, "max_bytes");
        break;
      }
    }
  }

  return index;
}

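// Enforce the max_bytes_per_second and max_msgs_per_second limits for a
// publish of `bytes` bytes to the given subject.  bytesSent maps a
// millisecond timestamp to the number of bytes published at that time; it is
// the caller's bookkeeping for a sliding one-second window and is mutated in
// place (stale entries are pruned).  If neither limit is positive this is a
// no-op.  If the publish would exceed a limit, a PublishRejectError with code
// "REJECT" is thrown and nothing is recorded.
//
// Illustrative use (hypothetical caller):
//
//   try {
//     enforceRateLimits({ limits, bytesSent, subject, bytes: data.length });
//   } catch (err) {
//     // e.g., emit a "reject" event to the client instead of publishing
//   }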
export function enforceRateLimits({
  limits,
  bytesSent,
  subject,
  bytes,
}: {
  limits: { max_bytes_per_second: number; max_msgs_per_second: number };
  bytesSent: { [time: number]: number };
  subject?: string;
  bytes: number;
}) {
  const now = Date.now();
  if (!(limits.max_bytes_per_second > 0) && !(limits.max_msgs_per_second > 0)) {
    return;
  }

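  // Sliding one-second window: discard accounting entries older than one
  // second and total up the bytes and message count still inside the window.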
  const cutoff = now - 1000;
  let totalBytes = 0,
    msgs = 0;
  for (const t in bytesSent) {
    if (parseInt(t) < cutoff) {
      delete bytesSent[t];
    } else {
      totalBytes += bytesSent[t];
      msgs += 1;
    }
  }
  if (
    limits.max_bytes_per_second > 0 &&
    totalBytes + bytes > limits.max_bytes_per_second
  ) {
    const err = new PublishRejectError(
      `bytes per second limit of ${limits.max_bytes_per_second} exceeded`,
    );
    err.code = "REJECT";
    err.subject = subject;
    err.limit = "max_bytes_per_second";
    throw err;
  }
  if (limits.max_msgs_per_second > 0 && msgs > limits.max_msgs_per_second) {
    const err = new PublishRejectError(
      `messages per second limit of ${limits.max_msgs_per_second} exceeded`,
    );
    err.code = "REJECT";
    err.subject = subject;
    err.limit = "max_msgs_per_second";
    throw err;
  }
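  // The publish is allowed: record its size under the current timestamp so it
  // counts against the window for subsequent calls.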
  bytesSent[now] = bytes;
}