Skip to content

Commit 3965cb8

Browse files
committed
Prototype for re-attach
1 parent 7614923 commit 3965cb8

13 files changed

+301
-57
lines changed

src/kernels/execution/cellExecution.ts

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,21 @@ export class CellExecutionFactory {
4444
private readonly requestListener: CellExecutionMessageHandlerService
4545
) {}
4646

47-
public create(cell: NotebookCell, code: string | undefined, metadata: Readonly<KernelConnectionMetadata>) {
47+
public create(
48+
cell: NotebookCell,
49+
code: string | undefined,
50+
metadata: Readonly<KernelConnectionMetadata>,
51+
resumeExecutionMsgId?: string
52+
) {
4853
// eslint-disable-next-line @typescript-eslint/no-use-before-define
49-
return CellExecution.fromCell(cell, code, metadata, this.controller, this.requestListener);
54+
return CellExecution.fromCell(
55+
cell,
56+
code,
57+
metadata,
58+
this.controller,
59+
this.requestListener,
60+
resumeExecutionMsgId
61+
);
5062
}
5163
}
5264

@@ -88,7 +100,8 @@ export class CellExecution implements IDisposable {
88100
private readonly codeOverride: string | undefined,
89101
private readonly kernelConnection: Readonly<KernelConnectionMetadata>,
90102
private readonly controller: IKernelController,
91-
private readonly requestListener: CellExecutionMessageHandlerService
103+
private readonly requestListener: CellExecutionMessageHandlerService,
104+
private readonly resumeExecutionMsgId?: string
92105
) {
93106
workspace.onDidCloseTextDocument(
94107
(e) => {
@@ -134,11 +147,55 @@ export class CellExecution implements IDisposable {
134147
code: string | undefined,
135148
metadata: Readonly<KernelConnectionMetadata>,
136149
controller: IKernelController,
137-
requestListener: CellExecutionMessageHandlerService
150+
requestListener: CellExecutionMessageHandlerService,
151+
resumeExecutionMsgId?: string
138152
) {
139-
return new CellExecution(cell, code, metadata, controller, requestListener);
153+
return new CellExecution(cell, code, metadata, controller, requestListener, resumeExecutionMsgId);
154+
}
155+
public async resume(session: IKernelConnectionSession) {
156+
if (this.cancelHandled) {
157+
traceCellMessage(this.cell, 'Not resuming as it was cancelled');
158+
return;
159+
}
160+
traceCellMessage(this.cell, 'Start resuming execution');
161+
traceInfoIfCI(`Cell Exec (resuming) contents ${this.cell.document.getText().substring(0, 50)}...`);
162+
if (!this.canExecuteCell()) {
163+
// End state is bool | undefined not optional. Undefined == not success or failure
164+
this.execution?.end(undefined);
165+
this.execution = undefined;
166+
this._result.resolve();
167+
return;
168+
}
169+
if (this.started) {
170+
traceCellMessage(this.cell, 'Cell has already been started yet CellExecution.Start invoked again');
171+
traceError(`Cell has already been started yet CellExecution.Start invoked again ${this.cell.index}`);
172+
// TODO: Send telemetry this should never happen, if it does we have problems.
173+
return this.result;
174+
}
175+
this.started = true;
176+
177+
this.startTime = new Date().getTime();
178+
activeNotebookCellExecution.set(this.cell.notebook, this.execution);
179+
this.execution?.start(this.startTime);
180+
NotebookCellStateTracker.setCellState(this.cell, NotebookCellExecutionState.Executing);
181+
this.stopWatch.reset();
182+
183+
this.cellExecutionHandler = this.requestListener.registerListenerForResumingExecution(this.cell, {
184+
kernel: session.kernel!,
185+
cellExecution: this.execution!,
186+
msg_id: this.resumeExecutionMsgId!,
187+
onErrorHandlingExecuteRequestIOPubMessage: (error) => {
188+
traceError(`Cell (index = ${this.cell.index}) execution completed with errors (2).`, error);
189+
// If not a restart error, then tell the subscriber
190+
this.completedWithErrors(error);
191+
}
192+
});
193+
this.cellExecutionHandler.completed.finally(() => this.completedSuccessfully());
140194
}
141195
public async start(session: IKernelConnectionSession) {
196+
if (this.resumeExecutionMsgId) {
197+
return this.resume(session);
198+
}
142199
if (this.cancelHandled) {
143200
traceCellMessage(this.cell, 'Not starting as it was cancelled');
144201
return;
@@ -348,7 +405,7 @@ export class CellExecution implements IDisposable {
348405
traceError(`Cell execution failed without request, for cell Index ${this.cell.index}`, ex);
349406
return this.completedWithErrors(ex);
350407
}
351-
this.cellExecutionHandler = this.requestListener.registerListener(this.cell, {
408+
this.cellExecutionHandler = this.requestListener.registerListenerForExecution(this.cell, {
352409
kernel: session.kernel!,
353410
cellExecution: this.execution!,
354411
request: this.request,

src/kernels/execution/cellExecutionMessageHandler.ts

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import { IKernelController, ITracebackFormatter } from '../../kernels/types';
4141
import { handleTensorBoardDisplayDataOutput } from './executionHelpers';
4242
import { Identifiers, WIDGET_MIMETYPE } from '../../platform/common/constants';
4343
import { CellOutputDisplayIdTracker } from './cellDisplayIdTracker';
44+
import { createDeferred } from '../../platform/common/utils/async';
4445

4546
// Helper interface for the set_next_input execute reply payload
4647
interface ISetNextInputPayload {
@@ -166,18 +167,26 @@ export class CellExecutionMessageHandler implements IDisposable {
166167
* or for any subsequent requests as a result of outputs sending custom messages.
167168
*/
168169
private readonly ownedRequestMsgIds = new Set<string>();
170+
private readonly _completed = createDeferred<void>();
171+
public get completed() {
172+
return this._completed.promise;
173+
}
169174
constructor(
170175
public readonly cell: NotebookCell,
171176
private readonly applicationService: IApplicationShell,
172177
private readonly controller: IKernelController,
173178
private readonly context: IExtensionContext,
174179
private readonly formatters: ITracebackFormatter[],
175180
private readonly kernel: Kernel.IKernelConnection,
176-
request: Kernel.IShellFuture<KernelMessage.IExecuteRequestMsg, KernelMessage.IExecuteReplyMsg>,
177-
cellExecution: NotebookCellExecution
181+
private readonly request:
182+
| Kernel.IShellFuture<KernelMessage.IExecuteRequestMsg, KernelMessage.IExecuteReplyMsg>
183+
| undefined,
184+
cellExecution: NotebookCellExecution,
185+
executionMessageId: string
178186
) {
179-
this.executeRequestMessageId = request.msg.header.msg_id;
180-
this.ownedRequestMsgIds.add(request.msg.header.msg_id);
187+
this._completed.promise.catch(noop);
188+
this.executeRequestMessageId = executionMessageId;
189+
this.ownedRequestMsgIds.add(executionMessageId);
181190
workspace.onDidChangeNotebookDocument(
182191
(e) => {
183192
if (!isJupyterNotebook(e.notebook)) {
@@ -203,27 +212,29 @@ export class CellExecutionMessageHandler implements IDisposable {
203212
this.kernel.anyMessage.connect(this.onKernelAnyMessage, this);
204213
this.kernel.iopubMessage.connect(this.onKernelIOPubMessage, this);
205214

206-
request.onIOPub = () => {
207-
// Cell has been deleted or the like.
208-
if (this.cell.document.isClosed && !this.completedExecution) {
209-
request.dispose();
210-
}
211-
};
212-
request.onReply = (msg) => {
213-
// Cell has been deleted or the like.
214-
if (this.cell.document.isClosed) {
215-
request.dispose();
216-
return;
217-
}
218-
this.handleReply(msg);
219-
};
220-
request.onStdin = this.handleInputRequest.bind(this);
221-
request.done
222-
.finally(() => {
223-
this.completedExecution = true;
224-
this.endCellExecution();
225-
})
226-
.catch(noop);
215+
if (request) {
216+
request.onIOPub = () => {
217+
// Cell has been deleted or the like.
218+
if (this.cell.document.isClosed && !this.completedExecution) {
219+
request.dispose();
220+
}
221+
};
222+
request.onReply = (msg) => {
223+
// Cell has been deleted or the like.
224+
if (this.cell.document.isClosed) {
225+
request.dispose();
226+
return;
227+
}
228+
this.handleReply(msg);
229+
};
230+
request.onStdin = this.handleInputRequest.bind(this);
231+
request.done
232+
.finally(() => {
233+
this.completedExecution = true;
234+
this.endCellExecution();
235+
})
236+
.catch(noop);
237+
}
227238
}
228239
/**
229240
* This method is called when all execution has been completed (successfully or failed).
@@ -254,12 +265,24 @@ export class CellExecutionMessageHandler implements IDisposable {
254265
this.prompts.clear();
255266
this.clearLastUsedStreamOutput();
256267
this.execution = undefined;
268+
this._completed.resolve();
257269
}
258270
private onKernelAnyMessage(_: unknown, { direction, msg }: Kernel.IAnyMessageArgs) {
259271
if (this.cell.document.isClosed) {
260272
return this.endCellExecution();
261273
}
262-
274+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
275+
console.log((msg as any).msg_type);
276+
if (
277+
!this.request &&
278+
'msg_type' in msg &&
279+
(msg.msg_type === 'kernel_info_reply' ||
280+
msg.msg_type === 'execute_input' ||
281+
msg.msg_type === 'execute_reply')
282+
) {
283+
this.completedExecution = true;
284+
return this.endCellExecution();
285+
}
263286
// We're only interested in messages after execution has completed.
264287
// See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information.
265288
if (direction !== 'send' || !this.completedExecution) {

src/kernels/execution/cellExecutionMessageHandlerService.ts

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
// Licensed under the MIT License.
33

44
import type { Kernel, KernelMessage } from '@jupyterlab/services';
5-
import { NotebookCell, NotebookCellExecution, NotebookDocument, workspace } from 'vscode';
5+
import { Memento, NotebookCell, NotebookCellExecution, NotebookDocument, workspace } from 'vscode';
66
import { IKernelController, ITracebackFormatter } from '../../kernels/types';
77
import { IApplicationShell } from '../../platform/common/application/types';
88
import { disposeAllDisposables } from '../../platform/common/helpers';
99
import { IDisposable, IExtensionContext } from '../../platform/common/types';
1010
import { CellExecutionMessageHandler } from './cellExecutionMessageHandler';
11+
import { noop } from '../../platform/common/utils/misc';
1112

1213
/**
1314
* Allows registering a CellExecutionMessageHandler for a given execution.
@@ -20,7 +21,8 @@ export class CellExecutionMessageHandlerService {
2021
private readonly appShell: IApplicationShell,
2122
private readonly controller: IKernelController,
2223
private readonly context: IExtensionContext,
23-
private readonly formatters: ITracebackFormatter[]
24+
private readonly formatters: ITracebackFormatter[],
25+
private readonly workspaceStorage: Memento
2426
) {
2527
workspace.onDidChangeNotebookDocument(
2628
(e) => {
@@ -42,7 +44,7 @@ export class CellExecutionMessageHandlerService {
4244
this.notebook.getCells().forEach((cell) => this.messageHandlers.get(cell)?.dispose());
4345
}
4446
}
45-
public registerListener(
47+
public registerListenerForExecution(
4648
cell: NotebookCell,
4749
options: {
4850
kernel: Kernel.IKernelConnection;
@@ -54,6 +56,12 @@ export class CellExecutionMessageHandlerService {
5456
this.notebook = cell.notebook;
5557
// Always dispose any previous handlers & create new ones.
5658
this.messageHandlers.get(cell)?.dispose();
59+
this.workspaceStorage
60+
.update(`LAST_EXECUTED_CELL_${cell.notebook.uri.toString()}`, {
61+
index: cell.index,
62+
msg_id: options.request?.msg.header.msg_id
63+
})
64+
.then(noop, noop);
5765
const handler = new CellExecutionMessageHandler(
5866
cell,
5967
this.appShell,
@@ -62,7 +70,41 @@ export class CellExecutionMessageHandlerService {
6270
this.formatters,
6371
options.kernel,
6472
options.request,
65-
options.cellExecution
73+
options.cellExecution,
74+
options.request.msg.header.msg_id
75+
);
76+
// This object must be kept in memory has it monitors the kernel messages.
77+
this.messageHandlers.set(cell, handler);
78+
return handler;
79+
}
80+
public registerListenerForResumingExecution(
81+
cell: NotebookCell,
82+
options: {
83+
kernel: Kernel.IKernelConnection;
84+
msg_id: string;
85+
cellExecution: NotebookCellExecution;
86+
onErrorHandlingExecuteRequestIOPubMessage: (error: Error) => void;
87+
}
88+
): CellExecutionMessageHandler {
89+
this.notebook = cell.notebook;
90+
// Always dispose any previous handlers & create new ones.
91+
this.messageHandlers.get(cell)?.dispose();
92+
this.workspaceStorage
93+
.update(`LAST_EXECUTED_CELL_${cell.notebook.uri.toString()}`, {
94+
index: cell.index,
95+
msg_id: options.msg_id
96+
})
97+
.then(noop, noop);
98+
const handler = new CellExecutionMessageHandler(
99+
cell,
100+
this.appShell,
101+
this.controller,
102+
this.context,
103+
this.formatters,
104+
options.kernel,
105+
undefined,
106+
options.cellExecution,
107+
options.msg_id
66108
);
67109
// This object must be kept in memory has it monitors the kernel messages.
68110
this.messageHandlers.set(cell, handler);

src/kernels/execution/cellExecutionQueue.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ export class CellExecutionQueue implements Disposable {
7777
// Start executing the cells.
7878
this.startExecutingCells();
7979
}
80+
81+
/**
82+
* Queue the cell for execution & start processing it immediately.
83+
*/
84+
public resumeCell(cell: NotebookCell, msg_id: string): void {
85+
const existingCellExecution = this.queueOfCellsToExecute.find((item) => item.cell === cell);
86+
if (existingCellExecution) {
87+
traceCellMessage(cell, 'Use existing cell execution');
88+
return;
89+
}
90+
const cellExecution = this.executionFactory.create(cell, '', this.metadata, msg_id);
91+
this.disposables.push(cellExecution);
92+
this.queueOfCellsToExecute.push(cellExecution);
93+
94+
traceCellMessage(cell, 'User queued cell for execution');
95+
96+
// Start executing the cells.
97+
this.startExecutingCells();
98+
}
8099
/**
81100
* Cancel all cells that have been queued & wait for them to complete.
82101
* @param {boolean} [forced=false]

src/kernels/kernel.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,6 @@ abstract class BaseKernel implements IBaseKernel {
684684
// So that we don't have problems with ipywidgets, always register the default ipywidgets comm target.
685685
// Restart sessions and retries might make this hard to do correctly otherwise.
686686
session.registerCommTarget(Identifiers.DefaultCommTarget, noop);
687-
688687
if (this.kernelConnectionMetadata.kind === 'connectToLiveRemoteKernel') {
689688
// As users can have IPyWidgets at any point in time, we need to determine the version of ipywidgets
690689
// This must happen early on as the state of the kernel needs to be synced with the Kernel in the webview (renderer)

0 commit comments

Comments
 (0)