Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Builds an observable that requests introspection ("tooltip") data from the
 * kernel and emits the reply.
 *
 * Nothing is sent when this function is called. The request goes out each
 * time the returned observable is subscribed to.
 *
 * @param channels - Kernel message stream. Replies are read from it and
 *   `message` is pushed onto it with `next`.
 * @param editor - Unused by this function; part of the existing signature.
 * @param message - The request to send. Replies are filtered with
 *   `childOf(message)`.
 * @returns An observable that emits one `{ dict }` object, where `dict` is the
 *   `data` field of the first `inspect_reply` message's content.
 */
export function tooltipObservable(
  channels: Channels,
  editor: CMI,
  message: JupyterMessage
) {
  const tip$ = channels.pipe(
    childOf(message),
    ofMessageType("inspect_reply"),
    map((entry: JupyterMessage) => entry.content),
    // Only the first matching reply is of interest.
    first(),
    map(results => ({
      dict: results.data
    }))
  );

  // On subscription, send the message.
  // `Observer` is generic and needs a type argument; `unknown` is enough here
  // because the observer is only forwarded to `tip$.subscribe`.
  return Observable.create((observer: Observer<unknown>) => {
    // Attach the reply pipeline before sending, so a reply that arrives right
    // after `next` is observed.
    const subscription = tip$.subscribe(observer);
    channels.next(message);
    // Returned as the teardown for the outer subscription.
    return subscription;
  });
}
/**
 * Builds an observable that requests code completion from the kernel and
 * emits the processed reply.
 *
 * Nothing is sent when this function is called. The request goes out each
 * time the returned observable is subscribed to.
 *
 * @param channels - Kernel message stream. Replies are read from it and
 *   `message` is pushed onto it with `next`.
 * @param editor - Passed to `expand_completions` to build the mapping applied
 *   to the reply content.
 * @param message - The request to send. Replies are filtered with
 *   `childOf(message)`.
 * @returns An observable that emits the result of
 *   `expand_completions(editor)` applied to the content of the first
 *   `complete_reply`, or errors through `timeout(15000)` if no reply arrives.
 */
export function codeCompleteObservable(
  channels: Channels,
  editor: Doc,
  message: JupyterMessage
) {
  const completion$ = channels.pipe(
    childOf(message),
    ofMessageType("complete_reply"),
    map(entry => entry.content),
    // Only the first matching reply is of interest.
    first(),
    map(expand_completions(editor)),
    timeout(15000) // Large timeout for slower languages; this is just here to make sure we eventually clean up resources
  );

  // On subscription, send the message.
  // `Observer` is generic and needs a type argument; `unknown` is enough here
  // because the observer is only forwarded to `completion$.subscribe`.
  return Observable.create((observer: Observer<unknown>) => {
    // Attach the reply pipeline before sending, so a reply that arrives right
    // after `next` is observed.
    const subscription = completion$.subscribe(observer);
    channels.next(message);
    // Returned as the teardown for the outer subscription.
    return subscription;
  });
}
/**
 * Requests code completion from the kernel when the returned observable is
 * subscribed to.
 *
 * @param channels - Kernel message stream. Replies are read from it and
 *   `message` is pushed onto it with `next`.
 * @param editor - Passed to `expand_completions` to build the mapping applied
 *   to the reply content.
 * @param message - The request to send. Replies are filtered with
 *   `childOf(message)`.
 * @returns An observable that emits the mapped content of the first
 *   `complete_reply`, subject to `timeout(2000)`.
 */
export function codeCompleteObservable(
  channels: Channels,
  editor: CMI,
  message: Object
) {
  // First matching reply only, bounded by a 2s timeout.
  const reply$ = channels.pipe(
    childOf(message),
    ofMessageType("complete_reply"),
    pluck("content"),
    first(),
    map(expand_completions(editor)),
    timeout(2000)
  );

  return Observable.create(observer => {
    // Start listening for the reply, then send the request.
    const replySubscription = reply$.subscribe(observer);
    channels.next(message);
    // Returned as the teardown for the outer subscription.
    return replySubscription;
  });
}
/**
 * Builds the stream of actions for one execution request.
 *
 * Only the opening of this function is visible in this excerpt: the
 * `merge(...)` call below is cut off, so any further merged streams and the
 * return value are not described here.
 *
 * @param channels - Kernel message stream the related messages are read from.
 * @param id - Identifier passed into each action created in the visible code.
 * @param message - The execute request; used with `childOf` to select messages.
 * @param contentRef - Content reference passed into each action created in the
 *   visible code.
 */
