class Aws::SQS::QueuePoller

A utility class for long polling messages in a loop. **Messages are automatically deleted from the queue at the end of the given block.**

poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll do |msg|
  puts msg.body
end

## Long Polling

By default, messages are received using long polling. This method will force a default `:wait_time_seconds` of 20 seconds. If you prefer to use the queue default wait time, then pass a `nil` value for `:wait_time_seconds`.

# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds
poller.poll(wait_time_seconds:nil) do |msg|
  # ...
end

When disabling `:wait_time_seconds` by passing `nil`, you must ensure the queue `ReceiveMessageWaitTimeSeconds` attribute is set to a non-zero value, or you will be short-polling. This will trigger significantly more API calls.
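
Before passing `nil`, it can help to confirm that the queue itself has long polling enabled. The following is a minimal sketch, not part of QueuePoller: `long_polling_enabled?` is a hypothetical helper, and it assumes a client that responds to `get_queue_attributes` and returns an object whose `attributes` is a Hash of String values.

```ruby
# Hypothetical helper (not part of the SDK): returns true when the queue's
# ReceiveMessageWaitTimeSeconds attribute is greater than zero.
def long_polling_enabled?(client, queue_url)
  resp = client.get_queue_attributes(
    queue_url: queue_url,
    attribute_names: ['ReceiveMessageWaitTimeSeconds']
  )
  # Attribute values are assumed to be strings; a missing value counts as 0.
  resp.attributes.fetch('ReceiveMessageWaitTimeSeconds', '0').to_i > 0
end
```

If this returns `false`, passing `wait_time_seconds: nil` would result in short polling.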

## Batch Receiving Messages

You can specify a maximum number of messages to receive with each polling attempt via `:max_number_of_messages`. When this is set to a value greater than 1, the block receives an array of messages instead of a single message.

# receives and yields 1 message at a time
poller.poll do |msg|
  # ...
end

# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

Amazon SQS enforces the maximum value for `:max_number_of_messages`. The option documentation for {#poll} lists valid values as 1 to 10.
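
Because the block argument changes shape with `:max_number_of_messages`, shared handler code may want to normalize it. This is a sketch only: `as_batch` is a hypothetical helper, and the comment about `Kernel#Array` assumes that received messages behave like Ruby Structs.

```ruby
# Hypothetical helper: always returns an Array of messages, whether the
# poll block yielded a single message or a batch.
def as_batch(yielded)
  # Kernel#Array is avoided on purpose: for a Struct-like message,
  # Array(msg) would expand the message into its member values.
  yielded.is_a?(Array) ? yielded : [yielded]
end
```

Inside a poll block, `as_batch(arg).each { |msg| ... }` then works for both the single-message and the batch form.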

## Visibility Timeouts

When receiving messages, you have a fixed amount of time to process and delete the message before it is added back into the queue. This is the visibility timeout. By default, the queue's `VisibilityTimeout` attribute is used. You can provide an alternative visibility timeout when polling.

# queue default VisibilityTimeout
poller.poll do |msg|
end

# custom visibility timeout
poller.poll(visibility_timeout:10) do |msg|
end

You can reset the visibility timeout of a single message by calling {#change_message_visibility_timeout}. This is useful when you need more time to finish processing the message.

poller.poll do |msg|

  # do work ...

  # need more time for processing
  poller.change_message_visibility_timeout(msg, 60)

  # finish work ...

end

If you change the visibility timeout of a message to zero, it will return to the queue immediately.
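
For work that runs in several stages, one option is to reset the timeout before each later stage. The sketch below is illustrative only: `run_steps_with_extension` and its `steps` and `extension` parameters are hypothetical, and `poller` only needs to respond to `change_message_visibility_timeout`. Note that the new timeout counts from the time of the call; it is not added to the time remaining.

```ruby
# Hypothetical helper: runs each step in order, resetting the message's
# visibility timeout to `extension` seconds before every step after the first.
def run_steps_with_extension(poller, msg, steps, extension: 60)
  steps.each_with_index do |step, index|
    poller.change_message_visibility_timeout(msg, extension) if index > 0
    step.call(msg)
  end
end
```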

## Deleting Messages

Messages are deleted from the queue when the block returns normally.

poller.poll do |msg|
  # do work
end # messages deleted here

You can skip message deletion by passing `skip_delete: true`. This allows you to manually delete the messages using {#delete_message}, or {#delete_messages}.

# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end

# batch delete messages
poller.poll(skip_delete: true, max_number_of_messages:10) do |messages|
  poller.delete_messages(messages)
end
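
With `skip_delete: true` and batches, you may want to delete only the messages that were processed successfully. The following is a sketch under assumptions: `process_batch` is a hypothetical helper, and `handle` in the usage comment stands for your own processing code.

```ruby
# Hypothetical helper: yields each message to the block and returns the
# messages for which the block finished without raising StandardError.
def process_batch(messages)
  messages.select do |msg|
    begin
      yield msg
      true
    rescue StandardError
      false
    end
  end
end

# Possible usage inside a poll block (handle is a placeholder):
#
#   poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
#     done = process_batch(messages) { |msg| handle(msg) }
#     poller.delete_messages(done) unless done.empty?
#   end
```

Messages that are not deleted become visible again once their visibility timeout expires.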

Another way to manage message deletion is to throw `:skip_delete` from the poll block. This lets you decide, for each message or message batch, whether it is deleted. It is especially useful when you rescue transient errors and want the message's visibility timeout to expire so that the message can be received again.

poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end
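
The effect of `throw :skip_delete` can be seen in isolation with plain Ruby. This sketch does not use the SDK: `process_with_auto_delete` is a hypothetical, simplified stand-in for the poller's internal processing step, and the `deleted` array stands in for the delete request.

```ruby
# Simplified stand-in for the poller's delete step: runs the block, then
# records a delete, unless the block throws :skip_delete.
def process_with_auto_delete(msg, deleted)
  catch(:skip_delete) do
    yield msg
    deleted << msg # never reached when :skip_delete is thrown
  end
end

deleted = []
process_with_auto_delete('ok', deleted) { |m| m.upcase }
process_with_auto_delete('bad', deleted) { |_m| throw :skip_delete }
p deleted # prints ["ok"]
```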

## Terminating the Polling Loop

By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing `:stop_polling` from the {#before_request} callback.

### `:idle_timeout` Option

This is a configurable maximum number of seconds to wait for a new message before the polling loop exits. By default, there is no idle timeout.

# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end
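
The idle check can be expressed as a small pure function. This is a sketch modeled on the `check_idle_timeout` source listed further down this page; `idle_timeout_reached?` and its keyword arguments are hypothetical names.

```ruby
# Idle time is measured from the last received message, or from the start
# of polling when no message has been received yet.
def idle_timeout_reached?(now:, started_at:, last_received_at:, idle_timeout:)
  return false if idle_timeout.nil? # no idle timeout configured
  since = last_received_at || started_at
  (now - since) > idle_timeout
end
```

In the poller, this check runs only after a receive request returns no messages, so the loop can exit somewhat later than `:idle_timeout` seconds, depending on `:wait_time_seconds`.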

### Throw `:stop_polling`

If you want more fine-grained control, you can configure a before-request callback that is triggered before each long poll. Throwing `:stop_polling` from this callback causes the poller to exit normally without making the next request.

# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

poller.poll do |msg|
  # do work ...
end
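
The same mechanism can stop polling at a wall-clock deadline. The following is a sketch only: `stop_polling_after` is a hypothetical helper, and its `clock` parameter exists so the behavior can be checked without waiting.

```ruby
# Hypothetical helper: builds a callback that throws :stop_polling once
# `deadline` has passed. The stats argument is ignored here.
def stop_polling_after(deadline, clock: -> { Time.now })
  lambda do |_stats|
    throw :stop_polling if clock.call >= deadline
  end
end

# Possible usage (stops roughly five minutes from now):
#
#   poller.before_request(&stop_polling_after(Time.now + 300))
```

The deadline is only checked before each request, so a long poll already in flight finishes first.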

## Tracking Progress

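The poller automatically tracks a few statistics client-side in a {PollerStats} object. You can access the poller stats three ways:

* The first block argument of {#before_request}.
* The second block argument of {#poll}.
* The return value of {#poll}.

Here are examples of accessing the statistics.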
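
As one illustration, a stats object can be turned into a log line. This is a sketch under assumptions: `stats_summary` is a hypothetical helper, and it relies only on `request_count` and `received_message_count`, both of which appear in the source listings on this page.

```ruby
# Hypothetical helper: formats a one-line summary from any object that
# responds to request_count and received_message_count.
def stats_summary(stats)
  per_request =
    if stats.request_count.zero?
      0.0
    else
      stats.received_message_count.to_f / stats.request_count
    end
  format('requests=%d messages=%d avg=%.2f',
         stats.request_count, stats.received_message_count, per_request)
end

# Possible usage with the return value of poll:
#
#   stats = poller.poll(idle_timeout: 60) { |msg| puts msg.body }
#   puts stats_summary(stats)
```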

Attributes

client[R]

@return [Client]

default_config[R]

@return [PollerConfig]

queue_url[R]

@return [String]

Public Class Methods

new(queue_url, options = {})

@param [String] queue_url

@option options [Client] :client

@option (see poll)

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 208
def initialize(queue_url, options = {})
  @queue_url = queue_url
  @client = options.delete(:client) || Client.new
  @default_config = PollerConfig.new(options)
end

Public Instance Methods

before_request(&block)

Registers a callback that is invoked once before every polling attempt.

poller.before_request do |stats|
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")
end

poller.poll do |msg|
  # do work ...
end

## `:stop_polling`

If you throw `:stop_polling` from the {#before_request} callback, then the poller will exit normally before making the next long poll request.

poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end

# at most 100 messages will be yielded
poller.poll do |msg|
  # do work ...
end

@yieldparam [PollerStats] stats An object that tracks a few client-side statistics about the queue polling.

@return [void]

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 255
def before_request(&block)
  @default_config = @default_config.with(before_request: Proc.new)
end

change_message_visibility_timeout(message, seconds)

@note This method should be called from inside a {#poll} block.

@param [#receipt_handle] message An object that responds to `#receipt_handle`.

@param [Integer] seconds

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 348
def change_message_visibility_timeout(message, seconds)
  @client.change_message_visibility({
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
    visibility_timeout: seconds,
  })
end

delete_message(message)

@note This method should be called from inside a {#poll} block.

@param [#receipt_handle] message An object that responds to `#receipt_handle`.

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 359
def delete_message(message)
  @client.delete_message({
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
  })
end

delete_messages(messages)

@note This method should be called from inside a {#poll} block.

@param [Array<#message_id, #receipt_handle>] messages An array of received messages. Each object must respond to `#message_id` and `#receipt_handle`.

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 370
def delete_messages(messages)
  @client.delete_message_batch(
    queue_url: @queue_url,
    entries: messages.map { |msg|
      { id: msg.message_id, receipt_handle: msg.receipt_handle }
    }
  )
end
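
A batch delete can succeed as a request while individual entries fail, so it may be worth inspecting the response. This is a sketch under assumptions: `failed_deletes` is a hypothetical helper, and it assumes the value returned by `delete_messages` is the client response, exposing a `failed` list whose entries respond to `id` and `code`.

```ruby
# Hypothetical helper: describes the entries that were reported as failed
# in a batch delete response, one "id: code" string per entry.
def failed_deletes(resp)
  resp.failed.map { |entry| "#{entry.id}: #{entry.code}" }
end

# Possible usage inside a poll block with skip_delete: true:
#
#   failures = failed_deletes(poller.delete_messages(messages))
#   warn(failures.join(', ')) unless failures.empty?
```
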

poll(options = {}, &block)

Polls the queue, yielding a message or an array of messages. Messages are automatically deleted from the queue at the end of the given block. See the class documentation on {QueuePoller} for more examples.

@example Basic example, loops indefinitely

poller.poll do |msg|
  # ...
end

@example Receives and deletes messages as a batch

poller.poll(max_number_of_messages:10) do |messages|
  messages.each do |msg|
    # ...
  end
end

@option options [Integer] :wait_time_seconds (20) The long polling interval. Messages are yielded as soon as they are received. The `:wait_time_seconds` option specifies the maximum duration of each polling attempt before a new request is sent to receive messages.

@option options [Integer] :max_number_of_messages (1) The maximum number of messages to yield from each polling attempt. Values can be from 1 to 10.

@option options [Integer] :visibility_timeout (nil) The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, the queue's `VisibilityTimeout` attribute is used.

@option options [Array<String>] :attribute_names ([]) The list of attributes to return along with each message. Valid attribute names include:

* `All` - All attributes.
* `ApproximateFirstReceiveTimestamp` - The time when the message
   was first received from the queue (epoch time in milliseconds).
* `ApproximateReceiveCount` - The number of times a message has
   been received from the queue but not deleted.
* `SenderId` - The AWS account number (or the IP address, if
   anonymous access is allowed) of the sender.
* `SentTimestamp` - The time when the message was sent to the
   queue (epoch time in milliseconds).

@option options [Array<String>] :message_attribute_names ([]) A list of message attributes to receive. You can receive all message attributes by using `All` or `.*`. You can also use `foo.*` to return all message attributes starting with the `foo` prefix.

@option options [Integer] :idle_timeout (nil) Polling terminates gracefully when `:idle_timeout` seconds have passed without receiving any messages.

@option options [Boolean] :skip_delete (false) When `true`, messages are not deleted after the polling block returns. If you wish to delete received messages, you will need to call `#delete_message` or `#delete_messages` manually.

@option options [Proc] :before_request (nil) Called before each polling attempt. This proc receives a single argument, an instance of {PollerStats}.

@return [PollerStats]

# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 327
def poll(options = {}, &block)
  config = @default_config.with(options)
  stats = PollerStats.new
  catch(:stop_polling) do
    loop do
      messages = get_messages(config, stats)
      if messages.empty?
        check_idle_timeout(config, stats, messages)
      else
        process_messages(config, stats, messages, &block)
      end
    end
  end
  stats.polling_stopped_at = Time.now
  stats
end

Private Instance Methods

check_idle_timeout(config, stats, messages)
# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 393
def check_idle_timeout(config, stats, messages)
  if config.idle_timeout
    since = stats.last_message_received_at || stats.polling_started_at
    idle_time = Time.now - since
    throw :stop_polling if idle_time > config.idle_timeout
  end
end

get_messages(config, stats)
# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 381
def get_messages(config, stats)
  config.before_request.call(stats) if config.before_request
  messages = send_request(config).messages
  stats.request_count += 1
  messages
end

process_messages(config, stats, messages, &block)
# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 401
def process_messages(config, stats, messages, &block)
  stats.received_message_count += messages.count
  stats.last_message_received_at = Time.now
  catch(:skip_delete) do
    yield_messages(config, messages, stats, &block)
    delete_messages(messages) unless config.skip_delete
  end
end

send_request(config)
# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 388
def send_request(config)
  params = config.request_params.merge(queue_url: @queue_url)
  @client.receive_message(params)
end

yield_messages(config, messages, stats) { |msg, stats| ... }
# File lib/aws-sdk-resources/services/sqs/queue_poller.rb, line 410
def yield_messages(config, messages, stats, &block)
  if config.request_params[:max_number_of_messages] == 1
    messages.each do |msg|
      yield(msg, stats)
    end
  else
    yield(messages, stats)
  end
end