Path: blob/master/src/packages/conat/persist/server.ts
1452 views
/*
Example node REPL session for exercising the persist server by hand:

CONAT_SERVER=http://localhost:3000 node

// making a server from scratch

// initialize persist context

require('@cocalc/backend/conat/persist');

// a conat server and client
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();

// persist server
p = require('@cocalc/conat/persist/server').server({client}); 0;



// a client for persist server

c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});

for await (x of await c.getAll()) { console.log(x) }


await c.set({messageData:client.message(123)})

for await (x of await c.getAll()) { console.log(x) }

[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]

(await c.get({seq:5})).data

await c.set({key:'foo', messageData:client.message('bar')})
(await c.get({key:'foo'})).data

await c.delete({seq:6})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})


client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})


*/

import { type Client, ConatError } from "@cocalc/conat/core/client";
import {
  type ConatSocketServer,
  type ServerSocket,
} from "@cocalc/conat/socket";
import { getLogger } from "@cocalc/conat/client";
import type {
  StoredMessage,
  PersistentStream,
  StorageOptions,
} from "./storage";
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
import { throttle } from "lodash";
import { type SetOptions } from "./client";
import { once } from "@cocalc/util/async-utils";
import { UsageMonitor } from "@cocalc/conat/monitor/usage";

const logger = getLogger("persist:server");

// When sending a large number of messages for
// getAll or change updates, we combine together messages
// until hitting this size, then send them all at once.
// This bound is to avoid potentially using a huge amount of RAM
// when streaming a large saved database to the client.
// Note: if a single message is larger than this, it still
// gets sent, just individually.
const DEFAULT_MESSAGES_THRESH = 20 * 1e6;
//const DEFAULT_MESSAGES_THRESH = 1e5;

// I added an experimental way to run any sqlite query...
// but it is disabled
// since of course there are major DOS and security concerns.
const ENABLE_SQLITE_GENERAL_QUERIES = false;

// Wait value handed to lodash `throttle` when flushing queued changefeed
// messages (lodash interprets it as milliseconds).
const SEND_THROTTLE = 30;