export function executeCellStream(
channels: Channels,
id: string,
message: ExecuteRequest,
contentRef: ContentRef
) {
const executeRequest = message;
// All the streams intended for all frontends
// `share()` lets the two pipes below reuse one subscription to `channels`.
// NOTE(review): the annotation reads `Observable>`; a generic argument appears
// to have been lost. Confirm against the original file.
const cellMessages: Observable> = channels.pipe(childOf(executeRequest), share());
// All the payload streams, intended for one user
const payloadStream = cellMessages.pipe(payloads());
const cellAction$ = merge(
// Each payload message becomes an acceptPayloadMessage action.
payloadStream.pipe(
map((payload: PayloadMessage) =>
actions.acceptPayloadMessage({ id, payload, contentRef })
)
),
// All actions for updating cell status
cellMessages.pipe(
// NOTE(review): the `as any` cast presumably works around operator typing;
// confirm whether it is still needed.
kernelStatuses() as any,
map((status: string) =>
actions.updateCellStatus({ id, status, contentRef })
/**
 * Builds the stream of actions for one execution request.
 *
 * Returns an erroring observable ("kernel not connected") when `channels` is
 * falsy or has no `pipe` member. Only the opening of this function is visible
 * in this excerpt: the `merge(...)` call below is cut off, so any further
 * merged streams and the normal return value are not described here.
 *
 * @param channels - Kernel message stream the related messages are read from.
 * @param id - Identifier passed into each action created in the visible code.
 * @param message - The execute request; used with `childOf` to select messages.
 * @param contentRef - Content reference passed into each action created in the
 *   visible code.
 */
export function executeCellStream(
channels: Channels,
id: string,
message: ExecuteRequest,
contentRef: ContentRef
) {
// Guard: without a usable channels object there is nothing to listen on.
if (!channels || !channels.pipe) {
return throwError(new Error("kernel not connected"));
}
const executeRequest = message;
// All the streams intended for all frontends
// `share()` lets the two pipes below reuse one subscription to `channels`.
const cellMessages = channels.pipe(
childOf(executeRequest),
share()
);
// All the payload streams, intended for one user
const payloadStream = cellMessages.pipe(payloads());
const cellAction$ = merge(
// Each payload message becomes an acceptPayloadMessage action.
payloadStream.pipe(
map(payload => actions.acceptPayloadMessage({ id, payload, contentRef }))
),
// All actions for updating cell status
cellMessages.pipe(
kernelStatuses(),
map(status => actions.updateCellStatus({ id, status, contentRef }))
),
channels: Channels,
id: string,
message: ExecuteRequest,
contentRef: ContentRef
) {
if (!channels || !channels.pipe) {
return throwError(new Error("kernel not connected"));
}
const executeRequest = message;
// All the streams intended for all frontends
const cellMessages: Observable> = channels.pipe(childOf(executeRequest), share());
// All the payload streams, intended for one user
const payloadStream = cellMessages.pipe(payloads());
const cellAction$ = merge(
payloadStream.pipe(
map((payload: PayloadMessage) =>
actions.acceptPayloadMessage({ id, payload, contentRef })
)
),
// All actions for updating cell status
cellMessages.pipe(
kernelStatuses() as any,
map((status: string) =>
actions.updateCellStatus({ id, status, contentRef })
/**
 * Builds a non-restarting shutdown request and a pipeline over
 * `this.channels` that waits for the matching `shutdown_reply`.
 *
 * Only the start of this method is visible in this excerpt; how the request is
 * sent and what the method returns are outside it.
 *
 * @param timeoutMs - Value passed to `timeout(...)` while waiting for the
 *   reply. Defaults to 2000.
 */
shutdownEpic(timeoutMs: number = 2000) {
const request: JupyterMessage<"shutdown_request", any> = shutdownRequest({
restart: false
});
// Try to make a shutdown request
// If we don't get a response within X time, force a shutdown
// Either way do the same cleanup
const shutDownHandling = this.channels.pipe(
/* Get the first response to our message request. */
childOf(request),
ofMessageType("shutdown_reply"),
first(),
// If we got a reply, great! :)
map((msg: { content: { restart: boolean } }) => {
return {
status: "shutting down",
content: msg.content
};
}),
/**
* If we don't get a response within timeoutMs, then throw an error.
*/
timeout(timeoutMs),
// Turn any upstream error (including the timeout) into a regular value so
// the stream continues with `{ error, status: "error" }` instead of failing.
catchError(err => of({ error: err, status: "error" })),
/**
* Even if we don't receive a shutdown_reply from the kernel to our
// Creates a "kernel_info_request" message and a pipeline over `channels` that
// takes the first matching "kernel_info_reply" and maps its content onto a
// `KernelInfo` object.
//
// Only the start of this function is visible in this excerpt: the object
// literal below is cut off, so the remaining fields, how `kernelRef` and
// `contentRef` are used, and the return value are not described here.
export function acquireKernelInfo(
channels: Channels,
kernelRef: KernelRef,
contentRef: ContentRef
) {
const message = createMessage("kernel_info_request");
const obs = channels.pipe(
childOf(message),
ofMessageType("kernel_info_reply"),
// Only the first matching reply is used.
first(),
mergeMap(msg => {
// Short aliases: `c` is the reply content, `l` its `language_info` member.
const c = msg.content;
const l = c.language_info;
// Rename the snake_case reply fields to the camelCase `KernelInfo` fields.
const info: KernelInfo = {
protocolVersion: c.protocol_version,
implementation: c.implementation,
implementationVersion: c.implementation_version,
banner: c.banner,
helpLinks: c.help_links,
languageName: l.name,
languageVersion: l.version,
mimetype: l.mimetype,
fileExtension: l.file_extension,
hookupReplyCallbacks(message: JupyterMessage, callbacks: any) {
this.kernel.channels.pipe(childOf(message)).subscribe((reply: any) => {
if (
reply.channel == "shell" &&
callbacks.shell &&
callbacks.shell.reply
) {
callbacks.shell.reply(reply);
} else if (reply.channel == "stdin" && callbacks.input) {
callbacks.input(reply);
} else if (reply.channel == "iopub" && callbacks.iopub) {
if (callbacks.iopub.status && reply.header.msg_type === "status") {
callbacks.iopub.status(reply);
} else if (
callbacks.iopub.clear_output &&
reply.header.msg_type === "clear_output"
) {
callbacks.iopub.clear_output(reply);