class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 15
def initialize
  @subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 47
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each { |s| s.finish(name, id, payload) }
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 55
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 63
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 51
def publish(name, *args)
  listeners_for(name).each { |s| s.publish(name, *args) }
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 43
def start(name, id, payload)
  listeners_for(name).each { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, block = Proc.new) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 21
def subscribe(pattern = nil, block = Proc.new)
  subscriber = Subscribers.new pattern, block
  synchronize do
    @subscribers << subscriber
    @listeners_for.clear
  end
  subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 30
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @subscribers.reject! { |s| s.matches?(subscriber_or_name) }
    else
      @subscribers.delete(subscriber_or_name)
    end

    @listeners_for.clear
  end
end
wait() click to toggle source

This is a sync queue, so there is no waiting.

# File lib/active_support/notifications/fanout.rb, line 68
def wait
end