5
5
namespace EventHook . Helpers
6
6
{
7
7
/// <summary>
8
- /// A concurrent queue facilitating async without locking
8
+ /// A concurrent queue facilitating async dequeue
9
+ /// Since our consumer is always single threaded no locking is needed
9
10
/// </summary>
10
11
/// <typeparam name="T"></typeparam>
11
12
internal class AsyncQueue < T >
@@ -19,66 +20,54 @@ internal AsyncQueue(CancellationToken taskCancellationToken)
19
20
/// <summary>
20
21
/// Backing queue
21
22
/// </summary>
22
- ConcurrentQueue < TaskResult > queue = new ConcurrentQueue < TaskResult > ( ) ;
23
+ ConcurrentQueue < T > queue = new ConcurrentQueue < T > ( ) ;
23
24
24
25
/// <summary>
25
26
/// Keeps a list of pending Dequeue tasks in FIFO order
26
27
/// </summary>
27
- ConcurrentQueue < TaskCompletionSource < TaskResult > > dequeueTasks
28
- = new ConcurrentQueue < TaskCompletionSource < TaskResult > > ( ) ;
28
+ ConcurrentQueue < TaskCompletionSource < T > > dequeueTasks
29
+ = new ConcurrentQueue < TaskCompletionSource < T > > ( ) ;
29
30
31
+ /// <summary>
32
+ /// Assumes a single threaded producer!
33
+ /// </summary>
34
+ /// <param name="value"></param>
30
35
internal void Enqueue ( T value )
31
36
{
32
- queue . Enqueue ( new TaskResult ( ) { success = true , Data = value } ) ;
37
+ queue . Enqueue ( value ) ;
33
38
34
39
//Set the earlist waiting Dequeue task
35
- TaskCompletionSource < TaskResult > task ;
36
- if ( dequeueTasks . TryDequeue ( out task ) )
40
+ TaskCompletionSource < T > task ;
41
+
42
+ if ( dequeueTasks . TryDequeue ( out task ) )
37
43
{
38
- TaskResult result ;
39
- //if dequeue failed it means another Task picked up the data
40
- //set the result to false for this Task so that it will be retried
41
- //otherwise return the result
42
- if ( queue . TryDequeue ( out result ) )
43
- {
44
- task . SetResult ( result ) ;
45
- }
46
- else
47
- {
48
- task . SetResult ( new TaskResult ( ) { success = false } ) ;
49
- }
50
-
44
+ //return the result
45
+ T result ;
46
+ queue . TryDequeue ( out result ) ;
47
+ task . SetResult ( result ) ;
51
48
}
52
49
50
+
53
51
}
54
52
53
+ /// <summary>
54
+ /// Assumes a single threaded consumer!
55
+ /// </summary>
56
+ /// <returns></returns>
55
57
internal async Task < T > DequeueAsync ( )
56
58
{
57
- TaskResult result ;
59
+ T result ;
60
+
58
61
queue . TryDequeue ( out result ) ;
59
62
60
- //try until we get a result
61
- while ( result == null || ! result . success )
62
- {
63
- var tcs = new TaskCompletionSource < TaskResult > ( ) ;
64
- //cancel the task if cancellation token was invoked
65
- //will throw exception on await below if task was running when cancelled
66
- taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
63
+ var tcs = new TaskCompletionSource < T > ( ) ;
64
+ taskCancellationToken . Register ( ( ) => tcs . TrySetCanceled ( ) ) ;
67
65
68
- dequeueTasks . Enqueue ( tcs ) ;
69
- result = await tcs . Task ;
70
- }
66
+ dequeueTasks . Enqueue ( tcs ) ;
67
+ result = await tcs . Task ;
71
68
72
- return result . Data ;
69
+ return result ;
73
70
}
74
71
75
- /// <summary>
76
- /// To keep the dequeue result status
77
- /// </summary>
78
- internal class TaskResult
79
- {
80
- internal bool success { get ; set ; }
81
- internal T Data { get ; set ; }
82
- }
83
72
}
84
73
}
0 commit comments