Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/zmq/index.ts
1447 views
1
import { EventEmitter } from "events";
2
import { Dealer, Subscriber } from "zeromq";
3
import { Message } from "./message";
4
import { getLogger } from "@cocalc/backend/logger";
5
import type { JupyterMessage } from "./types";
6
7
// Module-wide logger, obtained from the backend logger under the name "jupyter:zmq".
const logger = getLogger("jupyter:zmq");
8
9
/**
 * Which kind of ZeroMQ socket this module opens for each Jupyter channel:
 * "sub" for iopub, "dealer" for the others.
 */
export const ZMQ_TYPE = {
  iopub: "sub",
  stdin: "dealer",
  shell: "dealer",
  control: "dealer",
} as const;

/**
 * Name of a Jupyter channel handled by this module.  Derived from the keys
 * of ZMQ_TYPE so the two can never drift apart.
 */
type JupyterSocketName = keyof typeof ZMQ_TYPE;
17
18
/**
 * Connection parameters for one Jupyter kernel.
 *
 * Within this file, `transport`, `ip` and the `<channel>_port` fields are
 * combined by `connectionString()` into a ZeroMQ endpoint, and
 * `signature_scheme` / `key` are used when encoding and decoding messages.
 */
export interface JupyterConnectionInfo {
  // Not read anywhere in this file.
  version: number;
  // Port of the iopub channel.
  iopub_port: number;
  // Port of the shell channel.
  shell_port: number;
  // Port of the stdin channel.
  stdin_port: number;
  // Port of the control channel.
  control_port: number;
  // The part after the "hmac-" prefix is passed to Message._encode/_decode.
  signature_scheme: "hmac-sha256";
  // Not read in this file -- presumably the heartbeat port; confirm against callers.
  hb_port: number;
  // Address part of the endpoint, placed between "<transport>://" and the port.
  ip: string;
  // Key passed to Message._encode/_decode together with the signature scheme.
  key: string;
  // Endpoint scheme; also selects the separator between ip and port
  // (":" for tcp, "-" for ipc).
  transport: "tcp" | "ipc";
}
30
31
/**
 * Create a JupyterSockets instance and wait for its `init()` to complete,
 * so the returned object can be used with `send()` right away.
 *
 * @param config - connection parameters of the kernel to talk to
 * @param identity - forwarded to the JupyterSockets constructor
 * @returns the initialized JupyterSockets
 */
export async function jupyterSockets(
  config: JupyterConnectionInfo,
  identity: string,
): Promise<JupyterSockets> {
  const sockets = new JupyterSockets(config, identity);
  await sockets.init();
  return sockets;
}
39
40
/**
 * The ZeroMQ client sockets (iopub, shell, stdin, control) for one Jupyter
 * kernel, wrapped in a single EventEmitter.
 *
 * Every message received on a socket is decoded with `Message._decode` and
 * re-emitted as an event named after the channel it arrived on, e.g.
 * `sockets.on("iopub", (mesg) => ...)`.
 *
 * `init()` must complete before `send()` is used.
 */
export class JupyterSockets extends EventEmitter {
  // Populated by init(); undefined before that and again after close().
  private sockets?: {
    iopub: Subscriber;
    stdin: Dealer;
    shell: Dealer;
    control: Dealer;
  };

  /**
   * @param config - connection parameters of the kernel
   * @param identity - used as the `routingId` of every dealer socket
   */
  constructor(
    private config: JupyterConnectionInfo,
    private identity: string,
  ) {
    super();
  }

  // Algorithm name handed to Message._encode/_decode: the signature scheme
  // with its leading "hmac-" removed (e.g. "hmac-sha256" -> "sha256").
  private get hmacAlgorithm(): string {
    return this.config.signature_scheme.slice("hmac-".length);
  }

  /**
   * Drop all references to the sockets.  After this, send() throws until
   * init() is called again.
   *
   * NOTE(review): the receive loops started by createSocket() keep their own
   * reference to each socket and are not stopped here -- confirm that this
   * is intended.
   */
  close = () => {
    if (this.sockets != null) {
      for (const name in this.sockets) {
        // close doesn't work and shouldn't be used according to the
        // zmq docs: https://zeromq.github.io/zeromq.js/classes/Dealer.html#close
        delete this.sockets[name];
      }
      delete this.sockets;
    }
  };

