Skip to content

Commit ea5945f

Browse files
committed
QueryManager.stop: unsubscribe ObservableQuery consumers
1 parent 8d5ef31 commit ea5945f

File tree

3 files changed

+25
-15
lines changed

3 files changed

+25
-15
lines changed

src/core/ObservableQuery.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,19 +252,17 @@ export class ObservableQuery<
252252
query: TypedDocumentNode<TData, TVariables>;
253253
variables: TVariables;
254254
};
255-
private input: Subject<
255+
private input!: Subject<
256256
QueryNotification.Value<TData> & {
257257
query: DocumentNode | TypedDocumentNode<TData, TVariables>;
258258
variables: TVariables;
259259
meta: Meta;
260260
}
261261
>;
262-
private subject: BehaviorSubject<
262+
private subject!: BehaviorSubject<
263263
SubjectValue<MaybeMasked<TData>, TVariables>
264264
>;
265-
private readonly observable: Observable<
266-
ApolloQueryResult<MaybeMasked<TData>>
267-
>;
265+
private observable!: Observable<ApolloQueryResult<MaybeMasked<TData>>>;
268266

269267
private isTornDown: boolean;
270268
private queryManager: QueryManager;
@@ -339,6 +337,13 @@ export class ObservableQuery<
339337
variables: this.getVariablesWithDefaults(options.variables),
340338
};
341339

340+
this.initializeObservablesQueue();
341+
342+
const opDef = getOperationDefinition(this.query);
343+
this.queryName = opDef && opDef.name && opDef.name.value;
344+
}
345+
346+
private initializeObservablesQueue() {
342347
this.subject = new BehaviorSubject<
343348
SubjectValue<MaybeMasked<TData>, TVariables>
344349
>({
@@ -438,26 +443,23 @@ export class ObservableQuery<
438443
// be able to close `this.input`
439444
this.input.complete = () => {};
440445
this.input.pipe(this.operator).subscribe(this.subject);
441-
442-
const opDef = getOperationDefinition(this.query);
443-
this.queryName = opDef && opDef.name && opDef.name.value;
444446
}
445447

446448
// We can't use Observable['subscribe'] here as the type as it conflicts with
447449
// the ability to infer T from Subscribable<T>. This limits the surface area
448450
// to the non-deprecated signature which works properly with type inference.
449-
public subscribe: (
451+
public subscribe!: (
450452
observer:
451453
| Partial<Observer<ApolloQueryResult<MaybeMasked<TData>>>>
452454
| ((value: ApolloQueryResult<MaybeMasked<TData>>) => void)
453455
) => Subscription;
454456

455-
public pipe: Observable<ApolloQueryResult<MaybeMasked<TData>>>["pipe"];
457+
public pipe!: Observable<ApolloQueryResult<MaybeMasked<TData>>>["pipe"];
456458

457459
public [Symbol.observable]!: () => Subscribable<
458460
ApolloQueryResult<MaybeMasked<TData>>
459461
>;
460-
public ["@@observable"]: () => Subscribable<
462+
public ["@@observable"]!: () => Subscribable<
461463
ApolloQueryResult<MaybeMasked<TData>>
462464
>;
463465

@@ -1435,6 +1437,16 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`,
14351437
return this.subject.observed;
14361438
}
14371439

1440+
/**
1441+
* @internal
1442+
* Tears down the `ObservableQuery` and stops all active operations by sending a `complete` notification.
1443+
*/
1444+
public stop() {
1445+
this.subject.complete();
1446+
this.initializeObservablesQueue();
1447+
this.tearDownQuery();
1448+
}
1449+
14381450
private tearDownQuery() {
14391451
if (this.isTornDown) return;
14401452

src/core/QueryManager.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,7 @@ export class QueryManager {
247247
* to dispose of this QueryManager instance.
248248
*/
249249
public stop() {
250-
this.obsQueries.forEach((oq) => {
251-
oq["tearDownQuery"]();
252-
});
250+
this.obsQueries.forEach((oq) => oq.stop());
253251

254252
this.cancelPendingFetches(
255253
newInvariantError("QueryManager stopped while query was in flight")

src/core/__tests__/ObservableQuery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5026,7 +5026,7 @@ describe("ObservableQuery", () => {
50265026
expect(onWatchUpdatedCount).toBe(1);
50275027
client.stop();
50285028

5029-
await expect(stream).not.toEmitAnything();
5029+
await expect(stream).toComplete();
50305030
});
50315031
});
50325032

0 commit comments

Comments
 (0)