@@ -23,10 +23,9 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
23
23
ConcurrentQueue < T > queue = new ConcurrentQueue < T > ( ) ;
24
24
25
25
/// <summary>
26
- /// Keeps a list of pending Dequeue tasks in FIFO order
26
+ /// Keeps any pending Dequeue task to wake up once data arrives
27
27
/// </summary>
28
- ConcurrentQueue < TaskCompletionSource < T > > dequeueTasks
29
- = new ConcurrentQueue < TaskCompletionSource < T > > ( ) ;
28
+ TaskCompletionSource < T > dequeueTask ;
30
29
31
30
/// <summary>
32
31
/// Assumes a single threaded producer!
@@ -36,18 +35,15 @@ internal void Enqueue(T value)
36
35
{
37
36
queue . Enqueue ( value ) ;
38
37
39
- //Set the earlist waiting Dequeue task
40
- TaskCompletionSource < T > task ;
41
-
42
- if ( dequeueTasks . TryDequeue ( out task ) )
38
+ //wake up the dequeue task with result
39
+ if ( dequeueTask != null
40
+ && ! dequeueTask . Task . IsCompleted )
43
41
{
44
- //return the result
45
42
T result ;
46
43
queue . TryDequeue ( out result ) ;
47
- task . SetResult ( result ) ;
44
+ dequeueTask . SetResult ( result ) ;
48
45
}
49
46
50
-
51
47
}
52
48
53
49
/// <summary>
@@ -57,14 +53,17 @@ internal void Enqueue(T value)
57
53
internal async Task < T > DequeueAsync ( )
58
54
{
59
55
T result ;
60
-
61
56
queue . TryDequeue ( out result ) ;
62
57
63
- var tcs = new TaskCompletionSource < T > ( ) ;
64
- taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
58
+ if ( result != null )
59
+ {
60
+ return result ;
61
+ }
65
62
66
- dequeueTasks . Enqueue ( tcs ) ;
67
- result = await tcs . Task ;
63
+ dequeueTask = new TaskCompletionSource < T > ( ) ;
64
+ taskCancellationToken . Register ( ( ) => dequeueTask . TrySetCanceled ( ) ) ;
65
+ result = await dequeueTask . Task ;
66
+ dequeueTask = null ;
68
67
69
68
return result ;
70
69
}
0 commit comments