class Qrack::Subscription

Subscription ancestor class @deprecated

Attributes

ack[RW]
client[R]
consumer_tag[RW]
delivery_tag[RW]
exclusive[RW]
message_count[R]
message_max[RW]
queue[R]
timeout[RW]

Public Class Methods

new(client, queue, opts = {}) click to toggle source
# File lib/qrack/subscription.rb, line 15
def initialize(client, queue, opts = {})
  @client = client
  @queue = queue

  # Get timeout value
  @timeout = opts[:timeout] || nil

  # Get maximum amount of messages to process
  @message_max = opts[:message_max] || nil

  # If a consumer tag is not passed in the server will generate one
  @consumer_tag = opts[:consumer_tag] || nil

  # Ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response from the server causing an error.
  opts.delete(:nowait)

  # Do we want to have to provide an acknowledgement?
  @ack = opts[:ack] || nil

  # Does this consumer want exclusive use of the queue?
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Store options
  @opts = opts
end

Public Instance Methods

start(&blk) click to toggle source
# File lib/qrack/subscription.rb, line 45
def start(&blk)
  # Do not process any messages if zero message_max
  if message_max == 0
    return
  end

  # Notify server about new consumer
  setup_consumer

  # Start subscription loop
  loop do

    begin
      method = client.next_method(:timeout => timeout)
    rescue Qrack::FrameTimeout
      queue.unsubscribe
      break
    end

    # Increment message counter
    @message_count += 1

    # get delivery tag to use for acknowledge
    queue.delivery_tag = method.delivery_tag if @ack

    header = client.next_payload

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies
    msg = ''
    while msg.length < header.size
      msg << client.next_payload
    end

    # If block present, pass the message info to the block for processing
    blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if !blk.nil?

    # Exit loop if message_max condition met
    if (!message_max.nil? and message_count == message_max)
      # Stop consuming messages
      queue.unsubscribe()
      # Acknowledge receipt of the final message
      queue.ack() if @ack
      # Quit the loop
      break
    end

    # Have to do the ack here because the ack triggers the release of messages from the server
    # if you are using Client#qos prefetch and you will get extra messages sent through before
    # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack() if @ack
  end
end