  /**
   * Encode and send a message on the socket named by `message.channel`.
   *
   * @throws Error if init() has not run (or close() was called), if the
   *   channel is "iopub", or if the channel does not name a known socket.
   */
  send = (message: JupyterMessage) => {
    if (this.sockets == null) {
      throw Error("JupyterSockets not initialized");
    }
    const name = message.channel;
    if (name == "iopub") {
      // iopub is the subscriber socket; nothing is sent on it from here.
      throw Error("name must not be iopub");
    }
    const socket = this.sockets[name];
    if (socket == null) {
      throw Error(`invalid socket name '${name}'`);
    }

    logger.debug("send message", message);
    const jMessage = new Message(message);
    // NOTE(review): the value returned by socket.send() is not awaited; if it
    // is a promise, a rejection would go unhandled -- confirm.
    socket.send(jMessage._encode(this.hmacAlgorithm, this.config.key));
  };

  /**
   * Create and connect one socket per entry of ZMQ_TYPE (all in parallel)
   * and store them, which makes send() usable.
   */
  init = async () => {
    // Object.keys() is typed as string[]; the keys of ZMQ_TYPE are exactly
    // the channel names, so narrow once here.
    const names = Object.keys(ZMQ_TYPE) as JupyterSocketName[];
    const created = await Promise.all(
      names.map((name) => this.createSocket(name)),
    );
    // Built up key by key, so it only has its final shape after the loop.
    const sockets: any = {};
    names.forEach((name, i) => {
      sockets[name] = created[i];
    });
    this.sockets = sockets;
  };

  // Create the socket for one channel, connect it to the kernel's endpoint
  // and start its background receive loop.
  private createSocket = async (name: JupyterSocketName) => {
    const zmqType = ZMQ_TYPE[name];
    let socket;
    if (zmqType === "dealer") {
      socket = new Dealer({ routingId: this.identity });
    } else if (zmqType === "sub") {
      socket = new Subscriber();
    } else {
      throw Error(`bug -- invalid zmqType ${zmqType}`);
    }
    const url = connectionString(this.config, name);
    await socket.connect(url);
    // Deliberately not awaited: the loop runs for as long as the socket
    // yields messages.  listen() handles its own errors and never rejects.
    void this.listen(name, socket);
    return socket;
  };

  // Receive loop for one socket: decode each incoming message and emit it
  // as an event named after the channel.
  private listen = async (name: JupyterSocketName, socket) => {
    try {
      if (ZMQ_TYPE[name] === "sub") {
        // subscribe to everything --
        //   https://zeromq.github.io/zeromq.js/classes/Subscriber.html#subscribe
        socket.subscribe();
      }
      for await (const data of socket) {
        // Handle each message on its own, so that one message that fails to
        // decode (or a listener that throws) does not end the whole channel.
        try {
          const mesg = Message._decode(
            data,
            this.hmacAlgorithm,
            this.config.key,
          );
          this.emit(name, mesg);
        } catch (err) {
          logger.debug(`failed to handle message on '${name}'`, err);
        }
      }
    } catch (err) {
      // The socket itself failed; nothing more will be received on it.
      logger.debug(`receive loop for '${name}' ended with an error`, err);
    }
  };
}
136
137
/**
 * Build the ZeroMQ endpoint for one channel of a kernel.
 *
 * The result is `<transport>://<ip><sep><port>`, where `<sep>` is ":" for
 * the tcp transport and "-" otherwise, and `<port>` is read from the
 * `<name>_port` field of `config`.
 *
 * @param config - connection parameters of the kernel
 * @param name - channel whose endpoint is wanted
 * @returns the endpoint string, e.g. "tcp://127.0.0.1:5002"
 * @throws Error if `config` has no truthy `<name>_port` value
 */
export const connectionString = (
  config: JupyterConnectionInfo,
  name: JupyterSocketName,
) => {
  const portDelimiter = config.transport === "tcp" ? ":" : "-";
  const port = config[`${name}_port` as keyof JupyterConnectionInfo];
  if (!port) {
    throw new Error(`Port not found for name "${name}"`);
  }
  return `${config.transport}://${config.ip}${portDelimiter}${port}`;
};
148
149