/**
 * Create the persist socket server.
 *
 * Listens on `${SERVICE}.*` using `client.socket.listen`. For every socket
 * connection, the first "data" message carries the storage options (and an
 * optional changefeed flag); it opens the stream via `getStream`. After that,
 * "request" messages are dispatched on `headers.cmd`.
 *
 * @param client - conat client used to listen; must not be null.
 * @param messagesThresh - batch-size bound (sum of `raw.length`) used when
 *   sending many messages for getAll / changefeed updates.
 * @returns the underlying socket server.
 * @throws Error if `client` is null or undefined.
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
}: {
  client: Client;
  messagesThresh?: number;
}) {
  logger.debug("server: creating...");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${SERVICE}.*`;
  const server: ConatSocketServer = client.socket.listen(subject);
  logger.debug("server: listening on ", { subject });
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });
  server.on("close", () => {
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    logger.debug("server: got new connection", {
      id: socket.id,
      subject: socket.subject,
    });
    // Sticky initialization failure: once set, every request is rejected with it.
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // True once usage.add(user) succeeded, so that it is balanced on close.
    let added = false;
    // Set by the "closed" handler; lets an in-flight initialization notice
    // that the socket went away while it was awaiting getStream.
    let closed = false;

    socket.on("data", async (data) => {
      // logger.debug("server: got data ", data);
      if (closed || stream != null) {
        // Already initialized, or the socket is gone: nothing to set up.
        return;
      }
      storage = data.storage;
      changefeed = data.changefeed;
      try {
        user = socket.subject.split(".")[1];
        usage.add(user);
        added = true;
        const opened = await getStream({
          subject: socket.subject,
          storage,
        });
        if (closed) {
          // The socket closed while the stream was being opened. The
          // "closed" handler ran with stream still undefined, so close the
          // stream here instead of leaking it.
          opened.close();
          return;
        }
        stream = opened;
        if (changefeed) {
          startChangefeed({ socket, stream, messagesThresh });
        }
        // Wakes up any request that is waiting for the stream (see below).
        socket.emit("stream-initialized");
      } catch (err) {
        error = `${err}`;
        errorCode = err.code;
        if (closed) {
          // nobody left to notify
          return;
        }
        socket.write(null, { headers: { error, code: errorCode } });
      }
    });

    socket.on("closed", () => {
      logger.debug("socket closed", socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;
      // logger.debug("got request", request);

      try {
        if (error) {
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          // The request arrived before initialization finished; wait for it.
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        if (request.cmd == "set") {
          mesg.respondSync(
            stream.set({
              key: request.key,
              previousSeq: request.previousSeq,
              raw: mesg.raw,
              ttl: request.ttl,
              encoding: mesg.encoding,
              headers: request.headers,
              msgID: request.msgID,
            }),
          );
        } else if (request.cmd == "setMany") {
          // just like set except the main data of the mesg
          // has an array of set operations
          const resp: (
            | { seq: number; time: number }
            | { error: string; code?: any }
          )[] = [];
          for (const {
            key,
            previousSeq,
            ttl,
            msgID,
            messageData,
          } of mesg.data as SetOptions[]) {
            // A failing item is reported in its slot; the rest still run.
            try {
              resp.push(
                stream.set({
                  key,
                  previousSeq,
                  ttl,
                  headers: messageData.headers,
                  msgID,
                  raw: messageData.raw,
                  encoding: messageData.encoding,
                }),
              );
            } catch (err) {
              resp.push({ error: `${err}`, code: err.code });
            }
          }
          mesg.respondSync(resp);
        } else if (request.cmd == "delete") {
          mesg.respondSync(stream.delete(request));
        } else if (request.cmd == "config") {
          mesg.respondSync(stream.config(request.config));
        } else if (request.cmd == "inventory") {
          mesg.respondSync(stream.inventory());
        } else if (request.cmd == "get") {
          const resp = stream.get({ key: request.key, seq: request.seq });
          //console.log("got resp = ", resp);
          if (resp == null) {
            mesg.respondSync(null);
          } else {
            // The payload travels as raw bytes; metadata goes in the headers.
            const { raw, encoding, headers, seq, time, key } = resp;
            mesg.respondSync(null, {
              raw,
              encoding,
              headers: { ...headers, seq, time, key },
            });
          }
        } else if (request.cmd == "keys") {
          const resp = stream.keys();
          mesg.respondSync(resp);
        } else if (request.cmd == "sqlite") {
          if (!ENABLE_SQLITE_GENERAL_QUERIES) {
            throw Error("sqlite command not currently supported");
          }
          const resp = stream.sqlite(request.statement, request.params);
          mesg.respondSync(resp);
        } else if (request.cmd == "serverId") {
          mesg.respondSync(server.id);
        } else if (request.cmd == "getAll") {
          logger.debug("getAll", { subject: socket.subject, request });
          // getAll uses requestMany which responds with all matching messages,
          // so no call to mesg.respond here.
          getAll({ stream, mesg, request, messagesThresh });
        } else if (request.cmd == "changefeed") {
          logger.debug("changefeed", changefeed);
          if (!changefeed) {
            changefeed = true;
            startChangefeed({ socket, stream, messagesThresh });
          }
          mesg.respondSync("created");
        } else {
          mesg.respondSync(null, {
            headers: { error: `unknown command ${request.cmd}`, code: 404 },
          });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: err.code },
        });
      }
    });
  });

  return server;
}

/**
 * Answer a "getAll" request with a sequence of responses.
 *
 * Messages from `stream.getAll({start_seq, end_seq})` are grouped into
 * batches whose summed `raw.length` reaches `messagesThresh`. Each batch is
 * sent with an increasing `seq` header (a response counter, unrelated to the
 * stored message seq). A final response with no messages marks successful
 * completion; on failure a response carrying `error` (and the error's `code`,
 * when it has one) is sent instead.
 */
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (error?, messages?: StoredMessage[], code?) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    const messages: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      messages.push(message);
      // Tolerate a message without a raw field, the same way the
      // changefeed batching in this file does.
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(undefined, messages);
        messages.length = 0;
        size = 0;
      }
    }

    if (messages.length > 0) {
      respond(undefined, messages);
    }
    // successful finish
    respond();
  } catch (err) {
    // Forward the code of the original error; the stringified message alone
    // does not carry it.
    respond(`${err}`, undefined, err?.code);
  }
}

/**
 * Forward "change" events of `stream` to `socket`.
 *
 * Changes are queued and flushed through a throttled sender
 * (`SEND_THROTTLE`, leading and trailing calls enabled). Each flush drains
 * the queue in batches bounded by `messagesThresh`, writing every batch with
 * an increasing `seq` header. Nothing is written once `socket.state` is
 * "closed".
 */
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });
  // this seq here has nothing to do with the seq of the StoredMessage!
  let seq = 0;
  const respond = (error?, messages?: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    //logger.debug("changefeed: writing messages to socket", { seq, messages });
    socket.write(messages, { headers: { error, seq } });
    seq += 1;
  };

  const unsentMessages: StoredMessage[] = [];
  const sendAllUnsentMessages = throttle(
    () => {
      while (socket.state != "closed" && unsentMessages.length > 0) {
        const messages: StoredMessage[] = [];
        let size = 0;
        while (unsentMessages.length > 0 && socket.state != "closed") {
          const message = unsentMessages.shift();
          // e.g. op:'delete' messages have length 0 and no raw field
          size += message?.raw?.length ?? 0;
          messages.push(message!);
          if (size >= messagesThresh) {
            respond(undefined, messages);
            size = 0;
            messages.length = 0;
          }
        }
        if (messages.length > 0) {
          respond(undefined, messages);
        }
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  stream.on("change", (message) => {
    if (socket.state == "closed") {
      return;
    }
    //console.log("stream change event", message);
    // logger.debug("changefeed got message", message, socket.state);
    unsentMessages.push(message);
    sendAllUnsentMessages();
  });
}