Skip to content

Commit aab4a16

Browse files
idegtiarenkojoshua-adams-1
authored andcommitted
Start polling after data computation is started (elastic#128575)
1 parent e84d27c commit aab4a16

File tree

2 files changed

+16
-16
lines changed

2 files changed

+16
-16
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,6 @@ void startComputeOnRemoteCluster(
125125
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
126126
return completionInfo;
127127
}))) {
128-
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
129-
exchangeSource.addRemoteSink(
130-
remoteSink,
131-
failFast,
132-
() -> pagesFetched.set(true),
133-
queryPragmas.concurrentExchangeClients(),
134-
computeListener.acquireAvoid()
135-
);
136128
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
137129
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
138130
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
@@ -147,6 +139,14 @@ void startComputeOnRemoteCluster(
147139
TransportRequestOptions.EMPTY,
148140
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
149141
);
142+
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
143+
exchangeSource.addRemoteSink(
144+
remoteSink,
145+
failFast,
146+
() -> pagesFetched.set(true),
147+
queryPragmas.concurrentExchangeClients(),
148+
computeListener.acquireAvoid()
149+
);
150150
}
151151
})
152152
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,6 @@ protected void sendRequest(
176176
try (
177177
var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
178178
) {
179-
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
180-
exchangeSource.addRemoteSink(
181-
remoteSink,
182-
configuration.allowPartialResults() == false,
183-
pagesFetched::incrementAndGet,
184-
queryPragmas.concurrentExchangeClients(),
185-
computeListener.acquireAvoid()
186-
);
187179
final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId());
188180
var dataNodeRequest = new DataNodeRequest(
189181
childSessionId,
@@ -207,6 +199,14 @@ protected void sendRequest(
207199
return r.completionInfo();
208200
}), DataNodeComputeResponse::new, esqlExecutor)
209201
);
202+
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
203+
exchangeSource.addRemoteSink(
204+
remoteSink,
205+
configuration.allowPartialResults() == false,
206+
pagesFetched::incrementAndGet,
207+
queryPragmas.concurrentExchangeClients(),
208+
computeListener.acquireAvoid()
209+
);
210210
}
211211
})
212212
);

0 commit comments

Comments
 (0)