class Dalli::PipelinedGetter
Contains logic for the pipelined gets implemented by the client.
Public Class Methods
new(ring, key_manager)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 8
# Stores the two collaborators the other methods rely on.
#
# ring        - used for grouping keys by server, locking, and the
#               socket timeout.
# key_manager - used for validating keys and stripping namespaces.
def initialize(ring, key_manager)
  @ring, @key_manager = ring, key_manager
end
Public Instance Methods
abort_with_timeout(servers)
click to toggle source
Swallows Dalli::NetworkError
# File lib/dalli/pipelined_getter.rb, line 128
# Aborts the pipelined request on every given server and logs, at
# debug level, that each of them failed to answer before the timeout.
#
# servers - the servers that are still awaiting a response.
#
# Always returns true.
def abort_with_timeout(servers)
  abort_without_timeout(servers)
  servers.each do |server|
    Dalli.logger.debug { "memcached at #{server.name} did not respond within timeout" }
  end

  true # Required to simplify caller
end
abort_without_timeout(servers)
click to toggle source
Swallows Dalli::NetworkError
# File lib/dalli/pipelined_getter.rb, line 91
# Tells every server in the list to abort its pipelined request.
#
# Returns the servers collection that was passed in.
def abort_without_timeout(servers)
  servers.each { |server| server.pipeline_abort }
end
fetch_responses(servers, start_time, timeout, &block)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 95
# Runs one round of response collection over the given servers.
#
# servers    - servers with outstanding pipelined requests; this array
#              is modified in place (disconnected and finished servers
#              are removed).
# start_time - when the overall fetch started.
# timeout    - total time budget, measured from start_time.
# block      - forwarded to process_server for each readable server.
#
# Returns the servers that still have responses outstanding, or an
# empty array when no server is connected or none became readable in
# the remaining time.
# Raises Dalli::NetworkError after aborting the remaining servers.
def fetch_responses(servers, start_time, timeout, &block)
  # Remove any servers which are not connected
  servers.select!(&:connected?)
  return [] if servers.empty?

  time_left = remaining_time(start_time, timeout)
  readable_servers = servers_with_response(servers, time_left)
  if readable_servers.empty?
    abort_with_timeout(servers)
    return []
  end

  # Loop through the servers with responses, and
  # delete any from our list that are finished
  readable_servers.each do |server|
    servers.delete(server) if process_server(server, &block)
  end
  servers
rescue Dalli::NetworkError
  # Abort and raise if we encountered a network error.  This triggers
  # a retry at the top level.
  abort_without_timeout(servers)
  raise
end
finish_queries(servers)
click to toggle source
This loops through the servers that have keys in our set, sending the noop to terminate the set of queries.
# File lib/dalli/pipelined_getter.rb, line 59
# Terminates the set of queries on each live server and drops the
# servers whose termination failed with a Dalli::DalliError.
#
# servers - candidate servers; modified in place.
#
# Returns the same array with the failed servers removed.
# Raises Dalli::NetworkError after aborting the pipeline on all servers.
def finish_queries(servers)
  # Servers that are not alive are skipped and stay in the list.
  failed = servers.select do |server|
    next false unless server.alive?

    begin
      finish_query_for_server(server)
      false
    rescue Dalli::NetworkError
      # Must be listed first so a network error is never treated as a
      # plain DalliError below.
      raise
    rescue Dalli::DalliError
      true
    end
  end

  servers.delete_if { |server| failed.include?(server) }
rescue Dalli::NetworkError
  abort_without_timeout(servers)
  raise
end
finish_query_for_server(server)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 80
# Asks a single server to set up its pipelined response.
#
# Returns whatever server.pipeline_response_setup returns.
# Raises Dalli::NetworkError untouched; any other Dalli::DalliError is
# logged at debug level and then re-raised.
def finish_query_for_server(server)
  server.pipeline_response_setup
rescue Dalli::DalliError => error
  # Network errors pass straight through without being logged here.
  raise if error.is_a?(Dalli::NetworkError)

  Dalli.logger.debug { error.inspect }
  Dalli.logger.debug { "Results from server: #{server.name} will be missing from the results" }
  raise
end
groups_for_keys(*keys)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 164
# Validates the given keys and groups them by the server that the ring
# assigns to each.
#
# keys - keys, or (nested) arrays of keys; each is converted with to_s
#        before validation.
#
# Returns the grouping produced by @ring.keys_grouped_by_server with
# the nil group (keys without a matching server) removed; the size of
# that group is logged at debug level.
def groups_for_keys(*keys)
  validated = keys.flatten.map { |key| @key_manager.validate_key(key.to_s) }
  by_server = @ring.keys_grouped_by_server(validated)

  unroutable = by_server.delete(nil)
  if unroutable
    Dalli.logger.debug do
      "unable to get keys for #{unroutable.length} keys because no matching server was found"
    end
  end

  by_server
