Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/astream.ts
1453 views
1
/*
2
Asynchronous Memory Efficient Access to Core Stream.
3
4
This provides access to the same data as dstream, except it doesn't download any
5
data to the client until you actually call get. The calls to get and
6
set are thus async.
7
8
There is no need to close this because it is stateless.
9
10
[ ] TODO: efficiently get or set many values at once in a single call. This will be
11
very useful, e.g., for jupyter notebook timetravel browsing.
12
13
DEVELOPMENT:
14
15
~/cocalc/src/packages/backend$ node
16
17
a = await require("@cocalc/backend/conat/sync").dstream({name:'test'})
18
19
20
b = require("@cocalc/backend/conat/sync").astream({name:'test'})
21
const [{seq}] = await b.push('x') // push resolves to an array with one result per pushed value
22
23
a.get() // ['x']
24
25
await b.get(seq) // 'x'
26
27
*/
28
29
import {
30
type StorageOptions,
31
type PersistStreamClient,
32
stream,
33
} from "@cocalc/conat/persist/client";
34
import { type DStreamOptions } from "./dstream";
35
import {
36
type Headers,
37
messageData,
38
type Client,
39
Message,
40
decode,
41
} from "@cocalc/conat/core/client";
42
import { storagePath, type User } from "./core-stream";
43
import { connect } from "@cocalc/conat/core/client";
44
import { type Configuration } from "@cocalc/conat/persist/storage";
45
46
/**
 * Promise-based wrapper around a PersistStreamClient.
 *
 * The constructor builds the user/storage descriptors from the given
 * DStreamOptions and creates the stream client; every other member forwards
 * a single request to that client.
 *
 * All request members are arrow-function properties, so they stay bound to
 * the instance even when passed around as callbacks.
 */
export class AStream<T = any> {
  private storage: StorageOptions;
  private user: User;
  private stream: PersistStreamClient;
  private client: Client;

  /**
   * @param options - account_id / project_id identify the user, storagePath(options)
   *   determines the storage path, and options.client (if given) is used
   *   instead of a fresh connect().
   */
  constructor(options: DStreamOptions) {
    const { account_id, project_id } = options;
    this.user = { account_id, project_id };
    this.storage = { path: storagePath(options) };
    this.client = options.client ?? connect();
    this.stream = stream({
      client: this.client,
      user: this.user,
      storage: this.storage,
    });
  }

  /** Closes the underlying persist stream client. */
  close = () => {
    this.stream.close();
  };

  /**
   * Fetch one full message, addressed either by sequence number (number
   * argument) or by key (string argument).
   *
   * @param seq_or_key - sequence number or key of the message.
   * @param timeout - optional timeout forwarded unchanged to the stream client.
   * @returns whatever the stream client's get resolves to (a message or undefined).
   */
  getMessage = async (
    seq_or_key: number | string,
    { timeout }: { timeout?: number } = {},
  ): Promise<Message<T> | undefined> => {
    const selector = opt(seq_or_key);
    return await this.stream.get({ ...selector, timeout });
  };

