class Dalli::Ring
An implementation of a consistent hash ring, designed to minimize the cache miss impact of adding or removing servers from the ring. That is, adding or removing a server from the ring should impact the key -> server mapping of ~ 1/N of the stored keys where N is the number of servers in the ring. This is done by creating a large number of “points” per server, distributed over the space 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32 hash, and find the nearest “point” that is less than or equal to the key’s hash. In this implementation, each “point” is represented by a Dalli::Ring::Entry.
Constants
- POINTS_PER_SERVER
The number of entries on the continuum created per server in an equally weighted scenario.
Attributes
continuum[RW]
servers[RW]
Public Class Methods
new(servers, options)
click to toggle source
# File lib/dalli/ring.rb, line 26
# Sets up the ring for the given servers.
#
# servers - collection of server objects; must respond to +size+.
# options - hash-like object consulted for the :threadsafe and :failover
#           keys. Both behaviours are enabled unless the corresponding
#           key is explicitly +false+.
def initialize(servers, options)
  @servers = servers
  # A continuum is only built when there is more than one server to choose
  # between; otherwise it is left as nil.
  @continuum = servers.size > 1 ? build_continuum(servers) : nil
  threadsafe! unless options[:threadsafe] == false
  @failover = options[:failover] != false
end
Public Instance Methods
close()
click to toggle source
# File lib/dalli/ring.rb, line 95
# Calls +close+ on every server in the ring, in order.
def close
  @servers.each { |server| server.close }
end
keys_grouped_by_server(key_arr)
click to toggle source
# File lib/dalli/ring.rb, line 64
# Buckets the given keys by the server each one maps to.
#
# key_arr - enumerable of keys.
#
# Returns a Hash of server => [keys]. Keys for which +server_for_key+
# raises Dalli::RingError are logged at debug level and collected under
# the +nil+ bucket.
def keys_grouped_by_server(key_arr)
  key_arr.each_with_object({}) do |key, buckets|
    owner = begin
      server_for_key(key)
    rescue Dalli::RingError
      Dalli.logger.debug { "unable to get key #{key}" }
      nil
    end
    (buckets[owner] ||= []) << key
  end
end
lock() { || ... }
click to toggle source
# File lib/dalli/ring.rb, line 73
# Locks every server, yields to the caller's block, then unlocks.
#
# Servers are locked in ring order via +lock!+. Only servers whose +lock!+
# completed are unlocked afterwards, so if acquiring one lock raises, the
# locks already taken are released instead of being left held. Unlocking
# always runs, whether the block returns normally or raises.
#
# Returns the value of the block.
def lock
  acquired = []
  begin
    @servers.each do |server|
      server.lock!
      # Recorded only after lock! succeeds so we never unlock a server
      # whose lock we do not hold.
      acquired << server
    end
    yield
  ensure
    acquired.each(&:unlock!)
  end
end
pipeline_consume_and_ignore_responses()
click to toggle source
# File lib/dalli/ring.rb, line 82
# Sends a :noop request to every server, discarding the responses.
#
# A Dalli::NetworkError from a server is swallowed and the next server is
# tried; any other error propagates.
def pipeline_consume_and_ignore_responses
  @servers.each do |server|
    begin
      server.request(:noop)
    rescue Dalli::NetworkError
      # Ignore this error, as it indicates the socket is unavailable
      # and there's no need to flush
    end
  end
end
server_for_key(key)
click to toggle source
# File lib/dalli/ring.rb, line 35
# Resolves the server responsible for +key+.
#
# When a continuum exists the server is looked up through it; otherwise
# the first server is used.
#
# Returns the server if it reports itself alive.
# Raises Dalli::RingError when no server was found or it is not alive.
def server_for_key(key)
  candidate = @continuum ? server_from_continuum(key) : @servers.first

  # Note that the call to alive? has the side effect of initializing
  # the socket
  raise Dalli::RingError, 'No server available' unless candidate&.alive?

  candidate
end
server_from_continuum(key)
click to toggle source
# File lib/dalli/ring.rb, line 49
# Looks up a live server for +key+ on the continuum.
#
# Up to 20 attempts are made. Each failed attempt re-hashes the key
# prefixed with the attempt number to pick another point on the ring.
# When failover is disabled only the first attempt is made.
#
# Returns the first server found alive, or nil if none was.
def server_from_continuum(key)
  hash_value = hash_for(key)
  (0...20).each do |attempt|
    candidate = server_for_hash_key(hash_value)
    # Note that the call to alive? has the side effect of initializing
    # the socket
    return candidate if candidate.alive?
    return nil unless @failover

    hash_value = hash_for("#{attempt}#{key}")
  end
  nil
end
socket_timeout()
click to toggle source
# File lib/dalli/ring.rb, line 91
# Returns the socket timeout reported by the first server in the ring.
def socket_timeout
  first_server = @servers.first
  first_server.socket_timeout
end
Private Instance Methods
build_continuum(servers)
click to toggle source
# File lib/dalli/ring.rb, line 126
# Builds the sorted list of ring entries for the given servers.
#
# Each server contributes +entry_count_for+ entries. The value of an entry
# is the first 8 hex digits of the SHA1 digest of "<server name>:<index>",
# read as an integer.
#
# Returns an Array of Dalli::Ring::Entry ordered by ascending value.
def build_continuum(servers)
  total_weight = servers.reduce(0) { |sum, server| sum + server.weight }
  server_count = servers.size

  points = servers.flat_map do |server|
    entry_count_for(server, server_count, total_weight).times.map do |idx|
      digest = Digest::SHA1.hexdigest("#{server.name}:#{idx}")
      Dalli::Ring::Entry.new(Integer(digest[0, 8], 16), server)
    end
  end

  points.sort_by(&:value)
end
entry_count_for(server, total_servers, total_weight)
click to toggle source
# File lib/dalli/ring.rb, line 111
# Computes how many continuum entries +server+ should receive.
#
# The total number of points (total_servers * POINTS_PER_SERVER) is shared
# out in proportion to the server's weight relative to +total_weight+, and
# the result is rounded down.
def entry_count_for(server, total_servers, total_weight)
  weighted_points = total_servers * POINTS_PER_SERVER * server.weight
  share = weighted_points / Float(total_weight)
  share.floor
end
hash_for(key)
click to toggle source
# File lib/dalli/ring.rb, line 107
# Returns the CRC32 checksum of +key+, used as the key's position on the
# ring.
def hash_for(key)
  ring_position = Zlib.crc32(key)
  ring_position
end
server_for_hash_key(hash_key)
click to toggle source
# File lib/dalli/ring.rb, line 115
# Returns the server owning the continuum entry with the greatest value
# that is <= +hash_key+.
#
# The continuum is binary-searched for the first entry whose value exceeds
# +hash_key+; the entry just before it is the owner.
def server_for_hash_key(hash_key)
  first_greater = @continuum.bsearch_index { |point| point.value > hash_key }

  # No greater entry means the last entry owns the key. If the very first
  # entry is already greater, the resulting index of -1 also selects the
  # last entry, wrapping around the ring.
  position = first_greater ? first_greater - 1 : @continuum.size - 1

  @continuum[position].server
end
threadsafe!()
click to toggle source
# File lib/dalli/ring.rb, line 101
# Extends every server in the ring with the Dalli::Threadsafe module.
def threadsafe!
  @servers.each { |server| server.extend(Dalli::Threadsafe) }
end