diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 95bdaeaed..432f0ff11 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -32,14 +32,15 @@ extension HTTPClient { /// A streaming uploader. /// /// ``StreamWriter`` abstracts - public struct StreamWriter { - let closure: (IOData) -> EventLoopFuture + public struct StreamWriter: Sendable { + let closure: @Sendable (IOData) -> EventLoopFuture /// Create new ``HTTPClient/Body/StreamWriter`` /// /// - parameters: /// - closure: function that will be called to write actual bytes to the channel. - public init(closure: @escaping (IOData) -> EventLoopFuture) { + @preconcurrency + public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture) { self.closure = closure } @@ -55,8 +56,8 @@ extension HTTPClient { func writeChunks( of bytes: Bytes, maxChunkSize: Int - ) -> EventLoopFuture where Bytes.Element == UInt8 { - // `StreamWriter` is has design issues, for example + ) -> EventLoopFuture where Bytes.Element == UInt8, Bytes: Sendable { + // `StreamWriter` has design issues, for example // - https://github.com/swift-server/async-http-client/issues/194 // - https://github.com/swift-server/async-http-client/issues/264 // - We're not told the EventLoop the task runs on and the user is free to return whatever EL they @@ -66,49 +67,52 @@ extension HTTPClient { typealias Iterator = EnumeratedSequence>.Iterator typealias Chunk = (offset: Int, element: ChunksOfCountCollection.Element) - func makeIteratorAndFirstChunk( - bytes: Bytes - ) -> ( - iterator: NIOLockedValueBox, - chunk: Chunk - )? { - var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator() - guard let chunk = iterator.next() else { - return nil + // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us... + return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in + func makeIteratorAndFirstChunk( + bytes: Bytes + ) -> (iterator: Iterator, chunk: Chunk)? { + var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator() + guard let chunk = iterator.next() else { + return nil + } + + return (iterator, chunk) } - return (NIOLockedValueBox(iterator), chunk) - } - - guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else { - return self.write(IOData.byteBuffer(.init())) - } + guard let iteratorAndChunk = makeIteratorAndFirstChunk(bytes: bytes) else { + return loop.makeSucceededVoidFuture() + } - @Sendable // can't use closure here as we recursively call ourselves which closures can't do - func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise) { - if let nextElement = iterator.withLockedValue({ $0.next() }) { - self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map { - let index = nextElement.offset - if (index + 1) % 4 == 0 { - // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 - // mode. - // Also, we must frequently return to the EventLoop because we may get the pause signal - // from another thread. If we fail to do that promptly, we may balloon our body chunks - // into memory. - allDone.futureResult.eventLoop.execute { - writeNextChunk(nextElement, allDone: allDone) + var iterator = iteratorAndChunk.0 + let chunk = iteratorAndChunk.1 + + // can't use closure here as we recursively call ourselves which closures can't do + func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise) { + let loop = allDone.futureResult.eventLoop + loop.assertInEventLoop() + + if let (index, element) = iterator.next() { + self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map + { + if (index + 1) % 4 == 0 { + // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 + // mode. + // Also, we must frequently return to the EventLoop because we may get the pause signal + // from another thread. If we fail to do that promptly, we may balloon our body chunks + // into memory. + allDone.futureResult.eventLoop.assumeIsolated().execute { + writeNextChunk((offset: index, element: element), allDone: allDone) + } + } else { + writeNextChunk((offset: index, element: element), allDone: allDone) } - } else { - writeNextChunk(nextElement, allDone: allDone) - } - }.cascadeFailure(to: allDone) - } else { - self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone) + }.nonisolated().cascadeFailure(to: allDone) + } else { + self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone) + } } - } - // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us... - return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in let allDone = loop.makePromise(of: Void.self) writeNextChunk(chunk, allDone: allDone) return allDone.futureResult