5
5
namespace EventHook . Helpers
6
6
{
7
7
/// <summary>
8
- /// A concurrent queue facilitating async dequeue
9
- /// Since our consumer is always single threaded no locking is needed
8
+ /// A concurrent queue facilitating async dequeue with minimal locking
9
+ /// Assumes single/multi-threaded producer and a single- threaded consumer
10
10
/// </summary>
11
11
/// <typeparam name="T"></typeparam>
12
12
internal class AsyncQueue < T >
@@ -25,10 +25,10 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
25
25
/// <summary>
26
26
/// Keeps any pending Dequeue task to wake up once data arrives
27
27
/// </summary>
28
- TaskCompletionSource < T > dequeueTask ;
28
+ TaskCompletionSource < bool > dequeueTask ;
29
29
30
30
/// <summary>
31
- /// Assumes a single threaded producer!
31
+ /// Supports multi- threaded producers
32
32
/// </summary>
33
33
/// <param name="value"></param>
34
34
internal void Enqueue ( T value )
@@ -39,15 +39,12 @@ internal void Enqueue(T value)
39
39
if ( dequeueTask != null
40
40
&& ! dequeueTask . Task . IsCompleted )
41
41
{
42
- T result ;
43
- queue . TryDequeue ( out result ) ;
44
- dequeueTask . SetResult ( result ) ;
42
+ dequeueTask . SetResult ( true ) ;
45
43
}
46
-
47
44
}
48
45
49
46
/// <summary>
50
- /// Assumes a single threaded consumer!
47
+ /// Assumes a single- threaded consumer!
51
48
/// </summary>
52
49
/// <returns></returns>
53
50
internal async Task < T > DequeueAsync ( )
@@ -60,11 +57,12 @@ internal async Task<T> DequeueAsync()
60
57
return result ;
61
58
}
62
59
63
- dequeueTask = new TaskCompletionSource < T > ( ) ;
60
+ dequeueTask = new TaskCompletionSource < bool > ( ) ;
64
61
taskCancellationToken . Register ( ( ) => dequeueTask . TrySetCanceled ( ) ) ;
65
- result = await dequeueTask . Task ;
62
+ await dequeueTask . Task ;
66
63
dequeueTask = null ;
67
64
65
+ queue . TryDequeue ( out result ) ;
68
66
return result ;
69
67
}
70
68
0 commit comments