Skip to content

Commit e4453eb

Browse files
committed
Make the ResponseAccumulator Sendable
Motivation: The response accumulator is a delegate which must be sendable as it's passed across isolation domains. Modifications: - Make delegates have a sendable requirement - Make the response accumulator sendable Result: Delegates, and the response accumulator, are sendable
1 parent c61298e commit e4453eb

File tree

1 file changed

+120
-96
lines changed

1 file changed

+120
-96
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

Lines changed: 120 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -538,8 +538,12 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
538538
}
539539
}
540540

541-
var history = [HTTPClient.RequestResponse]()
542-
var state = State.idle
541+
private struct MutableState: Sendable {
542+
var history = [HTTPClient.RequestResponse]()
543+
var state = State.idle
544+
}
545+
546+
private let state: NIOLockedValueBox<MutableState>
543547
let requestMethod: HTTPMethod
544548
let requestHost: String
545549

@@ -573,107 +577,118 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
573577
self.requestMethod = request.method
574578
self.requestHost = request.host
575579
self.maxBodySize = maxBodySize
580+
self.state = NIOLockedValueBox(MutableState())
576581
}
577582

578583
public func didVisitURL(
579584
task: HTTPClient.Task<HTTPClient.Response>,
580585
_ request: HTTPClient.Request,
581586
_ head: HTTPResponseHead
582587
) {
583-
self.history.append(.init(request: request, responseHead: head))
588+
self.state.withLockedValue {
589+
$0.history.append(.init(request: request, responseHead: head))
590+
}
584591
}
585592

586593
public func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
587-
switch self.state {
588-
case .idle:
589-
if self.requestMethod != .HEAD,
590-
let contentLength = head.headers.first(name: "Content-Length"),
591-
let announcedBodySize = Int(contentLength),
592-
announcedBodySize > self.maxBodySize
593-
{
594-
let error = ResponseTooBigError(maxBodySize: maxBodySize)
595-
self.state = .error(error)
596-
return task.eventLoop.makeFailedFuture(error)
597-
}
594+
self.state.withLockedValue {
595+
switch $0.state {
596+
case .idle:
597+
if self.requestMethod != .HEAD,
598+
let contentLength = head.headers.first(name: "Content-Length"),
599+
let announcedBodySize = Int(contentLength),
600+
announcedBodySize > self.maxBodySize
601+
{
602+
let error = ResponseTooBigError(maxBodySize: maxBodySize)
603+
$0.state = .error(error)
604+
return task.eventLoop.makeFailedFuture(error)
605+
}
598606

599-
self.state = .head(head)
600-
case .head:
601-
preconditionFailure("head already set")
602-
case .body:
603-
preconditionFailure("no head received before body")
604-
case .end:
605-
preconditionFailure("request already processed")
606-
case .error:
607-
break
607+
$0.state = .head(head)
608+
case .head:
609+
preconditionFailure("head already set")
610+
case .body:
611+
preconditionFailure("no head received before body")
612+
case .end:
613+
preconditionFailure("request already processed")
614+
case .error:
615+
break
616+
}
617+
return task.eventLoop.makeSucceededFuture(())
608618
}
609-
return task.eventLoop.makeSucceededFuture(())
610619
}
611620

612621
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
613-
switch self.state {
614-
case .idle:
615-
preconditionFailure("no head received before body")
616-
case .head(let head):
617-
guard part.readableBytes <= self.maxBodySize else {
618-
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
619-
self.state = .error(error)
620-
return task.eventLoop.makeFailedFuture(error)
621-
}
622-
self.state = .body(head, part)
623-
case .body(let head, var body):
624-
let newBufferSize = body.writerIndex + part.readableBytes
625-
guard newBufferSize <= self.maxBodySize else {
626-
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
627-
self.state = .error(error)
628-
return task.eventLoop.makeFailedFuture(error)
629-
}
622+
self.state.withLockedValue {
623+
switch $0.state {
624+
case .idle:
625+
preconditionFailure("no head received before body")
626+
case .head(let head):
627+
guard part.readableBytes <= self.maxBodySize else {
628+
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
629+
$0.state = .error(error)
630+
return task.eventLoop.makeFailedFuture(error)
631+
}
632+
$0.state = .body(head, part)
633+
case .body(let head, var body):
634+
let newBufferSize = body.writerIndex + part.readableBytes
635+
guard newBufferSize <= self.maxBodySize else {
636+
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
637+
$0.state = .error(error)
638+
return task.eventLoop.makeFailedFuture(error)
639+
}
630640

631-
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's
632-
// a cross-module call in the way) so we need to drop the original reference to `body` in
633-
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which
634-
// has no associated data). We'll fix it at the bottom of this block.
635-
self.state = .end
636-
var part = part
637-
body.writeBuffer(&part)
638-
self.state = .body(head, body)
639-
case .end:
640-
preconditionFailure("request already processed")
641-
case .error:
642-
break
641+
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's
642+
// a cross-module call in the way) so we need to drop the original reference to `body` in
643+
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which
644+
// has no associated data). We'll fix it at the bottom of this block.
645+
$0.state = .end
646+
var part = part
647+
body.writeBuffer(&part)
648+
$0.state = .body(head, body)
649+
case .end:
650+
preconditionFailure("request already processed")
651+
case .error:
652+
break
653+
}
654+
return task.eventLoop.makeSucceededFuture(())
643655
}
644-
return task.eventLoop.makeSucceededFuture(())
645656
}
646657

647658
public func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
648-
self.state = .error(error)
659+
self.state.withLockedValue {
660+
$0.state = .error(error)
661+
}
649662
}
650663

