Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/inventory.ts
1452 views
1
/*
2
Inventory of all streams and key:value stores in a specific project or account.
3
4
DEVELOPMENT:
5
6
i = await require('@cocalc/backend/conat/sync').inventory({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'})
7
8
i.ls()
9
10
*/
11
12
import { dkv, type DKV } from "./dkv";
13
import { dstream, type DStream } from "./dstream";
14
import { dko, type DKO } from "./dko";
15
import getTime from "@cocalc/conat/time";
16
import refCache from "@cocalc/util/refcache";
17
import type { JSONValue } from "@cocalc/util/types";
18
import {
19
human_readable_size as humanReadableSize,
20
trunc_middle,
21
} from "@cocalc/util/misc";
22
import { DKO_PREFIX } from "./dko";
23
import { waitUntilTimeAvailable } from "@cocalc/conat/time";
24
import {
25
type Configuration,
26
type PartialInventory,
27
} from "@cocalc/conat/persist/storage";
28
import { AsciiTable3 } from "ascii-table3";
29
30
export const INVENTORY_UPDATE_INTERVAL = 90000;
31
export const INVENTORY_NAME = "CoCalc-Inventory";
32
33
type Sort =
34
| "last"
35
| "created"
36
| "count"
37
| "bytes"
38
| "name"
39
| "type"
40
| "-last"
41
| "-created"
42
| "-count"
43
| "-bytes"
44
| "-name"
45
| "-type";
46
47
interface Location {
48
account_id?: string;
49
project_id?: string;
50
}
51
52
type StoreType = "stream" | "kv";
53
54
export interface InventoryItem extends PartialInventory {
55
// when it was created
56
created: number;
57
// last time this stream was updated
58
last: number;
59
// optional description, which can be anything
60
desc?: JSONValue;
61
}
62
63
interface FullItem extends InventoryItem {
64
type: StoreType;
65
name: string;
66
}
67
68
export class Inventory {
69
public location: Location;
70
private dkv?: DKV<InventoryItem>;
71
72
constructor(location: { account_id?: string; project_id?: string }) {
73
this.location = location;
74
}
75
76
init = async () => {
77
this.dkv = await dkv({
78
name: INVENTORY_NAME,
79
...this.location,
80
});
81
await waitUntilTimeAvailable();
82
};
83
84
// Set but with NO LIMITS and no MERGE conflict algorithm. Use with care!
85
set = ({
86
type,
87
name,
88
bytes,
89
count,
90
desc,
91
limits,
92
seq,
93
}: {
94
type: StoreType;
95
name: string;
96
bytes: number;
97
count: number;
98
limits: Partial<Configuration>;
99
desc?: JSONValue;
100
seq: number;
101
}) => {
102
if (this.dkv == null) {
103
throw Error("not initialized");
104
}
105
const last = getTime();
106
const key = this.encodeKey({ name, type });
107
const cur = this.dkv.get(key);
108
const created = cur?.created ?? last;
109
desc = desc ?? cur?.desc;
110
this.dkv.set(key, {
111
desc,
112
last,
113
created,
114
bytes,
115
count,
116
limits,
117
seq,
118
});
119
};
120
121
private encodeKey = ({ name, type }) => JSON.stringify({ name, type });
122
123
private decodeKey = (key) => JSON.parse(key);
124
125
delete = ({ name, type }: { name: string; type: StoreType }) => {
126
if (this.dkv == null) {
127
throw Error("not initialized");
128
}
129
this.dkv.delete(this.encodeKey({ name, type }));
130
};
131
132
get = (
133
x: { name: string; type: StoreType } | string,
134
): (InventoryItem & { type: StoreType; name: string }) | undefined => {
135
if (this.dkv == null) {
136
throw Error("not initialized");
137
}
138
let cur;
139
let name, type;
140
if (typeof x == "string") {
141
// just the name -- we infer/guess the type
142
name = x;
143
type = "kv";
144
cur = this.dkv.get(this.encodeKey({ name, type }));
145
if (cur == null) {
146
type = "stream";
147
cur = this.dkv.get(this.encodeKey({ name, type }));
148
}
149
} else {
150
name = x.name;
151
cur = this.dkv.get(this.encodeKey(x));
152
}
153
if (cur == null) {
154
return;
155
}
156
return { ...cur, type, name };
157
};
158
159
getStores = async ({
160
filter,
161
sort = "-last",
162
}: { filter?: string; sort?: Sort } = {}): Promise<
163
(DKV | DStream | DKO)[]
164
> => {
165
const v: (DKV | DStream | DKO)[] = [];
166
const all = this.getAll({ filter });
167
for (const key of this.sortedKeys(all, sort)) {
168
const x = all[key];
169
const { desc, name, type } = x;
170
if (type == "kv") {
171
if (name.startsWith(DKO_PREFIX)) {
172
v.push(await dko<any>({ name, ...this.location, desc }));
173
} else {
174
v.push(await dkv({ name, ...this.location, desc }));
175
}
176
} else if (type == "stream") {
177
v.push(await dstream({ name, ...this.location, desc }));
178
} else {
179
throw Error(`unknown store type '${type}'`);
180
}
181
}
182
return v;
183
};
184
185
getAll = ({ filter }: { filter?: string } = {}): FullItem[] => {
186
if (this.dkv == null) {
187
throw Error("not initialized");
188
}
189
const all = this.dkv.getAll();
190
if (filter) {
191
filter = filter.toLowerCase();
192
}
193
const v: FullItem[] = [];
194
for (const key of Object.keys(all)) {
195
const { name, type } = this.decodeKey(key);
196
if (filter) {
197
const { desc } = all[key];
198
const s = `${desc ? JSON.stringify(desc) : ""} ${name}`.toLowerCase();
199
if (!s.includes(filter)) {
200
continue;
201
}
202
}
203
v.push({ ...all[key], name, type });
204
}
205
return v;
206
};
207
208
close = async () => {
209
await this.dkv?.close();
210
delete this.dkv;
211
};
212
213
private sortedKeys = (all, sort0: Sort) => {
214
let reverse: boolean, sort: string;
215
if (sort0[0] == "-") {
216
reverse = true;
217
sort = sort0.slice(1);
218
} else {
219
reverse = false;
220
sort = sort0;
221
}
222
// return keys of all, sorted as specified
223
const x: { k: string; v: any }[] = [];
224
for (const k in all) {
225
x.push({ k, v: { ...all[k], ...this.decodeKey(k) } });
226
}
227
x.sort((a, b) => {
228
const a0 = a.v[sort];
229
const b0 = b.v[sort];
230
if (a0 < b0) {
231
return -1;
232
}
233
if (a0 > b0) {
234
return 1;
235
}
236
return 0;
237
});
238
const y = x.map(({ k }) => k);
239
if (reverse) {
240
y.reverse();
241
}
242
return y;
243
};
244
245
ls = ({
246
log = console.log,
247
filter,
248
noTrunc,
249
path: path0,
250
sort = "last",
251
noHelp,
252
}: {
253
log?: Function;
254
filter?: string;
255
noTrunc?: boolean;
256
path?: string;
257
sort?: Sort;
258
noHelp?: boolean;
259
} = {}) => {
260
if (this.dkv == null) {
261
throw Error("not initialized");
262
}
263
const all = this.dkv.getAll();
264
if (!noHelp) {
265
log(
266
"ls(opts: {filter?: string; noTrunc?: boolean; path?: string; sort?: 'last'|'created'|'count'|'bytes'|'name'|'type'|'-last'|...})",
267
);
268
}
269
270
const rows: any[] = [];
271
for (const key of this.sortedKeys(all, sort)) {
272
const { last, created, count, bytes, desc, limits } = all[key];
273
if (path0 && desc?.["path"] != path0) {
274
continue;
275
}
276
let { name, type } = this.decodeKey(key);
277
if (name.startsWith(DKO_PREFIX)) {
278
type = "kvobject";
279
name = name.slice(DKO_PREFIX.length);
280
}
281
if (!noTrunc) {
282
name = trunc_middle(name, 50);
283
}
284
if (
285
filter &&
286
!`${desc ? JSON.stringify(desc) : ""} ${name}`
287
.toLowerCase()
288
.includes(filter.toLowerCase())
289
) {
290
continue;
291
}
292
rows.push([
293
type,
294
name,
295
dateToString(new Date(created)),
296
humanReadableSize(bytes),
297
count,
298
dateToString(new Date(last)),
299
desc ? JSON.stringify(desc) : "",
300
limits != null && Object.keys(limits).length > 0
301
? JSON.stringify(limits)
302
: "--",
303
]);
304
}
305
306
const table = new AsciiTable3(
307
`Inventory for ${JSON.stringify(this.location)}`,
308
)
309
.setHeading(
310
"Type",
311
"Name",
312
"Created",
313
"Size",
314
"Count",
315
"Last Update",
316
"Desc",
317
"Limits",
318
)
319
.addRowMatrix(rows);
320
table.setStyle("unicode-round");
321
if (!noTrunc) {
322
table.setWidth(7, 50).setWrapped(1);
323
table.setWidth(8, 30).setWrapped(1);
324
}
325
log(table.toString());
326
};
327
}
328
329
function dateToString(d: Date) {
330
return d.toISOString().replace("T", " ").replace("Z", "").split(".")[0];
331
}
332
333
export const cache = refCache<Location & { noCache?: boolean }, Inventory>({
334
name: "inventory",
335
createObject: async (loc) => {
336
const k = new Inventory(loc);
337
await k.init();
338
return k;
339
},
340
});
341
342
export async function inventory(options: Location = {}): Promise<Inventory> {
343
return await cache(options);
344
}
345
346