end
make_getkq_requests(groups)
click to toggle source
Loop through the server-grouped sets of keys, writing the corresponding getkq requests to the appropriate servers
It’s worth noting that we could potentially reduce bytes on the wire by switching from getkq to getq, and using the opaque value to match requests to responses.
# File lib/dalli/pipelined_getter.rb, line 46
# Writes a :pipelined_get request for each server's keys.
#
# groups - mapping of server => keys for that server.
#
# A Dalli::DalliError or Dalli::NetworkError raised for one server is
# logged at debug level and does not stop the remaining servers from
# being contacted.
#
# Returns groups.
def make_getkq_requests(groups)
  groups.each do |server, keys_for_server|
    server.request(:pipelined_get, keys_for_server)
  rescue Dalli::DalliError, Dalli::NetworkError => e
    Dalli.logger.debug { e.inspect }
    Dalli.logger.debug { "unable to get keys for server #{server.name}" }
  end
end
process(keys, &block)
click to toggle source
Yields, one at a time, keys and their values+attributes.
# File lib/dalli/pipelined_getter.rb, line 16
# Yields, one at a time, keys and their values+attributes.
#
# keys  - the keys to fetch.
# block - forwarded to fetch_responses, which yields each result.
#
# Returns {} when keys is empty; otherwise the value of @ring.lock.
#
# A Dalli::NetworkError restarts the whole operation from the top.
# NOTE(review): the retry is unbounded in this method; presumably
# something outside this view keeps failed servers from being
# selected again -- confirm.
def process(keys, &block)
  return {} if keys.empty?

  @ring.lock do
    servers = setup_requests(keys)
    start_time = Time.now
    # Keep collecting until every server has finished or been dropped.
    servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
  end
rescue Dalli::NetworkError => e
  Dalli.logger.debug { e.inspect }
  Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
  retry
end
process_server(server) { |key_without_namespace, value_list| ... }
click to toggle source
Processes responses from a server. Returns true if there are no additional responses from this server.
# File lib/dalli/pipelined_getter.rb, line 139
# Yields every response currently available from the server as
# (key without namespace, value list).
#
# Returns the result of server.pipeline_complete?, i.e. true when
# there are no additional responses from this server.
def process_server(server)
  responses = server.pipeline_next_responses
  responses.each_pair do |namespaced_key, value_list|
    yield(@key_manager.key_without_namespace(namespaced_key), value_list)
  end

  server.pipeline_complete?
end
remaining_time(start, timeout)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 120
# Computes how much of the timeout budget is left.
#
# start   - the Time at which the operation began.
# timeout - the total budget in seconds.
#
# Returns a value between 0 and timeout (inclusive).
def remaining_time(start, timeout)
  elapsed = Time.now - start
  return 0 if elapsed > timeout
  # Time.now is wall-clock time, so a clock that was stepped backwards
  # makes elapsed negative; never report more than the full budget.
  return timeout if elapsed.negative?

  timeout - elapsed
end
servers_with_response(servers, timeout)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 147
# Waits up to timeout seconds for any of the servers' sockets to
# become readable.
#
# Returns the servers whose sockets are readable, or an empty array
# when there are no servers or the wait timed out.
def servers_with_response(servers, timeout)
  return [] if servers.empty?

  # TODO: - This is a bit challenging.  Essentially the PipelinedGetter
  # is a reactor, but without the benefit of a Fiber or separate thread.
  # My suspicion is that we may want to try and push this down into the
  # individual servers, but I'm not sure.  For now, we keep the
  # mapping between the alerted object (the socket) and the
  # corresponding server here.
  by_socket = {}
  servers.each { |server| by_socket[server.sock] = server }

  ready = IO.select(by_socket.keys, nil, nil, timeout)
  return [] unless ready # nil means the wait timed out

  # The first element holds the sockets that are ready for reading.
  by_socket.values_at(*ready.first)
end
setup_requests(keys)
click to toggle source
# File lib/dalli/pipelined_getter.rb, line 30
# Groups the keys by server, writes the requests to each server, and
# then terminates the set of queries.
#
# Returns the result of finish_queries for the grouped servers.
def setup_requests(keys)
  server_groups = groups_for_keys(keys).tap { |groups| make_getkq_requests(groups) }

  # TODO: How does this exit on a NetworkError
  finish_queries(server_groups.keys)
end