651664
public func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response {
652-
switch self.state {
653-
case .idle:
654-
preconditionFailure("no head received before end")
655-
case .head(let head):
656-
return Response(
657-
host: self.requestHost,
658-
status: head.status,
659-
version: head.version,
660-
headers: head.headers,
661-
body: nil,
662-
history: self.history
663-
)
664-
case .body(let head, let body):
665-
return Response(
666-
host: self.requestHost,
667-
status: head.status,
668-
version: head.version,
669-
headers: head.headers,
670-
body: body,
671-
history: self.history
672-
)
673-
case .end:
674-
preconditionFailure("request already processed")
675-
case .error(let error):
676-
throw error
665+
try self.state.withLockedValue {
666+
switch $0.state {
667+
case .idle:
668+
preconditionFailure("no head received before end")
669+
case .head(let head):
670+
return Response(
671+
host: self.requestHost,
672+
status: head.status,
673+
version: head.version,
674+
headers: head.headers,
675+
body: nil,
676+
history: $0.history
677+
)
678+
case .body(let head, let body):
679+
return Response(
680+
host: self.requestHost,
681+
status: head.status,
682+
version: head.version,
683+
headers: head.headers,
684+
body: body,
685+
history: $0.history
686+
)
687+
case .end:
688+
preconditionFailure("request already processed")
689+
case .error(let error):
690+
throw error
691+
}
677692
}
678693
}
679694
}
@@ -709,8 +724,9 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
709724
/// released together with the `HTTPTaskHandler` when channel is closed.
710725
/// Users of the library are not required to keep a reference to the
711726
/// object that implements this protocol, but may do so if needed.
712-
public protocol HTTPClientResponseDelegate: AnyObject {
713-
associatedtype Response
727+
@preconcurrency
728+
public protocol HTTPClientResponseDelegate: AnyObject, Sendable {
729+
associatedtype Response: Sendable
714730

715731
/// Called when the request head is sent. Will be called once.
716732
///
@@ -885,7 +901,7 @@ extension URL {
885901
}
886902
}
887903

888-
protocol HTTPClientTaskDelegate {
904+
protocol HTTPClientTaskDelegate: Sendable {
889905
func fail(_ error: Error)
890906
}
891907

@@ -894,30 +910,35 @@ extension HTTPClient {
894910
///
895911
/// Will be created by the library and could be used for obtaining
896912
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
897-
public final class Task<Response> {
913+
@preconcurrency
914+
public final class Task<Response: Sendable> {
898915
/// The `EventLoop` the delegate will be executed on.
899916
public let eventLoop: EventLoop
900917
/// The `Logger` used by the `Task` for logging.
901918
public let logger: Logger // We are okay to store the logger here because a Task is for only one request.
902919

903920
let promise: EventLoopPromise<Response>
904921

922+
struct State: Sendable {
923+
var isCancelled: Bool
924+
var taskDelegate: HTTPClientTaskDelegate?
925+
}
926+
927+
private let state: NIOLockedValueBox<State>
928+
905929
var isCancelled: Bool {
906-
self.lock.withLock { self._isCancelled }
930+
self.state.withLockedValue { $0.isCancelled }
907931
}
908932

909933
var taskDelegate: HTTPClientTaskDelegate? {
910934
get {
911-
self.lock.withLock { self._taskDelegate }
935+
self.state.withLockedValue { $0.taskDelegate }
912936
}
913937
set {
914-
self.lock.withLock { self._taskDelegate = newValue }
938+
self.state.withLockedValue { $0.taskDelegate = newValue }
915939
}
916940
}
917941

918-
private var _isCancelled: Bool = false
919-
private var _taskDelegate: HTTPClientTaskDelegate?
920-
private let lock = NIOLock()
921942
private let makeOrGetFileIOThreadPool: () -> NIOThreadPool
922943

923944
/// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access.
@@ -930,6 +951,7 @@ extension HTTPClient {
930951
self.promise = eventLoop.makePromise()
931952
self.logger = logger
932953
self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool
954+
self.state = NIOLockedValueBox(State(isCancelled: false, taskDelegate: nil))
933955
}
934956

935957
static func failedTask(
@@ -957,7 +979,8 @@ extension HTTPClient {
957979
/// - returns: The value of ``futureResult`` when it completes.
958980
/// - throws: The error value of ``futureResult`` if it errors.
959981
@available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()")
960-
public func wait() throws -> Response {
982+
@preconcurrency
983+
public func wait() throws -> Response where Response: Sendable {
961984
try self.promise.futureResult.wait()
962985
}
963986

@@ -968,7 +991,8 @@ extension HTTPClient {
968991
/// - returns: The value of ``futureResult`` when it completes.
969992
/// - throws: The error value of ``futureResult`` if it errors.
970993
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
971-
public func get() async throws -> Response {
994+
@preconcurrency
995+
public func get() async throws -> Response where Response: Sendable {
972996
try await self.promise.futureResult.get()
973997
}
974998

@@ -985,9 +1009,9 @@ extension HTTPClient {
9851009
///
9861010
/// - Parameter error: the error that is used to fail the promise
9871011
public func fail(reason error: Error) {
988-
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
989-
self._isCancelled = true
990-
return self._taskDelegate
1012+
let taskDelegate = self.state.withLockedValue { state in
1013+
state.isCancelled = true
1014+
return state.taskDelegate
9911015
}
9921016

9931017
taskDelegate?.fail(error)
@@ -1017,7 +1041,7 @@ internal struct TaskCancelEvent {}
10171041

10181042
// MARK: - RedirectHandler
10191043

1020-
internal struct RedirectHandler<ResponseType> {
1044+
internal struct RedirectHandler<ResponseType: Sendable> {
10211045
let request: HTTPClient.Request
10221046
let redirectState: RedirectState
10231047
let execute: (HTTPClient.Request, RedirectState) -> HTTPClient.Task<ResponseType>

0 commit comments

Comments
 (0)