16
16
import org .elasticsearch .action .support .ActionFilters ;
17
17
import org .elasticsearch .action .support .CountDownActionListener ;
18
18
import org .elasticsearch .action .support .IndicesOptions ;
19
- import org .elasticsearch .action .support .master .AcknowledgedResponse ;
20
19
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
21
20
import org .elasticsearch .cluster .ClusterState ;
22
21
import org .elasticsearch .cluster .block .ClusterBlockException ;
@@ -107,53 +106,46 @@ protected void masterOperation(
107
106
request .indices ()
108
107
);
109
108
List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > dataStreamSettingsResponse = new ArrayList <>();
110
- CountDownActionListener countDownListener = new CountDownActionListener (dataStreamNames .size () + 1 , new ActionListener <>() {
111
- @ Override
112
- public void onResponse (Void unused ) {
113
- listener .onResponse (new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse ));
114
- }
115
-
116
- @ Override
117
- public void onFailure (Exception e ) {
118
- listener .onFailure (e );
119
- }
120
- });
109
+ CountDownActionListener countDownListener = new CountDownActionListener (
110
+ dataStreamNames .size () + 1 ,
111
+ listener .delegateFailure (
112
+ (responseActionListener , unused ) -> responseActionListener .onResponse (
113
+ new UpdateDataStreamSettingsAction .Response (dataStreamSettingsResponse )
114
+ )
115
+ )
116
+ );
121
117
countDownListener .onResponse (null );
122
118
for (String dataStreamName : dataStreamNames ) {
123
119
updateSingleDataStream (
124
120
dataStreamName ,
125
121
request .getSettings (),
122
+ request .isDryRun (),
126
123
request .masterNodeTimeout (),
127
124
request .ackTimeout (),
128
- new ActionListener <>() {
129
- @ Override
130
- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse dataStreamResponse ) {
131
- dataStreamSettingsResponse .add (dataStreamResponse );
132
- countDownListener .onResponse (null );
133
- }
134
-
135
- @ Override
136
- public void onFailure (Exception e ) {
137
- dataStreamSettingsResponse .add (
138
- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
139
- dataStreamName ,
140
- false ,
141
- e .getMessage (),
142
- EMPTY ,
143
- EMPTY ,
144
- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
145
- )
146
- );
147
- countDownListener .onResponse (null );
148
- }
149
- }
125
+ ActionListener .wrap (dataStreamResponse -> {
126
+ dataStreamSettingsResponse .add (dataStreamResponse );
127
+ countDownListener .onResponse (null );
128
+ }, e -> {
129
+ dataStreamSettingsResponse .add (
130
+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
131
+ dataStreamName ,
132
+ false ,
133
+ e .getMessage (),
134
+ EMPTY ,
135
+ EMPTY ,
136
+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
137
+ )
138
+ );
139
+ countDownListener .onResponse (null );
140
+ })
150
141
);
151
142
}
152
143
}
153
144
154
145
private void updateSingleDataStream (
155
146
String dataStreamName ,
156
147
Settings settingsOverrides ,
148
+ boolean dryRun ,
157
149
TimeValue masterNodeTimeout ,
158
150
TimeValue ackTimeout ,
159
151
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -198,36 +190,30 @@ private void updateSingleDataStream(
198
190
ackTimeout ,
199
191
dataStreamName ,
200
192
settingsOverrides ,
201
- new ActionListener <>() {
202
- @ Override
203
- public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
204
- if (acknowledgedResponse .isAcknowledged ()) {
205
- updateSettingsOnIndices (dataStreamName , settingsOverrides , masterNodeTimeout , ackTimeout , listener );
206
- } else {
207
- listener .onResponse (
208
- new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
209
- dataStreamName ,
210
- false ,
211
- "Updating settings not accepted for unknown reasons" ,
212
- EMPTY ,
213
- EMPTY ,
214
- UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
215
- )
216
- );
217
- }
218
- }
219
-
220
- @ Override
221
- public void onFailure (Exception e ) {
222
- listener .onFailure (e );
193
+ dryRun ,
194
+ listener .delegateFailure ((dataStreamSettingsResponseActionListener , dataStream ) -> {
195
+ if (dataStream != null ) {
196
+ updateSettingsOnIndices (dataStream , settingsOverrides , dryRun , masterNodeTimeout , ackTimeout , listener );
197
+ } else {
198
+ dataStreamSettingsResponseActionListener .onResponse (
199
+ new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
200
+ dataStreamName ,
201
+ false ,
202
+ "Updating settings not accepted for unknown reasons" ,
203
+ EMPTY ,
204
+ EMPTY ,
205
+ UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndicesSettingsResult .EMPTY
206
+ )
207
+ );
223
208
}
224
- }
209
+ })
225
210
);
226
211
}
227
212
228
213
private void updateSettingsOnIndices (
229
- String dataStreamName ,
214
+ DataStream dataStream ,
230
215
Settings requestSettings ,
216
+ boolean dryRun ,
231
217
TimeValue masterNodeTimeout ,
232
218
TimeValue ackTimeout ,
233
219
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse > listener
@@ -243,26 +229,15 @@ private void updateSettingsOnIndices(
243
229
appliedToDataStreamOnly .add (settingName );
244
230
}
245
231
}
246
- final List <Index > concreteIndices = clusterService .state ()
247
- .projectState (projectResolver .getProjectId ())
248
- .metadata ()
249
- .dataStreams ()
250
- .get (dataStreamName )
251
- .getIndices ();
232
+ final List <Index > concreteIndices = dataStream .getIndices ();
252
233
final List <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > indexSettingErrors = new ArrayList <>();
253
234
254
- CountDownActionListener indexCountDownListener = new CountDownActionListener (concreteIndices .size () + 1 , new ActionListener <>() {
255
- // Called when all indices for all settings are complete
256
- @ Override
257
- public void onResponse (Void unused ) {
258
- DataStream dataStream = clusterService .state ()
259
- .projectState (projectResolver .getProjectId ())
260
- .metadata ()
261
- .dataStreams ()
262
- .get (dataStreamName );
263
- listener .onResponse (
235
+ CountDownActionListener indexCountDownListener = new CountDownActionListener (
236
+ concreteIndices .size () + 1 ,
237
+ listener .delegateFailure (
238
+ (dataStreamSettingsResponseActionListener , unused ) -> dataStreamSettingsResponseActionListener .onResponse (
264
239
new UpdateDataStreamSettingsAction .DataStreamSettingsResponse (
265
- dataStreamName ,
240
+ dataStream . getName () ,
266
241
true ,
267
242
null ,
268
243
settingsFilter .filter (dataStream .getSettings ()),
@@ -275,37 +250,33 @@ public void onResponse(Void unused) {
275
250
indexSettingErrors
276
251
)
277
252
)
278
- );
279
- }
253
+ )
254
+ )
255
+ );
280
256
281
- @ Override
282
- public void onFailure (Exception e ) {
283
- listener .onFailure (e );
284
- }
285
- });
286
257
indexCountDownListener .onResponse (null ); // handles the case where there were zero indices
287
258
Settings applyToIndexSettings = builder ().loadFromMap (settingsToApply ).build ();
288
259
for (Index index : concreteIndices ) {
289
- updateSettingsOnSingleIndex (index , applyToIndexSettings , masterNodeTimeout , ackTimeout , new ActionListener <>() {
290
- @ Override
291
- public void onResponse (UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError indexSettingError ) {
260
+ updateSettingsOnSingleIndex (
261
+ index ,
262
+ applyToIndexSettings ,
263
+ dryRun ,
264
+ masterNodeTimeout ,
265
+ ackTimeout ,
266
+ indexCountDownListener .delegateFailure ((listener1 , indexSettingError ) -> {
292
267
if (indexSettingError != null ) {
293
268
indexSettingErrors .add (indexSettingError );
294
269
}
295
- indexCountDownListener .onResponse (null );
296
- }
297
-
298
- @ Override
299
- public void onFailure (Exception e ) {
300
- indexCountDownListener .onFailure (e );
301
- }
302
- });
270
+ listener1 .onResponse (null );
271
+ })
272
+ );
303
273
}
304
274
}
305
275
306
276
private void updateSettingsOnSingleIndex (
307
277
Index index ,
308
278
Settings requestSettings ,
279
+ boolean dryRun ,
309
280
TimeValue masterNodeTimeout ,
310
281
TimeValue ackTimeout ,
311
282
ActionListener <UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError > listener
@@ -326,19 +297,24 @@ private void updateSettingsOnSingleIndex(
326
297
);
327
298
return ;
328
299
}
329
- updateSettingsService .updateSettings (
330
- new UpdateSettingsClusterStateUpdateRequest (
331
- projectResolver .getProjectId (),
332
- masterNodeTimeout ,
333
- ackTimeout ,
334
- requestSettings ,
335
- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
336
- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
337
- index
338
- ),
339
- new ActionListener <>() {
340
- @ Override
341
- public void onResponse (AcknowledgedResponse response ) {
300
+ if (dryRun ) {
301
+ /*
302
+ * This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
303
+ * are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
304
+ */
305
+ listener .onResponse (null );
306
+ } else {
307
+ updateSettingsService .updateSettings (
308
+ new UpdateSettingsClusterStateUpdateRequest (
309
+ projectResolver .getProjectId (),
310
+ masterNodeTimeout ,
311
+ ackTimeout ,
312
+ requestSettings ,
313
+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
314
+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REOPEN_INDICES ,
315
+ index
316
+ ),
317
+ ActionListener .wrap (response -> {
342
318
UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError error ;
343
319
if (response .isAcknowledged () == false ) {
344
320
error = new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (
@@ -349,16 +325,13 @@ public void onResponse(AcknowledgedResponse response) {
349
325
error = null ;
350
326
}
351
327
listener .onResponse (error );
352
- }
353
-
354
- @ Override
355
- public void onFailure (Exception e ) {
356
- listener .onResponse (
328
+ },
329
+ e -> listener .onResponse (
357
330
new UpdateDataStreamSettingsAction .DataStreamSettingsResponse .IndexSettingError (index .getName (), e .getMessage ())
358
- );
359
- }
360
- }
361
- );
331
+ )
332
+ )
333
+ );
334
+ }
362
335
}
363
336
364
337
}
0 commit comments