Skip to content

Commit 7e55dbe

Browse files
committed
fix 784: dont crash on huge in-memory bodies
1 parent bdaa3b1 commit 7e55dbe

9 files changed

+305
-86
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ extension Transaction {
3434
case finished(error: Error?)
3535
}
3636

37-
fileprivate enum RequestStreamState {
37+
fileprivate enum RequestStreamState: Sendable {
3838
case requestHeadSent
3939
case producing
4040
case paused(continuation: CheckedContinuation<Void, Error>?)
4141
case finished
4242
}
4343

44-
fileprivate enum ResponseStreamState {
44+
fileprivate enum ResponseStreamState: Sendable {
4545
// Waiting for response head. Valid transitions to: streamingBody.
4646
case waitingForResponseHead
4747
// streaming response body. Valid transitions to: finished.

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import NIOHTTP1
1919
import NIOSSL
2020

2121
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
22-
@usableFromInline final class Transaction: @unchecked Sendable {
22+
@usableFromInline final class Transaction:
23+
// until NIOLockedValueBox learns `sending` because StateMachine cannot be Sendable
24+
@unchecked Sendable {
2325
let logger: Logger
2426

2527
let request: HTTPClientRequest.Prepared
@@ -28,8 +30,7 @@ import NIOSSL
2830
let preferredEventLoop: EventLoop
2931
let requestOptions: RequestOptions
3032

31-
private let stateLock = NIOLock()
32-
private var state: StateMachine
33+
private let state: NIOLockedValueBox<StateMachine>
3334

3435
init(
3536
request: HTTPClientRequest.Prepared,
@@ -44,7 +45,7 @@ import NIOSSL
4445
self.logger = logger
4546
self.connectionDeadline = connectionDeadline
4647
self.preferredEventLoop = preferredEventLoop
47-
self.state = StateMachine(responseContinuation)
48+
self.state = NIOLockedValueBox(StateMachine(responseContinuation))
4849
}
4950

5051
func cancel() {
@@ -56,8 +57,8 @@ import NIOSSL
5657
private func writeOnceAndOneTimeOnly(byteBuffer: ByteBuffer) {
5758
// This method is synchronously invoked after sending the request head. For this reason we
5859
// can make a number of assumptions, how the state machine will react.
59-
let writeAction = self.stateLock.withLock {
60-
self.state.writeNextRequestPart()
60+
let writeAction = self.state.withLockedValue { state in
61+
state.writeNextRequestPart()
6162
}
6263

6364
switch writeAction {
@@ -99,30 +100,33 @@ import NIOSSL
99100

100101
struct BreakTheWriteLoopError: Swift.Error {}
101102

103+
// FIXME: Refactor this to not use `self.state.unsafe`.
102104
private func writeRequestBodyPart(_ part: ByteBuffer) async throws {
103-
self.stateLock.lock()
104-
switch self.state.writeNextRequestPart() {
105+
self.state.unsafe.lock()
106+
switch self.state.unsafe.withValueAssumingLockIsAcquired({ state in state.writeNextRequestPart() }) {
105107
case .writeAndContinue(let executor):
106-
self.stateLock.unlock()
108+
self.state.unsafe.unlock()
107109
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
108110

109111
case .writeAndWait(let executor):
110112
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
111-
self.state.waitForRequestBodyDemand(continuation: continuation)
112-
self.stateLock.unlock()
113+
self.state.unsafe.withValueAssumingLockIsAcquired({ state in
114+
state.waitForRequestBodyDemand(continuation: continuation)
115+
})
116+
self.state.unsafe.unlock()
113117

114118
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
115119
}
116120

117121
case .fail:
118-
self.stateLock.unlock()
122+
self.state.unsafe.unlock()
119123
throw BreakTheWriteLoopError()
120124
}
121125
}
122126

123127
private func requestBodyStreamFinished() {
124-
let finishAction = self.stateLock.withLock {
125-
self.state.finishRequestBodyStream()
128+
let finishAction = self.state.withLockedValue { state in
129+
state.finishRequestBodyStream()
126130
}
127131

128132
switch finishAction {
@@ -150,8 +154,8 @@ extension Transaction: HTTPSchedulableRequest {
150154
var requiredEventLoop: EventLoop? { nil }
151155

152156
func requestWasQueued(_ scheduler: HTTPRequestScheduler) {
153-
self.stateLock.withLock {
154-
self.state.requestWasQueued(scheduler)
157+
self.state.withLockedValue { state in
158+
state.requestWasQueued(scheduler)
155159
}
156160
}
157161
}
@@ -165,8 +169,8 @@ extension Transaction: HTTPExecutableRequest {
165169
// MARK: Request
166170

167171
func willExecuteRequest(_ executor: HTTPRequestExecutor) {
168-
let action = self.stateLock.withLock {
169-
self.state.willExecuteRequest(executor)
172+
let action = self.state.withLockedValue { state in
173+
state.willExecuteRequest(executor)
170174
}
171175

172176
switch action {
@@ -183,8 +187,8 @@ extension Transaction: HTTPExecutableRequest {
183187
func requestHeadSent() {}
184188

185189
func resumeRequestBodyStream() {
186-
let action = self.stateLock.withLock {
187-
self.state.resumeRequestBodyStream()
190+
let action = self.state.withLockedValue { state in
191+
state.resumeRequestBodyStream()
188192
}
189193

190194
switch action {
@@ -214,16 +218,16 @@ extension Transaction: HTTPExecutableRequest {
214218
}
215219

216220
func pauseRequestBodyStream() {
217-
self.stateLock.withLock {
218-
self.state.pauseRequestBodyStream()
221+
self.state.withLockedValue { state in
222+
state.pauseRequestBodyStream()
219223
}
220224
}
221225

222226
// MARK: Response
223227

224228
func receiveResponseHead(_ head: HTTPResponseHead) {
225-
let action = self.stateLock.withLock {
226-
self.state.receiveResponseHead(head, delegate: self)
229+
let action = self.state.withLockedValue { state in
230+
state.receiveResponseHead(head, delegate: self)
227231
}
228232

229233
switch action {
@@ -243,8 +247,8 @@ extension Transaction: HTTPExecutableRequest {
243247
}
244248

245249
func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) {
246-
let action = self.stateLock.withLock {
247-
self.state.receiveResponseBodyParts(buffer)
250+
let action = self.state.withLockedValue { state in
251+
state.receiveResponseBodyParts(buffer)
248252
}
249253
switch action {
250254
case .none:
@@ -260,8 +264,8 @@ extension Transaction: HTTPExecutableRequest {
260264
}
261265

262266
func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
263-
let succeedAction = self.stateLock.withLock {
264-
self.state.succeedRequest(buffer)
267+
let succeedAction = self.state.withLockedValue { state in
268+
state.succeedRequest(buffer)
265269
}
266270
switch succeedAction {
267271
case .finishResponseStream(let source, let finalResponse):
@@ -276,8 +280,8 @@ extension Transaction: HTTPExecutableRequest {
276280
}
277281

278282
func fail(_ error: Error) {
279-
let action = self.stateLock.withLock {
280-
self.state.fail(error)
283+
let action = self.state.withLockedValue { state in
284+
state.fail(error)
281285
}
282286
self.performFailAction(action)
283287
}
@@ -304,8 +308,8 @@ extension Transaction: HTTPExecutableRequest {
304308
}
305309

306310
func deadlineExceeded() {
307-
let action = self.stateLock.withLock {
308-
self.state.deadlineExceeded()
311+
let action = self.state.withLockedValue { state in
312+
state.deadlineExceeded()
309313
}
310314
self.performDeadlineExceededAction(action)
311315
}
@@ -329,8 +333,8 @@ extension Transaction: HTTPExecutableRequest {
329333
extension Transaction: NIOAsyncSequenceProducerDelegate {
330334
@usableFromInline
331335
func produceMore() {
332-
let action = self.stateLock.withLock {
333-
self.state.produceMore()
336+
let action = self.state.withLockedValue { state in
337+
state.produceMore()
334338
}
335339
switch action {
336340
case .none:

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,9 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
432432
}
433433
}
434434

435+
@available(*, unavailable)
436+
extension HTTP2ClientRequestHandler: Sendable {}
437+
435438
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
436439
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
437440
if self.eventLoop.inEventLoop {

Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ import NIOSSL
132132
///
133133
/// Use this handle to cancel the request, while it is waiting for a free connection, to execute the request.
134134
/// This protocol is only intended to be implemented by the `HTTPConnectionPool`.
135-
protocol HTTPRequestScheduler {
135+
protocol HTTPRequestScheduler: Sendable {
136136
/// Informs the task queuer that a request has been cancelled.
137137
func cancelRequest(_: HTTPSchedulableRequest)
138138
}

Sources/AsyncHTTPClient/HTTPClient.swift

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,21 +222,20 @@ public class HTTPClient {
222222
"""
223223
)
224224
}
225-
let errorStorageLock = NIOLock()
226-
let errorStorage: UnsafeMutableTransferBox<Error?> = .init(nil)
225+
let errorStorage: NIOLockedValueBox<Error?> = NIOLockedValueBox(nil)
227226
let continuation = DispatchWorkItem {}
228227
self.shutdown(requiresCleanClose: requiresCleanClose, queue: DispatchQueue(label: "async-http-client.shutdown"))
229228
{ error in
230229
if let error = error {
231-
errorStorageLock.withLock {
232-
errorStorage.wrappedValue = error
230+
errorStorage.withLockedValue { errorStorage in
231+
errorStorage = error
233232
}
234233
}
235234
continuation.perform()
236235
}
237236
continuation.wait()
238-
try errorStorageLock.withLock {
239-
if let error = errorStorage.wrappedValue {
237+
try errorStorage.withLockedValue { errorStorage in
238+
if let error = errorStorage {
240239
throw error
241240
}
242241
}

Sources/AsyncHTTPClient/HTTPHandler.swift

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,65 @@ extension HTTPClient {
4747
}
4848

4949
@inlinable
50-
func writeChunks<Bytes: Collection>(of bytes: Bytes, maxChunkSize: Int) -> EventLoopFuture<Void>
51-
where Bytes.Element == UInt8 {
52-
let iterator = UnsafeMutableTransferBox(bytes.chunks(ofCount: maxChunkSize).makeIterator())
53-
guard let chunk = iterator.wrappedValue.next() else {
50+
func writeChunks<Bytes: Collection>(
51+
of bytes: Bytes,
52+
maxChunkSize: Int
53+
) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
54+
// `StreamWriter` is has design issues, for example
55+
// - https://github.com/swift-server/async-http-client/issues/194
56+
// - https://github.com/swift-server/async-http-client/issues/264
57+
// - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
58+
// want.
59+
// One important consideration then is that we must lock around the iterator because we could be hopping
60+
// between threads.
61+
typealias Iterator = EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
62+
typealias Chunk = (offset: Int, element: ChunksOfCountCollection<Bytes>.Element)
63+
64+
func makeIteratorAndFirstChunk() -> (
65+
iterator: NIOLockedValueBox<Iterator>,
66+
chunk: Chunk
67+
)? {
68+
var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
69+
guard let chunk = iterator.next() else {
70+
return nil
71+
}
72+
73+
return (NIOLockedValueBox(iterator), chunk)
74+
}
75+
76+
guard let (iterator, chunk) = makeIteratorAndFirstChunk() else {
5477
return self.write(IOData.byteBuffer(.init()))
5578
}
5679

5780
@Sendable // can't use closure here as we recursively call ourselves which closures can't do
58-
func writeNextChunk(_ chunk: Bytes.SubSequence) -> EventLoopFuture<Void> {
59-
if let nextChunk = iterator.wrappedValue.next() {
60-
return self.write(.byteBuffer(ByteBuffer(bytes: chunk))).flatMap {
61-
writeNextChunk(nextChunk)
62-
}
81+
func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
82+
if let nextElement = iterator.withLockedValue({ $0.next() }) {
83+
let (index, chunk) = nextElement
84+
self.write(.byteBuffer(ByteBuffer(bytes: chunk))).map {
85+
if (index + 1) % 4 == 0 {
86+
// Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
87+
// mode.
88+
// Also, we must frequently return to the EventLoop because we may get the pause signal
89+
// from another thread. If we fail to do that promptly, we may balloon our body chunks
90+
// into memory.
91+
allDone.futureResult.eventLoop.execute {
92+
writeNextChunk(nextElement, allDone: allDone)
93+
}
94+
} else {
95+
writeNextChunk(nextElement, allDone: allDone)
96+
}
97+
}.cascadeFailure(to: allDone)
6398
} else {
64-
return self.write(.byteBuffer(ByteBuffer(bytes: chunk)))
99+
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
65100
}
66101
}
67102

68-
return writeNextChunk(chunk)
103+
// HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
104+
return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
105+
let allDone = loop.makePromise(of: Void.self)
106+
writeNextChunk(chunk, allDone: allDone)
107+
return allDone.futureResult
108+
}
69109
}
70110
}
71111

Sources/AsyncHTTPClient/UnsafeTransfer.swift

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)