16
16
import org .elasticsearch .action .support .ActionFilters ;
17
17
import org .elasticsearch .action .support .CountDownActionListener ;
18
18
import org .elasticsearch .action .support .IndicesOptions ;
19
- import org .elasticsearch .action .support .master .AcknowledgedResponse ;
20
19
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
21
20
import org .elasticsearch .cluster .ClusterState ;
22
21
import org .elasticsearch .cluster .block .ClusterBlockException ;
@@ -102,53 +101,46 @@ protected void masterOperation(
102
101
request .indices ()
103
102
);
104
103
List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > dataStreamSettingsResponse = new ArrayList <>();
105
- CountDownActionListener countDownListener = new CountDownActionListener (dataStreamNames .size () + 1 , new ActionListener <>() {
106
- @ Override
107
- public void onResponse (Void unused ) {
108
- listener .onResponse (new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse ));
109
- }
110
-
111
- @ Override
112
- public void onFailure (Exception e ) {
113
- listener .onFailure (e );
114
- }
115
- });
104
+ CountDownActionListener countDownListener = new CountDownActionListener (
105
+ dataStreamNames .size () + 1 ,
106
+ listener .delegateFailure (
107
+ (responseActionListener , unused ) -> responseActionListener .onResponse (
108
+ new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse )
109
+ )
110
+ )
111
+ );
116
112
countDownListener .onResponse (null );
117
113
for (String dataStreamName : dataStreamNames ) {
118
114
updateSingleDataStream (
119
115
dataStreamName ,
120
116
request .getSettings (),
117
+ request .isDryRun (),
121
118
request .masterNodeTimeout (),
122
119
request .ackTimeout (),
123
- new ActionListener <>() {
124
- @ Override
125
- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse dataStreamResponse ) {
126
- dataStreamSettingsResponse .add (dataStreamResponse );
127
- countDownListener .onResponse (null );
128
- }
129
-
130
- @ Override
131
- public void onFailure (Exception e ) {
132
- dataStreamSettingsResponse .add (
133
- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
134
- dataStreamName ,
135
- false ,
136
- e .getMessage (),
137
- EMPTY ,
138
- EMPTY ,
139
- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
140
- )
141
- );
142
- countDownListener .onResponse (null );
143
- }
144
- }
120
+ ActionListener .wrap (dataStreamResponse -> {
121
+ dataStreamSettingsResponse .add (dataStreamResponse );
122
+ countDownListener .onResponse (null );
123
+ }, e -> {
124
+ dataStreamSettingsResponse .add (
125
+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
126
+ dataStreamName ,
127
+ false ,
128
+ e .getMessage (),
129
+ EMPTY ,
130
+ EMPTY ,
131
+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
132
+ )
133
+ );
134
+ countDownListener .onResponse (null );
135
+ })
145
136
);
146
137
}
147
138
}
148
139
149
140
private void updateSingleDataStream (
150
141
String dataStreamName ,
151
142
Settings settingsOverrides ,
143
+ boolean dryRun ,
152
144
TimeValue masterNodeTimeout ,
153
145
TimeValue ackTimeout ,
154
146
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -187,13 +179,17 @@ private void updateSingleDataStream(
187
179
);
188
180
return ;
189
181
}
190
- metadataDataStreamsService .updateSettings (masterNodeTimeout , ackTimeout , dataStreamName , settingsOverrides , new ActionListener <>() {
191
- @ Override
192
- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
193
- if (acknowledgedResponse .isAcknowledged ()) {
194
- updateSettingsOnIndices (dataStreamName , settingsOverrides , masterNodeTimeout , ackTimeout , listener );
182
+ metadataDataStreamsService .updateSettings (
183
+ masterNodeTimeout ,
184
+ ackTimeout ,
185
+ dataStreamName ,
186
+ settingsOverrides ,
187
+ dryRun ,
188
+ listener .delegateFailure ((dataStreamSettingsResponseActionListener , dataStream ) -> {
189
+ if (dataStream != null ) {
190
+ updateSettingsOnIndices (dataStream , settingsOverrides , dryRun , masterNodeTimeout , ackTimeout , listener );
195
191
} else {
196
- listener .onResponse (
192
+ dataStreamSettingsResponseActionListener .onResponse (
197
193
new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
198
194
dataStreamName ,
199
195
false ,
@@ -204,18 +200,14 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
204
200
)
205
201
);
206
202
}
207
- }
208
-
209
- @ Override
210
- public void onFailure (Exception e ) {
211
- listener .onFailure (e );
212
- }
213
- });
203
+ })
204
+ );
214
205
}
215
206
216
207
private void updateSettingsOnIndices (
217
- String dataStreamName ,
208
+ DataStream dataStream ,
218
209
Settings requestSettings ,
210
+ boolean dryRun ,
219
211
TimeValue masterNodeTimeout ,
220
212
TimeValue ackTimeout ,
221
213
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -231,17 +223,15 @@ private void updateSettingsOnIndices(
231
223
appliedToDataStreamOnly .add (settingName );
232
224
}
233
225
}
234
- final List <Index > concreteIndices = clusterService . state (). metadata (). dataStreams (). get ( dataStreamName ) .getIndices ();
226
+ final List <Index > concreteIndices = dataStream .getIndices ();
235
227
final List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > indexSettingErrors = new ArrayList <>();
236
228
237
- CountDownActionListener indexCountDownListener = new CountDownActionListener (concreteIndices .size () + 1 , new ActionListener <>() {
238
- // Called when all indices for all settings are complete
239
- @ Override
240
- public void onResponse (Void unused ) {
241
- DataStream dataStream = clusterService .state ().metadata ().dataStreams ().get (dataStreamName );
242
- listener .onResponse (
229
+ CountDownActionListener indexCountDownListener = new CountDownActionListener (
230
+ concreteIndices .size () + 1 ,
231
+ listener .delegateFailure (
232
+ (dataStreamSettingsResponseActionListener , unused ) -> dataStreamSettingsResponseActionListener .onResponse (
243
233
new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
244
- dataStreamName ,
234
+ dataStream . getName () ,
245
235
true ,
246
236
null ,
247
237
settingsFilter .filter (dataStream .getSettings ()),
@@ -252,37 +242,33 @@ public void onResponse(Void unused) {
252
242
indexSettingErrors
253
243
)
254
244
)
255
- );
256
- }
245
+ )
246
+ )
247
+ );
257
248
258
- @ Override
259
- public void onFailure (Exception e ) {
260
- listener .onFailure (e );
261
- }
262
- });
263
249
indexCountDownListener .onResponse (null ); // handles the case where there were zero indices
264
250
Settings applyToIndexSettings = builder ().loadFromMap (settingsToApply ).build ();
265
251
for (Index index : concreteIndices ) {
266
- updateSettingsOnSingleIndex (index , applyToIndexSettings , masterNodeTimeout , ackTimeout , new ActionListener <>() {
267
- @ Override
268
- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError indexSettingError ) {
252
+ updateSettingsOnSingleIndex (
253
+ index ,
254
+ applyToIndexSettings ,
255
+ dryRun ,
256
+ masterNodeTimeout ,
257
+ ackTimeout ,
258
+ indexCountDownListener .delegateFailure ((listener1 , indexSettingError ) -> {
269
259
if (indexSettingError != null ) {
270
260
indexSettingErrors .add (indexSettingError );
271
261
}
272
- indexCountDownListener .onResponse (null );
273
- }
274
-
275
- @ Override
276
- public void onFailure (Exception e ) {
277
- indexCountDownListener .onFailure (e );
278
- }
279
- });
262
+ listener1 .onResponse (null );
263
+ })
264
+ );
280
265
}
281
266
}
282
267
283
268
private void updateSettingsOnSingleIndex (
284
269
Index index ,
285
270
Settings requestSettings ,
271
+ boolean dryRun ,
286
272
TimeValue masterNodeTimeout ,
287
273
TimeValue ackTimeout ,
288
274
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > listener
@@ -302,18 +288,23 @@ private void updateSettingsOnSingleIndex(
302
288
);
303
289
return ;
304
290
}
305
- updateSettingsService .updateSettings (
306
- new UpdateSettingsClusterStateUpdateRequest (
307
- masterNodeTimeout ,
308
- ackTimeout ,
309
- requestSettings ,
310
- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
311
- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
312
- index
313
- ),
314
- new ActionListener <>() {
315
- @ Override
316
- public void onResponse (AcknowledgedResponse response ) {
291
+ if (dryRun ) {
292
+ /*
293
+ * This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
294
+ * are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
295
+ */
296
+ listener .onResponse (null );
297
+ } else {
298
+ updateSettingsService .updateSettings (
299
+ new UpdateSettingsClusterStateUpdateRequest (
300
+ masterNodeTimeout ,
301
+ ackTimeout ,
302
+ requestSettings ,
303
+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
304
+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
305
+ index
306
+ ),
307
+ ActionListener .wrap (response -> {
317
308
UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError error ;
318
309
if (response .isAcknowledged () == false ) {
319
310
error = new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (
@@ -324,16 +315,13 @@ public void onResponse(AcknowledgedResponse response) {
324
315
error = null ;
325
316
}
326
317
listener .onResponse (error );
327
- }
328
-
329
- @ Override
330
- public void onFailure (Exception e ) {
331
- listener .onResponse (
318
+ },
319
+ e -> listener .onResponse (
332
320
new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (index .getName (), e .getMessage ())
333
- );
334
- }
335
- }
336
- );
321
+ )
322
+ )
323
+ );
324
+ }
337
325
}
338
326
339
327
}
0 commit comments