diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 4f08bc4f5..0e74236d5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -95,15 +95,13 @@ func withCLocaleSetToGerman(_ body: () throws -> Void) throws { try body() } -class TestHTTPDelegate: HTTPClientResponseDelegate { +final class TestHTTPDelegate: HTTPClientResponseDelegate { typealias Response = Void init(backpressureEventLoop: EventLoop? = nil) { - self.backpressureEventLoop = backpressureEventLoop + self.state = NIOLockedValueBox(MutableState(backpressureEventLoop: backpressureEventLoop)) } - var backpressureEventLoop: EventLoop? - enum State { case idle case head(HTTPResponseHead) @@ -112,77 +110,96 @@ class TestHTTPDelegate: HTTPClientResponseDelegate { case error(Error) } - var state = State.idle + struct MutableState: Sendable { + var state: State = .idle + var backpressureEventLoop: EventLoop? + } + + let state: NIOLockedValueBox func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - self.state = .head(head) - return (self.backpressureEventLoop ?? task.eventLoop).makeSucceededFuture(()) + let eventLoop = self.state.withLockedValue { + $0.state = .head(head) + return ($0.backpressureEventLoop ?? task.eventLoop) + } + + return eventLoop.makeSucceededVoidFuture() } func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { - switch self.state { - case .head(let head): - self.state = .body(head, buffer) - case .body(let head, var body): - var buffer = buffer - body.writeBuffer(&buffer) - self.state = .body(head, body) - default: - preconditionFailure("expecting head or body") + let eventLoop = self.state.withLockedValue { + switch $0.state { + case .head(let head): + $0.state = .body(head, buffer) + case .body(let head, var body): + var buffer = buffer + body.writeBuffer(&buffer) + $0.state = .body(head, body) + default: + preconditionFailure("expecting head or body") + } + return ($0.backpressureEventLoop ?? task.eventLoop) } - return (self.backpressureEventLoop ?? task.eventLoop).makeSucceededFuture(()) + + return eventLoop.makeSucceededVoidFuture() } func didFinishRequest(task: HTTPClient.Task) throws {} } -class CountingDelegate: HTTPClientResponseDelegate { +final class CountingDelegate: HTTPClientResponseDelegate { typealias Response = Int - var count = 0 + private let _count = NIOLockedValueBox(0) func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { let str = buffer.getString(at: 0, length: buffer.readableBytes) if str?.starts(with: "id:") ?? false { - self.count += 1 + self._count.withLockedValue { $0 += 1 } } return task.eventLoop.makeSucceededFuture(()) } func didFinishRequest(task: HTTPClient.Task) throws -> Int { - self.count + self._count.withLockedValue { $0 } } } -class DelayOnHeadDelegate: HTTPClientResponseDelegate { +final class DelayOnHeadDelegate: HTTPClientResponseDelegate { typealias Response = ByteBuffer let eventLoop: EventLoop - let didReceiveHead: (HTTPResponseHead, EventLoopPromise) -> Void - - private var data: ByteBuffer + let didReceiveHead: @Sendable (HTTPResponseHead, EventLoopPromise) -> Void - private var mayReceiveData = false + struct State: Sendable { + var data: ByteBuffer + var mayReceiveData = false + var expectError = false + } - private var expectError = false + private let state: NIOLockedValueBox - init(eventLoop: EventLoop, didReceiveHead: @escaping (HTTPResponseHead, EventLoopPromise) -> Void) { + init(eventLoop: EventLoop, didReceiveHead: @escaping @Sendable (HTTPResponseHead, EventLoopPromise) -> Void) { self.eventLoop = eventLoop self.didReceiveHead = didReceiveHead - self.data = ByteBuffer() + self.state = NIOLockedValueBox(State(data: ByteBuffer())) } func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - XCTAssertFalse(self.mayReceiveData) - XCTAssertFalse(self.expectError) + self.state.withLockedValue { + XCTAssertFalse($0.mayReceiveData) + XCTAssertFalse($0.expectError) + } let promise = self.eventLoop.makePromise(of: Void.self) - promise.futureResult.whenComplete { - switch $0 { - case .success: - self.mayReceiveData = true - case .failure: - self.expectError = true + promise.futureResult.whenComplete { result in + self.state.withLockedValue { state in + switch result { + case .success: + state.mayReceiveData = true + case .failure: + state.expectError = true + } } } @@ -191,20 +208,26 @@ class DelayOnHeadDelegate: HTTPClientResponseDelegate { } func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { - XCTAssertTrue(self.mayReceiveData) - XCTAssertFalse(self.expectError) - self.data.writeImmutableBuffer(buffer) + self.state.withLockedValue { + XCTAssertTrue($0.mayReceiveData) + XCTAssertFalse($0.expectError) + $0.data.writeImmutableBuffer(buffer) + } return self.eventLoop.makeSucceededFuture(()) } func didFinishRequest(task: HTTPClient.Task) throws -> Response { - XCTAssertTrue(self.mayReceiveData) - XCTAssertFalse(self.expectError) - return self.data + self.state.withLockedValue { + XCTAssertTrue($0.mayReceiveData) + XCTAssertFalse($0.expectError) + return $0.data + } } func didReceiveError(task: HTTPClient.Task, _ error: Error) { - XCTAssertTrue(self.expectError) + self.state.withLockedValue { + XCTAssertTrue($0.expectError) + } } } @@ -336,7 +359,7 @@ enum TestTLS { ) } -internal final class HTTPBin +internal final class HTTPBin: Sendable where RequestHandler.InboundIn == HTTPServerRequestPart, RequestHandler.OutboundOut == HTTPServerResponsePart @@ -415,11 +438,15 @@ where } var port: Int { - Int(self.serverChannel.localAddress!.port!) + self.serverChannel.withLockedValue { + Int($0!.localAddress!.port!) + } } var socketAddress: SocketAddress { - self.serverChannel.localAddress! + self.serverChannel.withLockedValue { + $0!.localAddress! + } } var baseURL: String { @@ -447,9 +474,9 @@ where private let mode: Mode private let sslContext: NIOSSLContext? - private var serverChannel: Channel! + private let serverChannel = NIOLockedValueBox(nil) private let isShutdown = ManagedAtomic(false) - private let handlerFactory: (Int) -> (RequestHandler) + private let handlerFactory: @Sendable (Int) -> (RequestHandler) init( _ mode: Mode = .http1_1(ssl: false, compress: false), @@ -457,7 +484,7 @@ where bindTarget: BindTarget = .localhostIPv4RandomPort, reusePort: Bool = false, trafficShapingTargetBytesPerSecond: Int? = nil, - handlerFactory: @escaping (Int) -> (RequestHandler) + handlerFactory: @escaping @Sendable (Int) -> (RequestHandler) ) { self.mode = mode self.sslContext = HTTPBin.sslContext(for: mode) @@ -477,14 +504,14 @@ where let connectionIDAtomic = ManagedAtomic(0) - self.serverChannel = try! ServerBootstrap(group: self.group) + let serverChannel = try! ServerBootstrap(group: self.group) .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) .serverChannelOption( ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEPORT), value: reusePort ? 1 : 0 ) - .serverChannelInitializer { channel in - channel.pipeline.addHandler(self.activeConnCounterHandler) + .serverChannelInitializer { [activeConnCounterHandler] channel in + channel.pipeline.addHandler(activeConnCounterHandler) }.childChannelInitializer { channel in if let trafficShapingTargetBytesPerSecond = trafficShapingTargetBytesPerSecond { try! channel.pipeline.syncOperations.addHandler( @@ -528,6 +555,7 @@ where return channel.eventLoop.makeFailedFuture(error) } }.bind(to: socketAddress).wait() + self.serverChannel.withLockedValue { $0 = serverChannel } } private func syncAddHTTPProxyHandlers( @@ -1092,13 +1120,13 @@ internal final class HTTPBinHandler: ChannelInboundHandler { ) context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil) } - context.eventLoop.scheduleTask(in: self.delay) { + context.eventLoop.assumeIsolated().scheduleTask(in: self.delay) { guard context.channel.isActive else { context.close(promise: nil) return } - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { result in + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).assumeIsolated().whenComplete { result in self.isServingRequest = false switch result { case .success: @@ -1133,7 +1161,7 @@ internal final class HTTPBinHandler: ChannelInboundHandler { } } -final class ConnectionsCountHandler: ChannelInboundHandler { +final class ConnectionsCountHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Channel private let activeConns = ManagedAtomic(0) @@ -1152,8 +1180,8 @@ final class ConnectionsCountHandler: ChannelInboundHandler { _ = self.activeConns.loadThenWrappingIncrement(ordering: .relaxed) _ = self.createdConns.loadThenWrappingIncrement(ordering: .relaxed) - channel.closeFuture.whenComplete { _ in - _ = self.activeConns.loadThenWrappingDecrement(ordering: .relaxed) + channel.closeFuture.whenComplete { [activeConns] _ in + _ = activeConns.loadThenWrappingDecrement(ordering: .relaxed) } context.fireChannelRead(data) @@ -1173,7 +1201,7 @@ internal final class CloseWithoutClosingServerHandler: ChannelInboundHandler { func handlerAdded(context: ChannelHandlerContext) { self.onClosePromise = context.eventLoop.makePromise() - self.onClosePromise!.futureResult.whenSuccess(self.callback!) + self.onClosePromise!.futureResult.assumeIsolated().whenSuccess(self.callback!) self.callback = nil } @@ -1235,7 +1263,7 @@ final class ExpectClosureServerHandler: ChannelInboundHandler { struct EventLoopFutureTimeoutError: Error {} -extension EventLoopFuture { +extension EventLoopFuture where Value: Sendable { func timeout(after failDelay: TimeAmount) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Value.self) @@ -1261,28 +1289,27 @@ struct CollectEverythingLogHandler: LogHandler { var logLevel: Logger.Level = .info let logStore: LogStore - class LogStore { + final class LogStore: Sendable { struct Entry { var level: Logger.Level var message: String var metadata: [String: String] } - var lock = NIOLock() - var logs: [Entry] = [] + private let logs = NIOLockedValueBox<[Entry]>([]) var allEntries: [Entry] { get { - self.lock.withLock { self.logs } + self.logs.withLockedValue { $0 } } set { - self.lock.withLock { self.logs = newValue } + self.logs.withLockedValue { $0 = newValue } } } func append(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?) { - self.lock.withLock { - self.logs.append( + self.logs.withLockedValue { + $0.append( Entry( level: level, message: message.description, @@ -1301,6 +1328,7 @@ struct CollectEverythingLogHandler: LogHandler { level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, + source: String, file: String, function: String, line: UInt @@ -1322,10 +1350,10 @@ struct CollectEverythingLogHandler: LogHandler { /// consume the bytes by calling ``next()`` on the delegate. /// /// The sole purpose of this class is to enable straight-line stream tests. -class ResponseStreamDelegate: HTTPClientResponseDelegate { +final class ResponseStreamDelegate: HTTPClientResponseDelegate { typealias Response = Void - enum State { + enum State: Sendable { /// The delegate is in the idle state. There are no http response parts to be buffered /// and the consumer did not signal a demand. Transitions to all other states are allowed. case idle @@ -1343,10 +1371,11 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate { } let eventLoop: EventLoop - private var state: State = .idle + private let state: NIOLoopBoundBox init(eventLoop: EventLoop) { self.eventLoop = eventLoop + self.state = .makeBoxSendingValue(.idle, eventLoop: eventLoop) } func next() -> EventLoopFuture { @@ -1360,25 +1389,25 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate { } private func next0() -> EventLoopFuture { - switch self.state { + switch self.state.value { case .idle: let promise = self.eventLoop.makePromise(of: ByteBuffer?.self) - self.state = .waitingForBytes(promise) + self.state.value = .waitingForBytes(promise) return promise.futureResult case .buffering(let byteBuffer, done: false): - self.state = .idle + self.state.value = .idle return self.eventLoop.makeSucceededFuture(byteBuffer) case .buffering(let byteBuffer, done: true): - self.state = .finished + self.state.value = .finished return self.eventLoop.makeSucceededFuture(byteBuffer) case .waitingForBytes: preconditionFailure("Don't call `.next` twice") case .failed(let error): - self.state = .finished + self.state.value = .finished return self.eventLoop.makeFailedFuture(error) case .finished: @@ -1408,16 +1437,16 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate { func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { self.eventLoop.preconditionInEventLoop() - switch self.state { + switch self.state.value { case .idle: - self.state = .buffering(buffer, done: false) + self.state.value = .buffering(buffer, done: false) case .waitingForBytes(let promise): - self.state = .idle + self.state.value = .idle promise.succeed(buffer) case .buffering(var byteBuffer, done: false): var buffer = buffer byteBuffer.writeBuffer(&buffer) - self.state = .buffering(byteBuffer, done: false) + self.state.value = .buffering(byteBuffer, done: false) case .buffering(_, done: true), .finished, .failed: preconditionFailure("Invalid state: \(self.state)") } @@ -1428,14 +1457,14 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate { func didReceiveError(task: HTTPClient.Task, _ error: Error) { self.eventLoop.preconditionInEventLoop() - switch self.state { + switch self.state.value { case .idle: - self.state = .failed(error) + self.state.value = .failed(error) case .waitingForBytes(let promise): - self.state = .finished + self.state.value = .finished promise.fail(error) case .buffering(_, done: false): - self.state = .failed(error) + self.state.value = .failed(error) case .buffering(_, done: true), .finished, .failed: preconditionFailure("Invalid state: \(self.state)") } @@ -1444,14 +1473,14 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate { func didFinishRequest(task: HTTPClient.Task) throws { self.eventLoop.preconditionInEventLoop() - switch self.state { + switch self.state.value { case .idle: - self.state = .finished + self.state.value = .finished case .waitingForBytes(let promise): - self.state = .finished + self.state.value = .finished promise.succeed(nil) case .buffering(let byteBuffer, done: false): - self.state = .buffering(byteBuffer, done: true) + self.state.value = .buffering(byteBuffer, done: true) case .buffering(_, done: true), .finished, .failed: preconditionFailure("Invalid state: \(self.state)") } @@ -1473,7 +1502,7 @@ class HTTPEchoHandler: ChannelInboundHandler { case .body(let bytes): context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil) case .end: - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenSuccess { + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).assumeIsolated().whenSuccess { context.close(promise: nil) } } @@ -1495,7 +1524,7 @@ final class HTTPEchoHeaders: ChannelInboundHandler { case .body: break case .end: - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenSuccess { + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).assumeIsolated().whenSuccess { context.close(promise: nil) } } @@ -1661,7 +1690,7 @@ final class BasicInboundTrafficShapingHandler: ChannelDuplexHandler { let buffer = Self.unwrapInboundIn(data) let byteCount = buffer.readableBytes self.currentSecondBytesSeen += byteCount - context.eventLoop.scheduleTask(in: .seconds(1)) { + context.eventLoop.assumeIsolated().scheduleTask(in: .seconds(1)) { self.currentSecondBytesSeen -= byteCount self.evaluatePause(context: loopBoundContext.value) }