@@ -78,7 +78,7 @@ public class FlatMapPipelinedCursor<T, V> implements RecordCursor<V> {
78
78
* should generally be mediated through one of the {@code synchronized} methods.
79
79
*/
80
80
@ Nonnull
81
- private final Queue <PipelineQueueEntry > pipeline ;
81
+ private final Queue <PipelineQueueEntry < V , T > > pipeline ;
82
82
/**
83
83
* The next value to pull from the outer cursor. This value is cleared out by {@link #close()}, so
84
84
* some care should be taken when accessing it. In general, it should be set via one of the {@code synchronized}
@@ -124,7 +124,7 @@ public CompletableFuture<RecordCursorResult<V>> onNext() {
124
124
return CompletableFuture .completedFuture (lastResult );
125
125
}
126
126
return AsyncUtil .whileTrue (this ::tryToFillPipeline , getExecutor ()).thenApply (vignore -> {
127
- PipelineQueueEntry peeked = peekPipeline ();
127
+ PipelineQueueEntry < V , T > peeked = peekPipeline ();
128
128
if (peeked == null ) {
129
129
throw new CancellationException ("cursor closed while iterating" );
130
130
}
@@ -190,12 +190,12 @@ protected CompletableFuture<Boolean> tryToFillPipeline() {
190
190
191
191
if (!outerNext .isDone ()) {
192
192
// Still waiting for outer future. Check back when it has finished.
193
- final PipelineQueueEntry nextEntry = peekPipeline ();
193
+ final PipelineQueueEntry < V , T > nextEntry = peekPipeline ();
194
194
if (nextEntry == null ) {
195
195
return outerNext .thenApply (vignore -> true ); // loop back to process outer result
196
196
} else {
197
197
// keep looping unless we get something from the next entry's inner cursor or the next cursor is ready
198
- final CompletableFuture <PipelineQueueEntry > innerPipelineFuture = nextEntry .getNextInnerPipelineFuture ();
198
+ final CompletableFuture <PipelineQueueEntry < V , T > > innerPipelineFuture = nextEntry .getNextInnerPipelineFuture ();
199
199
return CompletableFuture .anyOf (outerNext , innerPipelineFuture ).thenApply (vignore ->
200
200
!innerPipelineFuture .isDone () || innerPipelineFuture .join ().doesNotHaveReturnableResult ());
201
201
}
@@ -220,12 +220,12 @@ protected CompletableFuture<Boolean> tryToFillPipeline() {
220
220
}
221
221
final RecordCursor <V > innerCursor = innerCursorFunction .apply (outerValue , innerContinuation );
222
222
outerContinuation = outerResult .getContinuation ();
223
- addEntryToPipeline (new PipelineQueueEntry (innerCursor , priorOuterContinuation , outerResult , outerCheckValue ));
223
+ addEntryToPipeline (PipelineQueueEntry . newInstanceWithBackgroundComputationOfFirstResult (innerCursor , priorOuterContinuation , outerResult , outerCheckValue ));
224
224
outerNextFuture = null ; // done with this future, advance outer cursor next time
225
225
// keep looping to fill pipeline
226
226
} else { // don't have next, and won't ever with this cursor
227
227
// Add sentinel to end of pipeline
228
- addEntryToPipeline (new PipelineQueueEntry ( null , outerContinuation , outerResult , null ));
228
+ addEntryToPipeline (PipelineQueueEntry . newSentinel ( outerContinuation , outerResult ));
229
229
outerExhausted = true ;
230
230
// Wait for next entry, as if pipeline were full
231
231
break ;
@@ -239,7 +239,7 @@ protected CompletableFuture<Boolean> tryToFillPipeline() {
239
239
// 4) A concurrent operation cancelled this cursor and so the pipeline is empty
240
240
// The only case where the element should be null is when the cursor has been closed, so return CANCELLED in
241
241
// that case.
242
- PipelineQueueEntry peeked = peekPipeline ();
242
+ PipelineQueueEntry < V , T > peeked = peekPipeline ();
243
243
if (peeked == null ) {
244
244
return ALREADY_CANCELLED ;
245
245
}
@@ -263,45 +263,42 @@ private synchronized CompletableFuture<RecordCursorResult<T>> ensureOuterCursorA
263
263
return outerNextFuture ;
264
264
}
265
265
266
- private synchronized void addEntryToPipeline (PipelineQueueEntry pipelineQueueEntry ) {
266
+ private synchronized void addEntryToPipeline (PipelineQueueEntry < V , T > pipelineQueueEntry ) {
267
267
if (closed ) {
268
268
pipelineQueueEntry .close ();
269
269
}
270
- pipelineQueueEntry .getNextInnerPipelineFuture ();
271
270
pipeline .add (pipelineQueueEntry );
272
271
}
273
272
274
273
private synchronized boolean continueFillingPipeline () {
275
274
return !closed && !outerExhausted && pipeline .size () < pipelineSize ;
276
275
}
277
276
278
- private synchronized PipelineQueueEntry peekPipeline () {
277
+ private synchronized PipelineQueueEntry < V , T > peekPipeline () {
279
278
// Not a lot in this method, but the underlying queue isn't thread safe so
280
279
return pipeline .peek ();
281
280
}
282
281
283
- private class PipelineQueueEntry {
282
+ private static class PipelineQueueEntry < V , T > {
284
283
final RecordCursor <V > innerCursor ;
285
284
final RecordCursorContinuation priorOuterContinuation ;
286
285
final RecordCursorResult <T > outerResult ;
287
286
final byte [] outerCheckValue ;
288
287
289
288
private CompletableFuture <RecordCursorResult <V >> innerFuture ;
290
289
291
- public PipelineQueueEntry (RecordCursor <V > innerCursor ,
290
+ private PipelineQueueEntry (RecordCursor <V > innerCursor ,
292
291
RecordCursorContinuation priorOuterContinuation ,
293
292
RecordCursorResult <T > outerResult ,
294
293
byte [] outerCheckValue ) {
295
294
this .innerCursor = innerCursor ;
296
295
this .priorOuterContinuation = priorOuterContinuation ;
297
296
this .outerResult = outerResult ;
298
297
this .outerCheckValue = outerCheckValue ;
299
- // start calculating the next result in the background.
300
- setInnerFuture ();
301
298
}
302
299
303
300
@ Nonnull
304
- public CompletableFuture <PipelineQueueEntry > getNextInnerPipelineFuture () {
301
+ public CompletableFuture <PipelineQueueEntry < V , T > > getNextInnerPipelineFuture () {
305
302
if (innerFuture == null ) {
306
303
setInnerFuture ();
307
304
}
@@ -366,6 +363,30 @@ public RecordCursorResult<V> nextResult() {
366
363
private Continuation <T , V > toContinuation () {
367
364
return new Continuation <>(priorOuterContinuation , outerResult , outerCheckValue , innerFuture .join ());
368
365
}
366
+
367
+ @ Nonnull
368
+ public static <V , T > PipelineQueueEntry <V , T > newInstance (RecordCursor <V > innerCursor ,
369
+ RecordCursorContinuation priorOuterContinuation ,
370
+ RecordCursorResult <T > outerResult ,
371
+ byte [] outerCheckValue ) {
372
+ return new PipelineQueueEntry <>(innerCursor , priorOuterContinuation , outerResult , outerCheckValue );
373
+ }
374
+
375
+ @ Nonnull
376
+ public static <V , T > PipelineQueueEntry <V , T > newInstanceWithBackgroundComputationOfFirstResult (RecordCursor <V > innerCursor ,
377
+ RecordCursorContinuation priorOuterContinuation ,
378
+ RecordCursorResult <T > outerResult ,
379
+ byte [] outerCheckValue ) {
380
+ final var result = newInstance (innerCursor , priorOuterContinuation , outerResult , outerCheckValue );
381
+ result .setInnerFuture ();
382
+ return result ;
383
+ }
384
+
385
+ @ Nonnull
386
+ public static <V , T > PipelineQueueEntry <V , T > newSentinel (RecordCursorContinuation priorOuterContinuation ,
387
+ RecordCursorResult <T > outerResult ) {
388
+ return new PipelineQueueEntry <>(null , priorOuterContinuation , outerResult , null );
389
+ }
369
390
}
370
391
371
392
private static class Continuation <T , V > implements RecordCursorContinuation {
0 commit comments