Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/pubsub.ts
1452 views
1
/*
2
Use Conat simple pub/sub to share state for something very *ephemeral* in a project.
3
4
This is used, e.g., for broadcasting a user's cursors when they are editing a file.
5
*/
6
7
import { projectSubject } from "@cocalc/conat/names";
8
import { State } from "@cocalc/conat/types";
9
import { EventEmitter } from "events";
10
import { type Subscription, getClient, Client } from "@cocalc/conat/core/client";
11
12
/**
 * Ephemeral publish/subscribe channel scoped to a project (and optionally a
 * path within it), built on a Conat client.
 *
 * Events emitted:
 *   - the current state name (e.g. "connected", "closed") whenever the state
 *     changes;
 *   - "change", with the `data` of each message received on the subject.
 *
 * The subscription is started from the constructor but completes
 * asynchronously, so listeners attached right after construction still
 * observe the "connected" event.
 */
export class PubSub extends EventEmitter {
  private readonly subject: string;
  private readonly client: Client;
  private sub?: Subscription;
  private state: State = "disconnected";

  /**
   * @param project_id - project whose subject namespace is used
   * @param name - channel name; the service is `pubsub-${name}`
   * @param path - optional path that further scopes the subject
   * @param client - Conat client to use; defaults to `getClient()`
   */
  constructor({
    project_id,
    path,
    name,
    client,
  }: {
    project_id: string;
    name: string;
    path?: string;
    client?: Client;
  }) {
    super();
    this.client = client ?? getClient();
    this.subject = projectSubject({
      project_id,
      path,
      service: `pubsub-${name}`,
    });
    // Deliberately not awaited: a constructor cannot be async.
    // NOTE(review): nothing handles a rejection of this promise (e.g. if
    // client.subscribe fails) -- decide how such failures should be reported.
    this.subscribe();
  }

  /** Record the new state and emit an event named after it. */
  private setState = (state: State) => {
    this.state = state;
    this.emit(state);
  };

  /**
   * Close the channel: emit "closed", drop every listener and close the
   * underlying subscription if it exists. Calling it again is a no-op.
   */
  close = () => {
    if (this.state == "closed") {
      return;
    }
    // Emit "closed" first so current listeners see it before being removed.
    this.setState("closed");
    this.removeAllListeners();
    // @ts-ignore
    this.sub?.close();
    delete this.sub;
  };

  /** Publish `obj` on this channel's subject. */
  set = (obj) => {
    this.client.publish(this.subject, obj);
  };

  /**
   * Subscribe to the subject, switch to "connected", and re-emit the data of
   * every incoming message as a "change" event until the subscription ends.
   */
  private subscribe = async () => {
    const sub = await this.client.subscribe(this.subject);
    if (this.state == "closed") {
      // close() ran while the subscribe call was still pending, so it had no
      // subscription to close. Close it here instead of adopting it;
      // otherwise it would leak and the state would flip back to "connected".
      // @ts-ignore
      sub.close();
      return;
    }
    this.sub = sub;
    this.setState("connected");
    for await (const mesg of sub) {
      this.emit("change", mesg.data);
    }
  };
}
67
68