Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwang347 committed Jan 13, 2025
1 parent 6d670b6 commit 742464f
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 23 deletions.
12 changes: 11 additions & 1 deletion src/api.proposed.kernelStartHook.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import type { Event, Uri } from 'vscode';
import type { CancellationToken, Event, Uri } from 'vscode';

declare module './api' {
export interface Kernels {
Expand All @@ -11,6 +11,16 @@ declare module './api' {
onDidStart: Event<{
uri: Uri;
kernel: Kernel;
token: CancellationToken;
/**
* Allows to pause the event loop until the provided thenable resolved.
* This can be useful to ensure startup code is executed before user code.
*
* *Note:* This function can only be called during event dispatch.
*
* @param thenable A thenable that delays kernel startup.
*/
waitUntil(thenable: Thenable<unknown>): void;
}>;
}
}
25 changes: 19 additions & 6 deletions src/kernels/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import {
Memento,
CancellationError,
window,
workspace
workspace,
CancellationToken
} from 'vscode';
import {
CodeSnippets,
Expand Down Expand Up @@ -66,6 +67,7 @@ import { KernelInterruptTimeoutError } from './errors/kernelInterruptTimeoutErro
import { dispose } from '../platform/common/utils/lifecycle';
import { getCachedVersion, getEnvironmentType } from '../platform/interpreter/helpers';
import { getNotebookTelemetryTracker } from './telemetry/notebookTelemetry';
import { AsyncEmitter } from '../platform/common/event';

const widgetVersionOutPrefix = 'e976ee50-99ed-4aba-9b6b-9dcd5634d07d:IPyWidgets:';
/**
Expand Down Expand Up @@ -129,7 +131,7 @@ abstract class BaseKernel implements IBaseKernel {
get onStarted(): Event<void> {
return this._onStarted.event;
}
get onPostInitialized(): Event<void> {
get onPostInitialized(): Event<{ token: CancellationToken; waitUntil(thenable: Thenable<unknown>): void }> {
return this._onPostInitialized.event;
}
get onDisposed(): Event<void> {
Expand Down Expand Up @@ -183,7 +185,10 @@ abstract class BaseKernel implements IBaseKernel {
private readonly _onStatusChanged = new EventEmitter<KernelMessage.Status>();
private readonly _onRestarted = new EventEmitter<void>();
private readonly _onStarted = new EventEmitter<void>();
private readonly _onPostInitialized = new EventEmitter<void>();
private readonly _onPostInitialized = new AsyncEmitter<{
waitUntil: (thenable: Thenable<unknown>) => void;
token: CancellationToken;
}>();
private readonly _onDisposed = new EventEmitter<void>();
private _jupyterSessionPromise?: Promise<IKernelSession>;
private readonly hookedSessionForEvents = new WeakSet<IKernelSession>();
Expand All @@ -196,6 +201,10 @@ abstract class BaseKernel implements IBaseKernel {
public get restarting() {
return this._restartPromise || Promise.resolve();
}
private _postInitializingDeferred = createDeferred<void>();
public get postInitializing() {
return this._postInitializingDeferred.promise;
}
constructor(
public readonly id: string,
public readonly uri: Uri,
Expand Down Expand Up @@ -257,10 +266,12 @@ abstract class BaseKernel implements IBaseKernel {
return this.startJupyterSession(options).then((result) => {
// If we started and the UI is no longer disabled (ie., a user executed a cell)
// then we can signal that the kernel was created and can be used by third-party extensions.
// We also only want to fire off a single event here.
// We also only want to fire off a single. event here.
if (!options?.disableUI && !this._postInitializedOnStart) {
this._onPostInitialized.fire();
this._postInitializedOnStart = true;
void this._onPostInitialized.fireAsync({}, this.startCancellation.token).then(() => {
this._postInitializingDeferred.resolve();
});
}
return result;
});
Expand Down Expand Up @@ -428,7 +439,9 @@ abstract class BaseKernel implements IBaseKernel {
this._onRestarted.fire();

// Also signal that the kernel post initialization completed.
this._onPostInitialized.fire();
void this._onPostInitialized.fireAsync({}, this.startCancellation.token).then(() => {
this._postInitializingDeferred.resolve();
});
} catch (ex) {
logger.error(`Failed to restart kernel ${getDisplayPath(this.uri)}`, ex);
throw ex;
Expand Down
4 changes: 4 additions & 0 deletions src/kernels/kernelExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ export class NotebookKernelExecution implements INotebookKernelExecution {
const sessionPromise = this.kernel.restarting.then(() => this.kernel.start(new DisplayOptions(false)));

traceCellMessage(cell, `NotebookKernelExecution.executeCell (3), ${getDisplayPath(cell.notebook.uri)}`);

// Wait for the kernel to complete post initialization before queueing the cell in case
// we need to allow extensions to run code before the initial user-triggered execution
await this.kernel.postInitializing;
const executionQueue = this.getOrCreateCellExecutionQueue(cell.notebook, sessionPromise);
executionQueue.queueCell(cell, codeOverride);
let success = true;
Expand Down
26 changes: 21 additions & 5 deletions src/kernels/kernelProvider.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.

import type { KernelMessage } from '@jupyterlab/services';
import { Event, EventEmitter, NotebookDocument, Uri, workspace } from 'vscode';
import { CancellationToken, Event, EventEmitter, NotebookDocument, Uri, workspace } from 'vscode';
import { logger } from '../platform/logging';
import { getDisplayPath } from '../platform/common/platform/fs-paths';
import { IAsyncDisposable, IAsyncDisposableRegistry, IDisposableRegistry } from '../platform/common/types';
Expand Down Expand Up @@ -36,7 +36,11 @@ export abstract class BaseCoreKernelProvider implements IKernelProvider {
protected readonly _onDidCreateKernel = new EventEmitter<IKernel>();
protected readonly _onDidDisposeKernel = new EventEmitter<IKernel>();
protected readonly _onKernelStatusChanged = new EventEmitter<{ status: KernelMessage.Status; kernel: IKernel }>();
protected readonly _onDidPostInitializeKernel = new EventEmitter<IKernel>();
protected readonly _onDidPostInitializeKernel = new EventEmitter<{
kernel: IKernel;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}>();
public readonly onKernelStatusChanged = this._onKernelStatusChanged.event;
public get kernels() {
const kernels = new Set<IKernel>();
Expand Down Expand Up @@ -75,7 +79,11 @@ export abstract class BaseCoreKernelProvider implements IKernelProvider {
public get onDidCreateKernel(): Event<IKernel> {
return this._onDidCreateKernel.event;
}
public get onDidPostInitializeKernel(): Event<IKernel> {
public get onDidPostInitializeKernel(): Event<{
kernel: IKernel;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}> {
return this._onDidPostInitializeKernel.event;
}
public get(uriOrNotebook: Uri | NotebookDocument | string): IKernel | undefined {
Expand Down Expand Up @@ -192,7 +200,11 @@ export abstract class BaseThirdPartyKernelProvider implements IThirdPartyKernelP
protected readonly _onDidStartKernel = new EventEmitter<IThirdPartyKernel>();
protected readonly _onDidCreateKernel = new EventEmitter<IThirdPartyKernel>();
protected readonly _onDidDisposeKernel = new EventEmitter<IThirdPartyKernel>();
protected readonly _onDidPostInitializeKernel = new EventEmitter<IThirdPartyKernel>();
protected readonly _onDidPostInitializeKernel = new EventEmitter<{
kernel: IThirdPartyKernel;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}>();
protected readonly _onKernelStatusChanged = new EventEmitter<{
status: KernelMessage.Status;
kernel: IThirdPartyKernel;
Expand Down Expand Up @@ -234,7 +246,11 @@ export abstract class BaseThirdPartyKernelProvider implements IThirdPartyKernelP
public get onDidCreateKernel(): Event<IThirdPartyKernel> {
return this._onDidCreateKernel.event;
}
public get onDidPostInitializeKernel(): Event<IThirdPartyKernel> {
public get onDidPostInitializeKernel(): Event<{
kernel: IThirdPartyKernel;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}> {
return this._onDidPostInitializeKernel.event;
}
public get(uri: Uri | string): IThirdPartyKernel | undefined {
Expand Down
8 changes: 4 additions & 4 deletions src/kernels/kernelProvider.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ export class KernelProvider extends BaseCoreKernelProvider {
this.disposables
);
kernel.onPostInitialized(
() => {
this._onDidPostInitializeKernel.fire(kernel);
({ token, waitUntil }) => {
this._onDidPostInitializeKernel.fire({ kernel, token, waitUntil });
},
this,
this.disposables
Expand Down Expand Up @@ -160,8 +160,8 @@ export class ThirdPartyKernelProvider extends BaseThirdPartyKernelProvider {
this.disposables
);
kernel.onPostInitialized(
() => {
this._onDidPostInitializeKernel.fire(kernel);
({ token, waitUntil }) => {
this._onDidPostInitializeKernel.fire({ kernel, token, waitUntil });
},
this,
this.disposables
Expand Down
9 changes: 7 additions & 2 deletions src/kernels/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,9 @@ export interface IBaseKernel extends IAsyncDisposable {
readonly onDisposed: Event<void>;
readonly onStarted: Event<void>;
readonly onRestarted: Event<void>;
readonly onPostInitialized: Event<void>;
readonly onPostInitialized: Event<{ token: CancellationToken; waitUntil(thenable: Thenable<unknown>): void }>;
readonly restarting: Promise<void>;
readonly postInitializing: Promise<void>;
readonly status: KernelMessage.Status;
readonly disposed: boolean;
readonly disposing: boolean;
Expand Down Expand Up @@ -513,7 +514,11 @@ export interface IBaseKernelProvider<T extends IBaseKernel> extends IAsyncDispos
onDidRestartKernel: Event<T>;
onDidDisposeKernel: Event<T>;
onKernelStatusChanged: Event<{ status: KernelMessage.Status; kernel: T }>;
onDidPostInitializeKernel: Event<T>;
onDidPostInitializeKernel: Event<{
kernel: T;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}>;
}

/**
Expand Down
82 changes: 82 additions & 0 deletions src/platform/common/event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { Event, CancellationToken, Disposable } from 'vscode';

export interface IWaitUntil {
token: CancellationToken;
waitUntil(thenable: Promise<unknown>): void;
}
export type IWaitUntilData<T> = Omit<Omit<T, 'waitUntil'>, 'token'>;

// Based on AsyncEmitter in https://github.com/microsoft/vscode/blob/88e6face01795b161523824ba075444fad55466a/src/vs/base/common/event.ts#L1303
export class AsyncEmitter<T extends IWaitUntil> {
private _listeners: Set<(e: T) => unknown> = new Set();
private _asyncDeliveryQueue?: Array<[(ev: T) => void, IWaitUntilData<T>]>;
private _event: Event<T> | undefined;

public get event(): Event<T> {
if (!this._event) {
this._event = (listener: (e: T) => unknown): Disposable => {
this._listeners.add(listener);
return {
dispose: () => this._listeners.delete(listener)
};
};
}
return this._event;
}

async fireAsync(data: IWaitUntilData<T>, token: CancellationToken): Promise<void> {
if (!this._listeners) {
return;
}

if (!this._asyncDeliveryQueue) {
this._asyncDeliveryQueue = [];
}

for (const listener of this._listeners) {
this._asyncDeliveryQueue!.push([listener, data]);
}

while (this._asyncDeliveryQueue.length > 0 && !token.isCancellationRequested) {
const [listener, data] = this._asyncDeliveryQueue.shift()!;
const thenables: Promise<unknown>[] = [];

const event = <T>{
...data,
token,
waitUntil: (p: Promise<unknown>): void => {
if (Object.isFrozen(thenables)) {
throw new Error('waitUntil can NOT be called asynchronous');
}
thenables.push(p);
}
};

try {
listener(event);
} catch (e) {
console.error(e);
continue;
}

// freeze thenables-collection to enforce sync-calls to
// wait until and then wait for all thenables to resolve
Object.freeze(thenables);

await Promise.allSettled(thenables).then((values) => {
for (const value of values) {
if (value.status === 'rejected') {
console.error(value.reason);
}
}
});
}
}

dispose(): void {
this._listeners.clear();
}
}
22 changes: 17 additions & 5 deletions src/standalone/api/kernels/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { Uri, workspace, EventEmitter } from 'vscode';
import { Uri, workspace, EventEmitter, CancellationToken } from 'vscode';
import { Kernel, Kernels } from '../../../api';
import { ServiceContainer } from '../../../platform/ioc/container';
import { IKernel, IKernelProvider } from '../../../kernels/types';
Expand All @@ -12,7 +12,9 @@ import { initializeInteractiveOrNotebookTelemetryBasedOnUserAction } from '../..
import { IDisposableRegistry } from '../../../platform/common/types';

const kernelCache = new WeakMap<IKernel, Kernel>();
let _onDidStart: EventEmitter<{ uri: Uri; kernel: Kernel }> | undefined = undefined;
let _onDidStart:
| EventEmitter<{ uri: Uri; kernel: Kernel; token: CancellationToken; waitUntil(thenable: Thenable<unknown>): void }>
| undefined = undefined;

function getWrappedKernel(kernel: IKernel, extensionId: string) {
let wrappedKernel = kernelCache.get(kernel) || createKernelApiForExtension(extensionId, kernel);
Expand Down Expand Up @@ -55,11 +57,21 @@ export function getKernelsApi(extensionId: string): Kernels {
if (!_onDidStart) {
const kernelProvider = ServiceContainer.instance.get<IKernelProvider>(IKernelProvider);
const disposableRegistry = ServiceContainer.instance.get<IDisposableRegistry>(IDisposableRegistry);
_onDidStart = new EventEmitter<{ uri: Uri; kernel: Kernel }>();
_onDidStart = new EventEmitter<{
uri: Uri;
kernel: Kernel;
token: CancellationToken;
waitUntil(thenable: Thenable<unknown>): void;
}>();

disposableRegistry.push(
kernelProvider.onDidPostInitializeKernel((kernel) => {
_onDidStart?.fire({ uri: kernel.uri, kernel: getWrappedKernel(kernel, extensionId) });
kernelProvider.onDidPostInitializeKernel(({ kernel, token, waitUntil }) => {
_onDidStart?.fire({
uri: kernel.uri,
kernel: getWrappedKernel(kernel, extensionId),
token,
waitUntil
});
}),
_onDidStart,
{ dispose: () => (_onDidStart = undefined) }
Expand Down

0 comments on commit 742464f

Please sign in to comment.