@@ -53,63 +53,65 @@ extension HTTPClient {
53
53
}
54
54
55
55
@inlinable
56
- @preconcurrency
57
56
func writeChunks< Bytes: Collection > (
58
57
of bytes: Bytes ,
59
58
maxChunkSize: Int
60
- ) -> EventLoopFuture < Void > where Bytes. Element == UInt8 , Bytes: Sendable , Bytes . SubSequence : Sendable {
59
+ ) -> EventLoopFuture < Void > where Bytes. Element == UInt8 , Bytes: Sendable {
61
60
// `StreamWriter` has design issues, for example
62
61
// - https://github.com/swift-server/async-http-client/issues/194
63
62
// - https://github.com/swift-server/async-http-client/issues/264
64
63
// - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
65
64
// want.
66
65
// One important consideration then is that we must lock around the iterator because we could be hopping
67
66
// between threads.
68
- typealias Iterator = BodyStreamIterator < Bytes >
67
+ typealias Iterator = EnumeratedSequence < ChunksOfCountCollection < Bytes > > . Iterator
69
68
typealias Chunk = ( offset: Int , element: ChunksOfCountCollection < Bytes > . Element )
70
69
71
- func makeIteratorAndFirstChunk(
72
- bytes: Bytes
73
- ) -> (
74
- iterator: NIOLockedValueBox < Iterator > ,
75
- chunk: Chunk
76
- ) ? {
77
- var iterator = bytes. chunks ( ofCount: maxChunkSize) . enumerated ( ) . makeIterator ( )
78
- guard let chunk = iterator. next ( ) else {
79
- return nil
70
+ // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
71
+ return self . write ( . byteBuffer( ByteBuffer ( ) ) ) . flatMapWithEventLoop { ( _, loop) in
72
+ func makeIteratorAndFirstChunk(
73
+ bytes: Bytes
74
+ ) -> ( iterator: Iterator , chunk: Chunk ) ? {
75
+ var iterator = bytes. chunks ( ofCount: maxChunkSize) . enumerated ( ) . makeIterator ( )
76
+ guard let chunk = iterator. next ( ) else {
77
+ return nil
78
+ }
79
+
80
+ return ( iterator, chunk)
80
81
}
81
82
82
- return ( NIOLockedValueBox ( BodyStreamIterator ( iterator) ) , chunk)
83
- }
84
-
85
- guard let ( iterator, chunk) = makeIteratorAndFirstChunk ( bytes: bytes) else {
86
- return self . write ( IOData . byteBuffer ( . init( ) ) )
87
- }
83
+ guard let iteratorAndChunk = makeIteratorAndFirstChunk ( bytes: bytes) else {
84
+ return loop. makeSucceededVoidFuture ( )
85
+ }
88
86
89
- @Sendable // can't use closure here as we recursively call ourselves which closures can't do
90
- func writeNextChunk( _ chunk: Chunk , allDone: EventLoopPromise < Void > ) {
91
- if let ( index, element) = iterator. withLockedValue ( { $0. next ( ) } ) {
92
- self . write ( . byteBuffer( ByteBuffer ( bytes: chunk. element) ) ) . map {
93
- if ( index + 1 ) % 4 == 0 {
94
- // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
95
- // mode.
96
- // Also, we must frequently return to the EventLoop because we may get the pause signal
97
- // from another thread. If we fail to do that promptly, we may balloon our body chunks
98
- // into memory.
99
- allDone. futureResult. eventLoop. execute {
87
+ var iterator = iteratorAndChunk. 0
88
+ let chunk = iteratorAndChunk. 1
89
+
90
+ // can't use closure here as we recursively call ourselves which closures can't do
91
+ func writeNextChunk( _ chunk: Chunk , allDone: EventLoopPromise < Void > ) {
92
+ let loop = allDone. futureResult. eventLoop
93
+ loop. assertInEventLoop ( )
94
+
95
+ if let ( index, element) = iterator. next ( ) {
96
+ self . write ( . byteBuffer( ByteBuffer ( bytes: chunk. element) ) ) . hop ( to: loop) . assumeIsolated ( ) . map {
97
+ if ( index + 1 ) % 4 == 0 {
98
+ // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
99
+ // mode.
100
+ // Also, we must frequently return to the EventLoop because we may get the pause signal
101
+ // from another thread. If we fail to do that promptly, we may balloon our body chunks
102
+ // into memory.
103
+ allDone. futureResult. eventLoop. assumeIsolated ( ) . execute {
104
+ writeNextChunk ( ( offset: index, element: element) , allDone: allDone)
105
+ }
106
+ } else {
100
107
writeNextChunk ( ( offset: index, element: element) , allDone: allDone)
101
108
}
102
- } else {
103
- writeNextChunk ( ( offset: index, element: element) , allDone: allDone)
104
- }
105
- } . cascadeFailure ( to: allDone)
106
- } else {
107
- self . write ( . byteBuffer( ByteBuffer ( bytes: chunk. element) ) ) . cascade ( to: allDone)
109
+ } . nonisolated ( ) . cascadeFailure ( to: allDone)
110
+ } else {
111
+ self . write ( . byteBuffer( ByteBuffer ( bytes: chunk. element) ) ) . cascade ( to: allDone)
112
+ }
108
113
}
109
- }
110
114
111
- // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
112
- return self . write ( . byteBuffer( ByteBuffer ( ) ) ) . flatMapWithEventLoop { ( _, loop) in
113
115
let allDone = loop. makePromise ( of: Void . self)
114
116
writeNextChunk ( chunk, allDone: allDone)
115
117
return allDone. futureResult
@@ -189,7 +191,7 @@ extension HTTPClient {
189
191
@preconcurrency
190
192
@inlinable
191
193
public static func bytes< Bytes> ( _ bytes: Bytes ) -> Body
192
- where Bytes: RandomAccessCollection , Bytes: Sendable , Bytes. SubSequence : Sendable , Bytes . Element == UInt8 {
194
+ where Bytes: RandomAccessCollection , Bytes: Sendable , Bytes. Element == UInt8 {
193
195
Body ( contentLength: Int64 ( bytes. count) ) { writer in
194
196
if bytes. count <= bagOfBytesToByteBufferConversionChunkSize {
195
197
return writer. write ( . byteBuffer( ByteBuffer ( bytes: bytes) ) )
@@ -1081,26 +1083,3 @@ extension RequestBodyLength {
1081
1083
self = . known( length)
1082
1084
}
1083
1085
}
1084
-
1085
- @usableFromInline
1086
- struct BodyStreamIterator <
1087
- Bytes: Collection
1088
- > : IteratorProtocol , @unchecked Sendable where Bytes. Element == UInt8 , Bytes: Sendable {
1089
- // @unchecked: swift-algorithms hasn't adopted Sendable yet. By inspection, the iterator
1090
- // is safe to annotate as sendable.
1091
- @usableFromInline
1092
- typealias Element = ( offset: Int , element: Bytes . SubSequence )
1093
-
1094
- @usableFromInline
1095
- var _backing : EnumeratedSequence < ChunksOfCountCollection < Bytes > > . Iterator
1096
-
1097
- @inlinable
1098
- init ( _ backing: EnumeratedSequence < ChunksOfCountCollection < Bytes > > . Iterator ) {
1099
- self . _backing = backing
1100
- }
1101
-
1102
- @inlinable
1103
- mutating func next( ) -> Element ? {
1104
- self . _backing. next ( )
1105
- }
1106
- }
0 commit comments