class Qpid::Proton::Messenger

A Messenger provides a high-level means for sending and receiving AMQP messages.

Examples

Public Class Methods

new(name = nil)

Creates a new Messenger.

The name parameter is optional. If one is not provided then a unique name is generated.

Options

  • name - the name (default: nil)

# File lib/qpid_proton/messenger.rb, line 42
def initialize(name = nil)
  @impl = Cproton.pn_messenger(name)
  ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end

Public Instance Methods

accept(tracker = nil)

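Accepts the incoming message identified by the tracker. If no tracker is given, the most recent incoming tracker is used and the accept is applied cumulatively (the PN_CUMULATIVE flag is set internally).

Options

  • tracker - the tracker (optional)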

# File lib/qpid_proton/messenger.rb, line 296
def accept(tracker = nil)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag))
end
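
The branch on tracker.nil? above decides between acting on one delivery and acting cumulatively from the most recent one. The following is a minimal sketch of just that decision, runnable without the qpid_proton gem. The helper accept_arguments and the numeric value given to PN_CUMULATIVE are stand-ins for illustration and are not part of the library.

```ruby
# Stand-in for Cproton::PN_CUMULATIVE; the numeric value here is assumed.
PN_CUMULATIVE = 0x1

# Hypothetical helper mirroring the branch in #accept: returns the
# [tracker, flag] pair that would be handed to the C layer.
def accept_arguments(tracker, latest_incoming)
  if tracker.nil?
    # No tracker given: use the latest incoming tracker, cumulatively.
    [latest_incoming, PN_CUMULATIVE]
  else
    # Explicit tracker: act on that single delivery only.
    [tracker, 0]
  end
end

p accept_arguments(nil, :latest)        # cumulative form
p accept_arguments(:tracker_7, :latest) # single-delivery form
```

The same branch appears in #reject, so the sketch applies to both methods.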
blocking()
# File lib/qpid_proton/messenger.rb, line 78
def blocking
  Cproton.pn_messenger_is_blocking(@impl)
end
blocking=(blocking)
# File lib/qpid_proton/messenger.rb, line 82
def blocking=(blocking)
  Cproton.pn_messenger_set_blocking(@impl, blocking)
end
certificate()

Returns the path to a certificate file.

# File lib/qpid_proton/messenger.rb, line 146
def certificate
  Cproton.pn_messenger_get_certificate(@impl)
end
certificate=(certificate)

Path to a certificate file for the Messenger.

This certificate is used when the Messenger accepts or establishes SSL/TLS connections.

Options

  • certificate - the certificate

# File lib/qpid_proton/messenger.rb, line 140
def certificate=(certificate)
  Cproton.pn_messenger_set_certificate(@impl, certificate)
end
errno()

Returns the most recent error number.

# File lib/qpid_proton/messenger.rb, line 94
def errno
  Cproton.pn_messenger_errno(@impl)
end
error()

Returns the most recent error message.

# File lib/qpid_proton/messenger.rb, line 100
def error
  Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end
error?()

Reports whether an error occurred.

# File lib/qpid_proton/messenger.rb, line 88
def error?
  !Cproton.pn_messenger_errno(@impl).zero?
end
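
The errno, error, and error? methods expose the last failure, and many methods on this page pass their C return codes through check_for_error, whose source is not shown here. The following is a rough sketch of how such a helper could work. The class names, the exception type, and the rule that only negative codes count as failures are assumptions made for illustration, not the library's implementation.

```ruby
# Hypothetical stand-ins; not the qpid_proton implementation.
class SketchError < StandardError; end

class SketchErrorState
  attr_reader :errno, :error

  def initialize
    @errno = 0    # zero means "no error recorded"
    @error = nil
  end

  # True once a non-zero error number has been recorded.
  def error?
    !@errno.zero?
  end

  # Passes non-negative codes through unchanged; records and raises
  # on negative codes (assumed failure convention).
  def check_for_error(code, text = "unknown error")
    return code if code >= 0
    @errno = code
    @error = text
    raise SketchError, "#{text} (#{code})"
  end
end

state = SketchErrorState.new
state.check_for_error(0)
begin
  state.check_for_error(-2, "connection refused")
rescue SketchError => e
  puts e.message
end
puts state.error?
```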
get(msg = nil)

Gets a single incoming message from the local queue.

The msg argument is optional; when a Message is passed in, the fetched message is decoded into it. In either case the return value is a Tracker for the fetched message, as the source below shows.

Options

  • msg - the (optional) Message instance to be used

# File lib/qpid_proton/messenger.rb, line 221
def get(msg = nil)
  # Pass the underlying implementation only when a Message was supplied
  msg_impl = msg.nil? ? nil : msg.impl
  check_for_error(Cproton.pn_messenger_get(@impl, msg_impl))
  msg.post_decode unless msg.nil?
  return incoming_tracker
end
incoming()

Returns the number of messages in the incoming queue that have not been retrieved.

# File lib/qpid_proton/messenger.rb, line 268
def incoming
  Cproton.pn_messenger_incoming(@impl)
end
incoming_tracker()

Returns a Tracker for the most recently received message.

# File lib/qpid_proton/messenger.rb, line 283
def incoming_tracker
  impl = Cproton.pn_messenger_incoming_tracker(@impl)
  return nil if impl == -1
  Qpid::Proton::Tracker.new(impl)
end
incoming_window()

Returns the incoming window.

# File lib/qpid_proton/messenger.rb, line 370
def incoming_window
  Cproton.pn_messenger_get_incoming_window(@impl)
end
incoming_window=(window)

Sets the incoming window.

If the incoming window is set to a positive value, then after each call to accept or reject, the object will track the status of that many deliveries.

Options

  • window - the window size

# File lib/qpid_proton/messenger.rb, line 363
def incoming_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window))
end
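
To make the idea of a window concrete, here is a small in-memory sketch of tracking the status of only the most recent N deliveries. SketchWindow, its method names, and the :unknown result for deliveries that have fallen out of the window are assumptions made for illustration. The real bookkeeping happens inside the C library and may differ.

```ruby
# Hypothetical stand-in for per-delivery status tracking.
class SketchWindow
  def initialize(size)
    @size = size
    @statuses = {}   # insertion-ordered: tracker id => status
  end

  # Records a status and forgets the oldest entries beyond the window.
  def record(tracker, status)
    @statuses[tracker] = status
    @statuses.shift while @statuses.size > @size
  end

  # Status for a tracker, or :unknown once it is outside the window.
  def status(tracker)
    @statuses.fetch(tracker, :unknown)
  end
end

window = SketchWindow.new(2)
window.record(1, :accepted)
window.record(2, :rejected)
window.record(3, :accepted)
p window.status(1)   # oldest delivery is no longer tracked
p window.status(3)
```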
name()

Returns the name.

# File lib/qpid_proton/messenger.rb, line 55
def name
  Cproton.pn_messenger_name(@impl)
end
outgoing()

Returns the number of messages in the outgoing queue that have not been transmitted.

# File lib/qpid_proton/messenger.rb, line 261
def outgoing
  Cproton.pn_messenger_outgoing(@impl)
end
outgoing_tracker()

Returns a Tracker for the message most recently sent via the put method.

# File lib/qpid_proton/messenger.rb, line 275
def outgoing_tracker
  impl = Cproton.pn_messenger_outgoing_tracker(@impl)
  return nil if impl == -1
  Qpid::Proton::Tracker.new(impl)
end
outgoing_window()

Returns the outgoing window.

# File lib/qpid_proton/messenger.rb, line 390
def outgoing_window
  Cproton.pn_messenger_get_outgoing_window(@impl)
end
outgoing_window=(window)

Sets the outgoing window.

If the outgoing window is set to a positive value, then after each call to #send, the object will track the status of that many deliveries.

Options

  • window - the window size

# File lib/qpid_proton/messenger.rb, line 383
def outgoing_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window))
end
private_key()

Returns the path to a private key file.

# File lib/qpid_proton/messenger.rb, line 166
def private_key
  Cproton.pn_messenger_get_private_key(@impl)
end
private_key=(key)

Path to a private key file for the Messenger.

This property must be set for the Messenger to accept incoming SSL/TLS connections and to establish client-authenticated outgoing SSL/TLS connections.

Options

  • key - the key file

# File lib/qpid_proton/messenger.rb, line 160
def private_key=(key)
  Cproton.pn_messenger_set_private_key(@impl, key)
end
put(message)

Puts a single message into the outgoing queue.

Putting a message only queues it. To ensure queued messages are sent, call #send afterwards.

Options

  • message - the message

# File lib/qpid_proton/messenger.rb, line 196
def put(message)
  raise TypeError.new("invalid message: #{message}") if message.nil?
  raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message)
  # encode the message first
  message.pre_encode
  check_for_error(Cproton.pn_messenger_put(@impl, message.impl))
  return outgoing_tracker
end
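
The split between queuing with #put and transmitting with #send can be illustrated with an in-memory stand-in. SketchOutbox below is hypothetical and does no networking. It uses plain strings for messages and integers for trackers, which are assumptions made to keep the sketch runnable without the qpid_proton gem.

```ruby
# In-memory stand-in for the outgoing side of a Messenger.
class SketchOutbox
  attr_reader :sent

  def initialize
    @outgoing = []      # queued by #put, not yet transmitted
    @sent = []          # "transmitted" messages
    @next_tracker = 0
  end

  # Queues a message and returns a tracker id for it.
  def put(message)
    raise TypeError, "invalid message: #{message.inspect}" if message.nil?
    @outgoing << message
    @next_tracker += 1
  end

  # Number of queued messages that have not been transmitted.
  def outgoing
    @outgoing.size
  end

  # Drains the queue; the n argument is accepted but ignored in this sketch.
  def send(n = -1)
    @sent.concat(@outgoing)
    @outgoing.clear
    nil
  end
end

outbox = SketchOutbox.new
outbox.put("hello")
puts outbox.outgoing   # message is queued, not yet sent
outbox.send
puts outbox.outgoing   # queue has been drained
```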
receive(limit = -1)

Receives up to the specified number of messages, blocking until at least one message is received.

Options

  • limit - the maximum number of messages to receive

# File lib/qpid_proton/messenger.rb, line 240
def receive(limit = -1)
  check_for_error(Cproton.pn_messenger_recv(@impl, limit))
end
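
A common pattern is to call #receive once and then drain the local queue with #get while #incoming is positive. The sketch below shows that loop against a hypothetical in-memory stand-in. SketchInbox does not block or touch the network, it treats a negative limit as "no limit" (an assumption), and its #get returns the message itself rather than a Tracker.

```ruby
# In-memory stand-in for the incoming side of a Messenger.
class SketchInbox
  def initialize(pending)
    @network = pending.dup   # messages waiting "on the wire"
    @local = []              # local incoming queue filled by #receive
  end

  # Moves up to `limit` messages into the local queue.
  def receive(limit = -1)
    count = limit < 0 ? @network.size : [limit, @network.size].min
    @local.concat(@network.shift(count))
    nil
  end

  # Number of received messages not yet retrieved with #get.
  def incoming
    @local.size
  end

  # Removes and returns the oldest message in the local queue.
  def get
    @local.shift
  end
end

inbox = SketchInbox.new(%w[a b c])
inbox.receive(2)
received = []
received << inbox.get while inbox.incoming > 0
p received
```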
receiving()
# File lib/qpid_proton/messenger.rb, line 244
def receiving
  Cproton.pn_messenger_receiving(@impl)
end
reject(tracker)

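Rejects the incoming message identified by the tracker. If the tracker is nil, the most recent incoming tracker is used and the reject is applied cumulatively (the PN_CUMULATIVE flag is set internally).

Options

  • tracker - the tracker (may be nil)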

# File lib/qpid_proton/messenger.rb, line 314
def reject(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag))
end
send(n = -1)

Sends all outgoing messages, blocking until the outgoing queue is empty.

# File lib/qpid_proton/messenger.rb, line 208
def send(n = -1)
  check_for_error(Cproton.pn_messenger_send(@impl, n))
end
settle(tracker, flag)

Settles messages for a tracker.

Options

  • tracker - the tracker

  • flag - the flag

Examples

# File lib/qpid_proton/messenger.rb, line 347
def settle(tracker, flag)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag)
  Cproton.pn_messenger_settle(@impl, tracker.impl, flag)
end
start()

Starts the Messenger, allowing it to begin sending and receiving messages.

# File lib/qpid_proton/messenger.rb, line 107
def start
  check_for_error(Cproton.pn_messenger_start(@impl))
end
status(tracker)

Gets the last known remote state of the delivery associated with the given tracker. See TrackerStatus for details on the values returned.

Options

  • tracker - the tracker

# File lib/qpid_proton/messenger.rb, line 333
def status(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl))
end
stop()

Stops the Messenger, preventing it from sending or receiving any more messages.

# File lib/qpid_proton/messenger.rb, line 114
def stop
  check_for_error(Cproton.pn_messenger_stop(@impl))
end
stopped()
# File lib/qpid_proton/messenger.rb, line 118
def stopped
  Cproton.pn_messenger_stopped(@impl)
end
subscribe(address)

Subscribes the Messenger to a remote address.

# File lib/qpid_proton/messenger.rb, line 124
def subscribe(address)
  raise TypeError.new("invalid address: #{address}") if address.nil?
  subscription = Cproton.pn_messenger_subscribe(@impl, address)
  raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil?
  Qpid::Proton::Subscription.new(subscription)
end
timeout()

Returns the timeout period, in milliseconds.

# File lib/qpid_proton/messenger.rb, line 74
def timeout
  Cproton.pn_messenger_get_timeout(@impl)
end
timeout=(timeout)

Sets the timeout period, in milliseconds.

A negative timeout period implies an infinite timeout.

Options

  • timeout - the timeout period

# File lib/qpid_proton/messenger.rb, line 67
def timeout=(timeout)
  raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil?
  Cproton.pn_messenger_set_timeout(@impl, timeout)
end
trusted_certificates()

Returns the path to the database of trusted certificates.

# File lib/qpid_proton/messenger.rb, line 184
def trusted_certificates
  Cproton.pn_messenger_get_trusted_certificates(@impl)
end
trusted_certificates=(certificates)

A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is nil, then the peer will not be verified.

Options

  • certificates - the certificates path

# File lib/qpid_proton/messenger.rb, line 178
def trusted_certificates=(certificates)
  Cproton.pn_messenger_set_trusted_certificates(@impl, certificates)
end
work(timeout=-1)
# File lib/qpid_proton/messenger.rb, line 248
def work(timeout=-1)
  err = Cproton.pn_messenger_work(@impl, timeout)
  if (err == Cproton::PN_TIMEOUT) then
    return false
  else
    check_for_error(err)
    return true
  end
end
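
Because #work returns false on a timeout and true otherwise, it lends itself to a simple polling loop. The sketch below shows that loop against a hypothetical stand-in. SketchWorker and its notion of "units of work" are assumptions made for illustration, and it reports a timeout as soon as nothing is left to do rather than actually waiting.

```ruby
# Stand-in whose #work returns true while there is work and false on "timeout".
class SketchWorker
  def initialize(units)
    @units = units
  end

  # The timeout argument is accepted but not used to wait in this sketch.
  def work(timeout = -1)
    return false if @units.zero?   # mirrors the PN_TIMEOUT branch
    @units -= 1
    true
  end
end

worker = SketchWorker.new(3)
rounds = 0
rounds += 1 while worker.work(100)
puts rounds
```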

Private Instance Methods

valid_tracker?(tracker)
# File lib/qpid_proton/messenger.rb, line 396
def valid_tracker?(tracker)
  !tracker.nil? && tracker.is_a?(Qpid::Proton::Tracker)
end
valid_window?(window)
# File lib/qpid_proton/messenger.rb, line 400
def valid_window?(window)
  # Fixnum was removed in Ruby 3.2; Integer covers it on all Ruby versions
  !window.nil? && (window.is_a?(Float) || window.is_a?(Integer))
end