class ActionCable::Connection::StreamEventLoop
Public class methods
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 10
# Builds the loop's initial state without starting anything.
#
# * @spawn_mutex - a fresh Mutex (presumably guards lazy start-up; the code
#   that uses it is not visible here).
# * @todo        - a Queue that the other methods push lambdas onto.
# * @stopping    - false until #stop flips it.
# * @map         - io => monitor lookup, empty at first.
# * @thread, @executor, @nio - all nil at construction time.
def initialize
  @spawn_mutex = Mutex.new
  @todo = Queue.new
  @stopping = false
  @map = {}
  @thread = nil
  @executor = nil
  @nio = nil
end
Public instance methods
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 30
# Schedules +io+ for read monitoring and associates +stream+ with it.
#
# The work is not done inline: a lambda is pushed onto @todo and #wakeup is
# called afterwards. When that lambda runs it registers +io+ with @nio for
# +:r+ interest, records the returned monitor in @map under +io+, and sets
# the monitor's +value+ to +stream+.
#
# Returns whatever #wakeup returns.
def attach(io, stream)
  registration = lambda do
    monitor = @nio.register(io, :r)
    @map[io] = monitor
    monitor.value = stream
  end

  @todo << registration
  wakeup
end
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 38
# Schedules +io+ to be removed from monitoring and closed.
#
# A lambda is pushed onto @todo and #wakeup is called. When the lambda runs
# it deregisters +io+ from @nio, drops its entry from @map, and then closes
# +io+ (in that order).
#
# +stream+ is accepted but not used by this method's body.
#
# Returns whatever #wakeup returns.
def detach(io, stream)
  teardown = lambda do
    @nio.deregister(io)
    @map.delete(io)
    io.close
  end

  @todo.push(teardown)
  wakeup
end
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 23
# Hands a unit of work to @executor.
#
# task  - a callable to run; takes precedence when both are given.
# block - used as the work when +task+ is nil (or false).
#
# #spawn is called first, then the chosen callable is appended to @executor
# with <<. Returns the result of that << call.
def post(task = nil, &block)
  job = task || block
  spawn
  @executor << job
end
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 56
# Flags the loop as stopping.
#
# @stopping is always set to true. #wakeup is only called when @nio is set;
# otherwise the method returns nil without calling it.
def stop
  @stopping = true
  return unless @nio

  wakeup
end
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 19
# Creates a Concurrent::TimerTask and starts it.
#
# interval - passed to the task as +execution_interval+.
# block    - passed to the task as its block.
#
# Returns the TimerTask after #execute has been called on it.
def timer(interval, &block)
  task = Concurrent::TimerTask.new(execution_interval: interval, &block)
  task.execute
  task
end
Source code GitHub
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 47
# Schedules a switch of +io+'s monitor to +:rw+ interests.
#
# A lambda is pushed onto @todo and #wakeup is called. When the lambda runs
# it looks +io+ up in @map; if a monitor is found its +interests+ are set to
# +:rw+, and if there is none the lambda does nothing.
#
# Returns whatever #wakeup returns.
def writes_pending(io)
  @todo << lambda do
    monitor = @map[io]
    monitor.interests = :rw if monitor
  end

  wakeup
end