class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 15
# Sets up an empty fanout: no subscribers and an empty per-event listener
# cache, then defers to the superclass initializer.
def initialize
  # Every registered subscriber; appended to by #subscribe.
  @subscribers = []
  # Cache of event name => subscribers matching that name, filled lazily
  # by #listeners_for and cleared whenever the subscriber list changes.
  @listeners_for = Concurrent::Map.new
  super
end
Public Instance Methods
finish(name, id, payload, listeners = listeners_for(name))
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 47
# Forwards a "finish" notification to every listener.
#
# name      - event name, passed through to each listener.
# id        - identifier passed through unchanged.
# payload   - payload object passed through unchanged.
# listeners - collection to notify; defaults to listeners_for(name), so a
#             caller that already holds the list can skip the lookup.
#
# Returns whatever listeners.each returns.
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each do |listener|
    listener.finish(name, id, payload)
  end
end
listeners_for(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 55
# Returns the subscribers interested in +name+, memoised per event name in
# @listeners_for.
#
# A cached entry is returned without taking the lock; on a miss the list is
# computed from @subscribers inside +synchronize+ and stored in the cache.
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 63
# True when listeners_for(name) contains at least one truthy entry, i.e.
# somebody is subscribed to +name+.
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 51
# Delivers an event to every listener of +name+ by calling
# publish(name, *args) on each of them.
#
# name - event name used both for the listener lookup and as first argument.
# args - any further arguments, forwarded untouched.
#
# Returns whatever each on the listeners collection returns.
def publish(name, *args)
  listeners_for(name).each do |listener|
    listener.publish(name, *args)
  end
end
start(name, id, payload)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 43
# Forwards a "start" notification to every listener of +name+.
#
# name    - event name used for the listener lookup and passed through.
# id      - identifier passed through unchanged.
# payload - payload object passed through unchanged.
#
# Returns whatever each on the listeners collection returns.
def start(name, id, payload)
  listeners_for(name).each do |listener|
    listener.start(name, id, payload)
  end
end
subscribe(pattern = nil, block = Proc.new)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 21
# Registers a new subscriber and returns it.
#
# pattern - passed to Subscribers.new as the first argument; nil by default.
# block   - the callback object, given positionally. When omitted, the block
#           attached to the call is used instead.
#
# The callback used to default to a block-less `Proc.new`, which implicitly
# captured the caller's block. That form is deprecated in Ruby 2.7 and raises
# on Ruby 3.0+, so the caller's block is now captured explicitly.
#
# Raises ArgumentError when neither a callback object nor a block is supplied
# (the same error class the old `Proc.new` default produced).
def subscribe(pattern = nil, block = nil, &callback)
  block ||= callback
  raise ArgumentError, "tried to subscribe without a callback or block" unless block

  subscriber = Subscribers.new(pattern, block)
  synchronize do
    @subscribers << subscriber
    # Cached listener lists may now be missing the new subscriber.
    @listeners_for.clear
  end
  subscriber
end
unsubscribe(subscriber_or_name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 30
# Removes subscribers and invalidates the listener cache, all inside
# +synchronize+.
#
# subscriber_or_name - a String removes every subscriber whose matches?
#                      returns true for it; any other object is removed
#                      from @subscribers directly via Array#delete.
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @subscribers.reject! { |s| s.matches?(subscriber_or_name) }
    else
      @subscribers.delete(subscriber_or_name)
    end

    # Cached listener lists may still reference what was just removed.
    @listeners_for.clear
  end
end
wait()
click to toggle source
This is a sync queue, so there is no waiting.
# File lib/active_support/notifications/fanout.rb, line 68
# Intentionally empty: the accompanying description calls this a sync queue,
# so there is nothing to wait for. Always returns nil.
def wait
end