Path: blob/master/src/packages/conat/sync/astream.ts
1453 views
/*
Asynchronous Memory Efficient Access to Core Stream.

This provides access to the same data as dstream, except it doesn't download any
data to the client until you actually call get.  The calls to get and
set are thus async.

There is no need to close this because it is stateless.

[ ] TODO: efficiently get or set many values at once in a single call. This will be
very useful, e.g., for jupyter notebook timetravel browsing.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node

a = await require("@cocalc/backend/conat/sync").dstream({name:'test'})


b = require("@cocalc/backend/conat/sync").astream({name:'test'})
// push resolves to an array with one result per pushed value
const [{seq}] = await b.push('x')

a.get() // ['x']

await b.get(seq) // 'x'

*/

import {
  type StorageOptions,
  type PersistStreamClient,
  stream,
} from "@cocalc/conat/persist/client";
import { type DStreamOptions } from "./dstream";
import {
  type Headers,
  messageData,
  type Client,
  Message,
  decode,
  connect,
} from "@cocalc/conat/core/client";
import { storagePath, type User } from "./core-stream";
import { type Configuration } from "@cocalc/conat/persist/storage";

/**
 * Async wrapper around a persist stream client.
 *
 * Every operation is forwarded to the `PersistStreamClient` created in the
 * constructor; this class keeps no copy of the stream's data itself.
 */
export class AStream<T = any> {
  private readonly storage: StorageOptions;
  private readonly user: User;
  private readonly stream: PersistStreamClient;
  private readonly client: Client;

  /**
   * @param options - `account_id` / `project_id` identify the user, the
   *   storage path is derived via `storagePath(options)`, and
   *   `options.client` is used when given, otherwise `connect()` is called.
   */
  constructor(options: DStreamOptions) {
    this.user = {
      account_id: options.account_id,
      project_id: options.project_id,
    };
    this.storage = { path: storagePath(options) };
    this.client = options.client ?? connect();
    this.stream = stream({
      client: this.client,
      user: this.user,
      storage: this.storage,
    });
  }

  /**
   * Closes the underlying persist stream client.
   *
   * NOTE(review): the file header says closing is unnecessary; confirm
   * whether callers are expected to call this.
   */
  close = () => {
    this.stream.close();
  };

  /**
   * Fetches a single message from the underlying stream.
   *
   * @param seq_or_key - a number is sent as `seq`, a string is sent as `key`.
   * @returns whatever the underlying `stream.get` resolves to (a message or
   *   undefined).
   * @throws Error if `seq_or_key` is neither a number nor a string.
   */
  getMessage = async (
    seq_or_key: number | string,
    { timeout }: { timeout?: number } = {},
  ): Promise<Message<T> | undefined> => {
    return await this.stream.get({
      ...opt(seq_or_key),
      timeout,
    });
  };

  /**
   * Like `getMessage`, but resolves to only the message's `data`
   * (undefined when `getMessage` resolves to undefined).
   */
  get = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<T | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.data;
  };

  /**
   * Like `getMessage`, but resolves to only the message's `headers`
   * (undefined when `getMessage` resolves to undefined).
   */
  headers = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<Headers | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.headers;
  };

  // this is an async iterator so you can iterate over the
  // data without having to have it all in RAM at once.
  // Of course, you can put it all in a single list if you want.
  //
  // `opts` is forwarded unchanged to the underlying stream's getAll; each
  // raw message in each batch it yields is decoded before being yielded here.
  async *getAll(opts): AsyncGenerator<
    {
      mesg: T;
      headers?: Headers;
      seq: number;
      time: number;
      key?: string;
    },
    void,
    unknown
  > {
    for await (const messages of this.stream.getAll(opts)) {
      for (const { seq, time, key, encoding, raw, headers } of messages) {
        const mesg = decode({ encoding, data: raw });
        yield { mesg, headers, seq, time, key };
      }
    }
  }

  /**
   * Async iterator over changes reported by the underlying stream's
   * changefeed. "delete" events are yielded as received; every other event
   * has its raw payload decoded and is yielded with `op: "set"`.
   */
  async *changefeed(): AsyncGenerator<
    | {
        op: "set";
        mesg: T;
        headers?: Headers;
        seq: number;
        time: number;
        key?: string;
      }
    | { op: "delete"; seqs: number[] },
    void,
    unknown
  > {
    const cf = await this.stream.changefeed();
    for await (const updates of cf) {
      for (const event of updates) {
        if (event.op === "delete") {
          yield event;
        } else {
          const { seq, time, key, encoding, raw, headers } = event;
          const mesg = decode({ encoding, data: raw });
          yield { op: "set", mesg, headers, seq, time, key };
        }
      }
    }
  }

  /**
   * Forwards a delete request to the underlying stream.
   *
   * @param opts - passed through unchanged to `stream.delete`.
   * @returns the `seqs` reported by the underlying stream.
   */
  delete = async (opts: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    return await this.stream.delete(opts);
  };

  /**
   * Writes one value to the stream.
   *
   * `options.headers` is folded into the message via `messageData`; all
   * remaining options are forwarded as-is to `stream.set`.
   *
   * @returns the `seq` and `time` reported by the underlying stream.
   */
  publish = async (
    value: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
      timeout?: number;
      key?: string;
      ttl?: number;
      msgID?: string;
    },
  ): Promise<{ seq: number; time: number }> => {
    const { headers, ...options0 } = options ?? {};
    return await this.stream.set({
      messageData: messageData(value, { headers }),
      ...options0,
    });
  };

  /**
   * Writes several values with a single `setMany` call, one operation per
   * argument and with no headers or per-message options.
   *
   * @returns one entry per value: either `{ seq, time }` or `{ error }`.
   */
  push = async (
    ...args: T[]
  ): Promise<({ seq: number; time: number } | { error: string })[]> => {
    // [ ] TODO: should break this up into chunks with a limit on size.
    const ops = args.map((mesg) => ({ messageData: messageData(mesg) }));
    return await this.stream.setMany(ops);
  };

  /**
   * Forwards a (possibly empty) partial configuration to the underlying
   * stream's `config` call and resolves to the configuration it returns.
   *
   * @throws Error if `storage` is unexpectedly unset.
   */
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.stream.config({ config });
  };

  /**
   * Forwards a SQL statement, optional parameters, and optional timeout to
   * the underlying stream's `sqlite` call and resolves to its result.
   */
  sqlite = async (
    statement: string,
    params?: any[],
    { timeout }: { timeout?: number } = {},
  ): Promise<any[]> => {
    return await this.stream.sqlite({
      timeout,
      statement,
      params,
    });
  };
}

/** Convenience factory: equivalent to `new AStream<T>(opts)`. */
export function astream<T>(opts: DStreamOptions): AStream<T> {
  return new AStream<T>(opts);
}

/**
 * Maps a sequence number or key to the selector object used by `stream.get`:
 * numbers become `{ seq }`, strings become `{ key }`.
 *
 * @throws Error for any other runtime type (possible when called from
 *   untyped JavaScript).
 */
function opt(seq_or_key: number | string): { seq: number } | { key: string } {
  if (typeof seq_or_key === "number") {
    return { seq: seq_or_key };
  }
  if (typeof seq_or_key === "string") {
    return { key: seq_or_key };
  }
  throw Error(`arg must be number or string`);
}