diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 56c9620..9cbe393 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -347,6 +347,32 @@ def io_write(io, buffer, length, offset = 0) end end + # Used to defer stopping the current task until later. + class RaiseException + # Create a new stop later operation. + # + # @parameter task [Task] The task to stop later. + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + # @returns [Boolean] Whether the task is alive. + def alive? + @fiber.alive? + end + + # Transfer control to the operation - this will stop the task. + def transfer + @fiber.raise(@exception) + end + end + + # Raise an exception on the specified fiber, waking up the event loop if necessary. + def fiber_interrupt(fiber, exception) + unblock(nil, RaiseException.new(fiber, exception)) + end + # Wait for the specified process ID to exit. # # @public Since *Async v2*. diff --git a/test/io.rb b/test/io.rb index da52a4b..05ff562 100644 --- a/test/io.rb +++ b/test/io.rb @@ -88,4 +88,129 @@ out.close end end + + with "#close" do + it "can interrupt reading fiber when closing" do + r, w = IO.pipe + + read_task = Async do + r.read(5) + nil + rescue IOError => e + e.to_s + end + + r.close + + expect(read_task.wait).to be == 'closed stream' + end + + it "can interrupt reading fiber when closing from another fiber" do + r, w = IO.pipe + + read_task = Async do + r.read(5) + nil + rescue IOError => e + e.to_s + end + + close_task = Async do + r.close + end + + close_task.wait + expect(read_task.wait).to be == 'closed stream' + end + + it "can interrupt reading fiber when closing from a new thread" do + r, w = IO.pipe + + read_task = Async do + r.read(5) + nil + rescue IOError => e + e.to_s + end + + close_thread = Thread.new do + r.close + end + + close_thread.value + expect(read_task.wait).to be == 'closed stream' + end + + it "can interrupt reading fiber when closing from a fiber in a new thread" do + r, w = IO.pipe + + read_task = Async do + r.read(5) + nil + rescue IOError => e + e.to_s + end + + close_thread = Thread.new do + close_task = Async do + r.close + end + close_task.wait + end + + close_thread.value + expect(read_task.wait).to be == 'closed stream' + end + + it "can interrupt reading thread when closing from a fiber" do + r, w = IO.pipe + + read_thread = Thread.new do + Thread.current.report_on_exception = false + r.read(5) + nil + rescue IOError => e + e.to_s + end + + # Wait until read_thread blocks on I/O + while read_thread.status != 'sleep' + sleep(0.001) + end + + close_task = Async do + r.close + end + + close_task.wait + expect(read_thread.value).to be == 'closed stream' + end + + it "can interrupt reading fiber in a new thread when closing from a fiber" do + r, w = IO.pipe + + read_thread = Thread.new do + Thread.current.report_on_exception = false + read_task = Async do + r.read(5) + nil + rescue IOError => e + e.to_s + end + read_task.wait + end + + # Wait until read_thread blocks on I/O + while read_thread.status != 'sleep' + sleep(0.001) + end + + close_task = Async do + r.close + end + close_task.wait + + expect(read_thread.value).to be == 'closed stream' + end + end end