Skip to content

Commit 8525d37

Browse files
authored
Start polling after data computation is started (#128575) (#128729)
1 parent c89122a commit 8525d37

File tree

2 files changed

+16
-16
lines changed

2 files changed

+16
-16
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,6 @@ void startComputeOnRemoteCluster(
125125
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
126126
return profiles;
127127
}))) {
128-
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
129-
exchangeSource.addRemoteSink(
130-
remoteSink,
131-
failFast,
132-
() -> pagesFetched.set(true),
133-
queryPragmas.concurrentExchangeClients(),
134-
computeListener.acquireAvoid()
135-
);
136128
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
137129
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
138130
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
@@ -147,6 +139,14 @@ void startComputeOnRemoteCluster(
147139
TransportRequestOptions.EMPTY,
148140
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
149141
);
142+
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
143+
exchangeSource.addRemoteSink(
144+
remoteSink,
145+
failFast,
146+
() -> pagesFetched.set(true),
147+
queryPragmas.concurrentExchangeClients(),
148+
computeListener.acquireAvoid()
149+
);
150150
}
151151
})
152152
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,6 @@ protected void sendRequest(
171171
try (
172172
var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
173173
) {
174-
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
175-
exchangeSource.addRemoteSink(
176-
remoteSink,
177-
configuration.allowPartialResults() == false,
178-
pagesFetched::incrementAndGet,
179-
queryPragmas.concurrentExchangeClients(),
180-
computeListener.acquireAvoid()
181-
);
182174
final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId());
183175
var dataNodeRequest = new DataNodeRequest(
184176
childSessionId,
@@ -202,6 +194,14 @@ protected void sendRequest(
202194
return r.profiles();
203195
}), DataNodeComputeResponse::new, esqlExecutor)
204196
);
197+
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
198+
exchangeSource.addRemoteSink(
199+
remoteSink,
200+
configuration.allowPartialResults() == false,
201+
pagesFetched::incrementAndGet,
202+
queryPragmas.concurrentExchangeClients(),
203+
computeListener.acquireAvoid()
204+
);
205205
}
206206
})
207207
);

0 commit comments

Comments
 (0)