Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/akv.ts
1452 views
1
/*
2
Asynchronous Memory-Efficient Access to Key:Value Store
3
4
This provides access to the same data as dkv, except it doesn't download any
5
data to the client until you actually call get. The calls to get and
6
set are thus async.
7
8
There is no need to close this because it is stateless.
9
10
[ ] TODO: efficiently get or set many values at once in a single call. This will be
11
very useful, e.g., for jupyter notebook timetravel browsing.
12
13
DEVELOPMENT:
14
15
~/cocalc/src/packages/backend$ node
16
17
a = await require("@cocalc/backend/conat/sync").dkv({name:'test'}); a.set('x',5)
18
19
20
b = require("@cocalc/backend/conat/sync").akv({name:'test'})
21
await b.set('x',10)
22
23
a.get('x')
24
25
await b.get('x')
26
27
*/
28
29
import {
30
type StorageOptions,
31
type PersistStreamClient,
32
stream,
33
} from "@cocalc/conat/persist/client";
34
import { type DKVOptions } from "./dkv";
35
import {
36
type Headers,
37
messageData,
38
type Message,
39
} from "@cocalc/conat/core/client";
40
import { storagePath, type User, COCALC_TOMBSTONE_HEADER } from "./core-stream";
41
import { connect } from "@cocalc/conat/core/client";
42
43
export class AKV<T = any> {
44
private storage: StorageOptions;
45
private user: User;
46
private stream: PersistStreamClient;
47
48
constructor(options: DKVOptions) {
49
this.user = {
50
account_id: options.account_id,
51
project_id: options.project_id,
52
};
53
this.storage = { path: storagePath(options) };
54
const client = options.client ?? connect();
55
this.stream = stream({
56
client,
57
user: this.user,
58
storage: this.storage,
59
});
60
}
61
62
close = () => {
63
this.stream.close();
64
};
65
66
getMessage = async (
67
key: string,
68
{ timeout }: { timeout?: number } = {},
69
): Promise<Message<T> | undefined> => {
70
const mesg = await this.stream.get({ key, timeout });
71
if (mesg?.headers?.[COCALC_TOMBSTONE_HEADER]) {
72
return undefined;
73
}
74
return mesg;
75
};
76
77
// // Just get one value asynchronously, rather than the entire dkv.
78
// // If the timeout option is given and the value of key is not set,
79
// // will wait until that many ms to get the key.
80
get = async (
81
key: string,
82
opts?: { timeout?: number },
83
): Promise<T | undefined> => {
84
return (await this.getMessage(key, opts))?.data;
85
};
86
87
headers = async (
88
key: string,
89
opts?: { timeout?: number },
90
): Promise<Headers | undefined> => {
91
return (await this.getMessage(key, opts))?.headers;
92
};
93
94
time = async (
95
key: string,
96
opts?: { timeout?: number },
97
): Promise<Date | undefined> => {
98
const time = (await this.getMessage(key, opts))?.headers?.time;
99
return time !== undefined ? new Date(time as number) : undefined;
100
};
101
102
delete = async (key: string, opts?: { timeout?: number }): Promise<void> => {
103
await this.set(key, null as any, {
104
...opts,
105
headers: { [COCALC_TOMBSTONE_HEADER]: true },
106
});
107
};
108
109
seq = async (
110
key: string,
111
opts?: { timeout?: number },
112
): Promise<number | undefined> => {
113
return (await this.getMessage(key, opts))?.headers?.seq as
114
| number
115
| undefined;
116
};
117
118
set = async (
119
key: string,
120
value: T,
121
options?: {
122
headers?: Headers;
123
previousSeq?: number;
124
timeout?: number;
125
ttl?: number;
126
msgID?: string;
127
},
128
): Promise<{ seq: number; time: number }> => {
129
const { headers, ...options0 } = options ?? {};
130
return await this.stream.set({
131
key,
132
messageData: messageData(value, { headers }),
133
...options0,
134
});
135
};
136
137
keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
138
return await this.stream.keys({
139
timeout,
140
});
141
};
142
143
sqlite = async (
144
statement: string,
145
params?: any[],
146
{ timeout }: { timeout?: number } = {},
147
): Promise<any[]> => {
148
return await this.stream.sqlite({
149
timeout,
150
statement,
151
params,
152
});
153
};
154
}
155
156
export function akv<T>(opts: DKVOptions) {
157
return new AKV<T>(opts);
158
}
159
160