  /**
   * Like getMessage, but resolves to only the `data` field of the message
   * (undefined when getMessage resolves to undefined).
   */
  get = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<T | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.data;
  };

  /**
   * Like getMessage, but resolves to only the `headers` field of the message
   * (undefined when getMessage resolves to undefined).
   */
  headers = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<Headers | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.headers;
  };

  /**
   * Async iterator over the messages produced by the stream client's getAll.
   *
   * The client yields batches; each entry of a batch is decoded and yielded
   * individually, so a consumer can process messages one at a time instead
   * of collecting everything first.
   *
   * @param opts - passed through untouched to the stream client's getAll.
   */
  async *getAll(opts): AsyncGenerator<
    {
      mesg: T;
      headers?: Headers;
      seq: number;
      time: number;
      key?: string;
    },
    void,
    unknown
  > {
    for await (const batch of this.stream.getAll(opts)) {
      for (const entry of batch) {
        const { seq, time, key, encoding, raw, headers } = entry;
        yield {
          mesg: decode({ encoding, data: raw }),
          headers,
          seq,
          time,
          key,
        };
      }
    }
  }

  /**
   * Async iterator over change events reported by the stream client.
   *
   * "delete" events are passed through unchanged; every other event is
   * decoded and re-emitted as an `op: "set"` record.
   */
  async *changefeed(): AsyncGenerator<
    | {
        op: "set";
        mesg: T;
        headers?: Headers;
        seq: number;
        time: number;
        key?: string;
      }
    | { op: "delete"; seqs: number[] },
    void,
    unknown
  > {
    const feed = await this.stream.changefeed();
    for await (const batch of feed) {
      for (const event of batch) {
        if (event.op == "delete") {
          yield event;
          continue;
        }
        const { seq, time, key, encoding, raw, headers } = event;
        const mesg = decode({ encoding, data: raw });
        yield { op: "set", mesg, headers, seq, time, key };
      }
    }
  }

  /**
   * Forward a delete request to the stream client.
   *
   * @param opts - deletion criteria, handed to the client as-is.
   * @returns the `{ seqs }` object the client resolves to.
   */
  delete = async (opts: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    const outcome = await this.stream.delete(opts);
    return outcome;
  };

  /**
   * Store a single value.
   *
   * `headers` is folded into the message data via messageData(); all the
   * remaining options are passed to the stream client's set alongside it.
   *
   * @returns the `{ seq, time }` object the client resolves to.
   */
  publish = async (
    value: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
      timeout?: number;
      key?: string;
      ttl?: number;
      msgID?: string;
    },
  ): Promise<{ seq: number; time: number }> => {
    const { headers, ...setOptions } = options ?? {};
    const payload = messageData(value, { headers });
    return await this.stream.set({
      messageData: payload,
      ...setOptions,
    });
  };

  /**
   * Store several values with one setMany call on the stream client.
   *
   * @returns the array the client resolves to; per the declared type each
   *   entry is either `{ seq, time }` or `{ error }`.
   */
  push = async (
    ...args: T[]
  ): Promise<({ seq: number; time: number } | { error: string })[]> => {
    // [ ] TODO: should break this up into chunks with a limit on size.
    const ops = args.map((mesg) => ({ messageData: messageData(mesg) }));
    return await this.stream.setMany(ops);
  };

  /**
   * Send a (possibly empty) partial configuration to the stream client's
   * config call and resolve to the Configuration it returns.
   *
   * @throws Error if `this.storage` is not set (defensive sanity check).
   */
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.stream.config({ config });
  };

  /**
   * Forward a SQL statement, its optional parameters and an optional
   * timeout to the stream client's sqlite call.
   *
   * @returns the array the client resolves to.
   */
  sqlite = async (
    statement: string,
    params?: any[],
    { timeout }: { timeout?: number } = {},
  ): Promise<any[]> => {
    const request = { timeout, statement, params };
    return await this.stream.sqlite(request);
  };
}
201
202
/**
 * Convenience factory: builds a new AStream from the given options.
 *
 * @param opts - forwarded unchanged to the AStream constructor.
 */
export function astream<T>(opts: DStreamOptions) {
  const instance = new AStream<T>(opts);
  return instance;
}
205
206
/**
 * Turn a message address into the selector object used in a get request:
 * a number becomes `{ seq }`, a string becomes `{ key }`.
 *
 * @throws Error when the argument is neither a number nor a string.
 */
function opt(seq_or_key: number | string): { seq: number } | { key: string } {
  switch (typeof seq_or_key) {
    case "number":
      return { seq: seq_or_key };
    case "string":
      return { key: seq_or_key };
    default:
      // Only reachable when a caller bypasses the declared parameter type.
      throw Error(`arg must be number or string`);
  }
}
215
216