Skip to content

Fix handling of IO#close interruption across threads/fibers. #369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,32 @@ def io_write(io, buffer, length, offset = 0)
end
end

# Used to defer stopping the current task until later.
class RaiseException
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
def initialize(fiber, exception)
@fiber = fiber
@exception = exception
end

# @returns [Boolean] Whether the task is alive.
def alive?
@fiber.alive?
end

# Transfer control to the operation - this will stop the task.
def transfer
@fiber.raise(@exception)
end
end

# Raise an exception on the specified fiber, waking up the event loop if necessary.
def fiber_interrupt(fiber, exception)
unblock(nil, RaiseException.new(fiber, exception))
end

# Wait for the specified process ID to exit.
#
# @public Since *Async v2*.
Expand Down
125 changes: 125 additions & 0 deletions test/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,129 @@
out.close
end
end

with "#close" do
it "can interrupt reading fiber when closing" do
r, w = IO.pipe

read_task = Async do
r.read(5)
nil
rescue IOError => e
e.to_s
end

r.close

expect(read_task.wait).to be == 'closed stream'
end

it "can interrupt reading fiber when closing from another fiber" do
r, w = IO.pipe

read_task = Async do
r.read(5)
nil
rescue IOError => e
e.to_s
end

close_task = Async do
r.close
end

close_task.wait
expect(read_task.wait).to be == 'closed stream'
end

it "can interrupt reading fiber when closing from a new thread" do
r, w = IO.pipe

read_task = Async do
r.read(5)
nil
rescue IOError => e
e.to_s
end

close_thread = Thread.new do
r.close
end

close_thread.value
expect(read_task.wait).to be == 'closed stream'
end

it "can interrupt reading fiber when closing from a fiber in a new thread" do
r, w = IO.pipe

read_task = Async do
r.read(5)
nil
rescue IOError => e
e.to_s
end

close_thread = Thread.new do
close_task = Async do
r.close
end
close_task.wait
end

close_thread.value
expect(read_task.wait).to be == 'closed stream'
end

it "can interrupt reading thread when closing from a fiber" do
r, w = IO.pipe

read_thread = Thread.new do
Thread.current.report_on_exception = false
r.read(5)
nil
rescue IOError => e
e.to_s
end

# Wait until read_thread blocks on I/O
while read_thread.status != 'sleep'
sleep(0.001)
end

close_task = Async do
r.close
end

close_task.wait
expect(read_thread.value).to be == 'closed stream'
end

it "can interrupt reading fiber in a new thread when closing from a fiber" do
r, w = IO.pipe

read_thread = Thread.new do
Thread.current.report_on_exception = false
read_task = Async do
r.read(5)
nil
rescue IOError => e
e.to_s
end
read_task.wait
end

# Wait until read_thread blocks on I/O
while read_thread.status != 'sleep'
sleep(0.001)
end

close_task = Async do
r.close
end
close_task.wait

expect(read_thread.value).to be == 'closed stream'
end
end
end
Loading