class Stomp::Client

Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.

Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.

Attributes

host[R]

The Stomp host specified by the client.

login[R]

The login ID used by the client.

parameters[R]

Parameters Hash, possibly nil for a non-hashed connect.

passcode[R]

The login credentials used by the client.

port[R]

The Stomp host's listening port.

reliable[R]

Is this connection reliable?

Public Class Methods

new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) click to toggle source

A new Client object can be initialized using three forms:

Hash (this is the recommended Client initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
}

e.g. c = Stomp::Client.new(hash)

Positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)
# File lib/stomp/client.rb, line 82
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login

    first_host = @parameters[:hosts][0]

    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])

    @reliable = true
  elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ 
    # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])

    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]

    hosts = [first_host] + parse_hosts(login)

    @parameters = {}
    @parameters[:hosts] = hosts

    @parameters.merge! filter_options(options)

    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!()

  @id_mutex = Mutex.new()
  @ids = 1

  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
    @connection.autoflush = autoflush
  end

  start_listeners()

end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) click to toggle source

open is syntactic sugar for 'Client.new', see 'initialize' for usage.

# File lib/stomp/client.rb, line 148
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Public Instance Methods

abort(name, headers = {}) click to toggle source

Abort aborts work in a transaction by name.

# File lib/stomp/client.rb, line 164
def abort(name, headers = {})
  @connection.abort(name, headers)

  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = find_listener(message)
        listener.call(message)
      end
    end
  end
end
acknowledge(message, headers = {}) { |r| ... } click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe(“/queue/a”,{:ack => 'client'}). Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 211
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  if protocol() == Stomp::SPL_12
    @connection.ack(message.headers['ack'], headers)
  else
    @connection.ack(message.headers['message-id'], headers)
  end
end
autoflush() click to toggle source

autoflush returns the current connection's autoflush setting.

# File lib/stomp/client.rb, line 342
def autoflush()
  @connection.autoflush()
end
autoflush=(af) click to toggle source

autoflush= sets the current connection's autoflush setting.

# File lib/stomp/client.rb, line 337
def autoflush=(af)
  @connection.autoflush = af
end
begin(name, headers = {}) click to toggle source

Begin starts work in a a transaction by name.

# File lib/stomp/client.rb, line 159
def begin(name, headers = {})
  @connection.begin(name, headers)
end
close(headers={}) click to toggle source

close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.

# File lib/stomp/client.rb, line 275
def close(headers={})
  @listener_thread.exit
  @connection.disconnect(headers)
end
closed?() click to toggle source

close? tests if this client connection is closed.

# File lib/stomp/client.rb, line 269
def closed?()
  @connection.closed?()
end
commit(name, headers = {}) click to toggle source

Commit commits work in a transaction by name.

# File lib/stomp/client.rb, line 179
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
connection_frame() click to toggle source

Return the broker's CONNECTED frame to the client. Misnamed.

# File lib/stomp/client.rb, line 254
def connection_frame()
  @connection.connection_frame
end
disconnect_receipt() click to toggle source

Return any RECEIPT frame received by DISCONNECT.

# File lib/stomp/client.rb, line 259
def disconnect_receipt()
  @connection.disconnect_receipt
end
hbrecv_count() click to toggle source

#hbrecv_count returns the current connection's heartbeat receive count.

# File lib/stomp/client.rb, line 326
def hbrecv_count()
  @connection.hbrecv_count()
end
hbrecv_interval() click to toggle source

#hbrecv_interval returns the connection's heartbeat receive interval.

# File lib/stomp/client.rb, line 316
def hbrecv_interval()
  @connection.hbrecv_interval()
end
hbsend_count() click to toggle source

#hbsend_count returns the current connection's heartbeat send count.

# File lib/stomp/client.rb, line 321
def hbsend_count()
  @connection.hbsend_count()
end
hbsend_interval() click to toggle source

#hbsend_interval returns the connection's heartbeat send interval.

# File lib/stomp/client.rb, line 311
def hbsend_interval()
  @connection.hbsend_interval()
end
join(limit = nil) click to toggle source

join the listener thread for this client, generally used to wait for a quit signal.

# File lib/stomp/client.rb, line 154
def join(limit = nil)
  @listener_thread.join(limit)
end
nack(message_id, headers = {}) click to toggle source

Stomp 1.1+ NACK.

# File lib/stomp/client.rb, line 233
def nack(message_id, headers = {})
  @connection.nack(message_id, headers)
end
open?() click to toggle source

open? tests if this client connection is open.

# File lib/stomp/client.rb, line 264
def open?
  @connection.open?()
end
poll() click to toggle source

Poll for asynchronous messages issued by broker. Return nil of no message available, else the message

# File lib/stomp/client.rb, line 332
def poll()
  @connection.poll()
end
protocol() click to toggle source

protocol returns the current client's protocol level.

# File lib/stomp/client.rb, line 291
def protocol()
  @connection.protocol()
end
publish(destination, message, headers = {}) { |r| ... } click to toggle source

Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 246
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
running() click to toggle source

running checks if the thread was created and is not dead.

# File lib/stomp/client.rb, line 281
def running()
  @listener_thread && !!@listener_thread.status
end
set_logger(logger) click to toggle source

#set_logger identifies a new callback logger.

# File lib/stomp/client.rb, line 286
def set_logger(logger)
  @connection.set_logger(logger)
end
sha1(data) click to toggle source

sha1 returns a SHA1 sum of a given string.

# File lib/stomp/client.rb, line 301
def sha1(data)
  @connection.sha1(data)
end
subscribe(destination, headers = {}) { |msg| ... } click to toggle source

Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 188
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
unreceive(message, options = {}) click to toggle source

Unreceive a message, sending it back to its queue or to the DLQ.

# File lib/stomp/client.rb, line 238
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
unsubscribe(name, headers = {}) click to toggle source

Unsubscribe from a subscription by name.

# File lib/stomp/client.rb, line 202
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners[headers[:id]] = nil
end
uuid() click to toggle source

uuid returns a type 4 UUID.

# File lib/stomp/client.rb, line 306
def uuid()
  @connection.uuid()
end
valid_utf8?(s) click to toggle source

valid_utf8? validates any given string for UTF8 compliance.

# File lib/stomp/client.rb, line 296
def valid_utf8?(s)
  @connection.valid_utf8?(s)
end

Private Instance Methods

check_arguments!() click to toggle source

A very basic check of required arguments.

# File lib/client/utils.rb, line 58
def check_arguments!()
  raise ArgumentError if @host.nil? || @host.empty?
  raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535
  raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)
end
filter_options(options) click to toggle source

#filter_options returns a new Hash of filtered options.

# File lib/client/utils.rb, line 65
def filter_options(options)
  new_options = {}
  new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms
  new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms
  new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true
  new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i
  new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i
  new_options[:randomize] = options["randomize"] == "true" # Default: false
  new_options[:connect_timeout] = 0

  new_options
end
find_listener(message) click to toggle source

#find_listener returns the listener for a given subscription in a given message.

# File lib/client/utils.rb, line 79
def find_listener(message)
  subscription_id = message.headers['subscription']
  if subscription_id == nil
    # For backward compatibility, some messages may already exist with no
    # subscription id, in which case we can attempt to synthesize one.
    set_subscription_id_if_missing(message.headers['destination'], message.headers)
    subscription_id = message.headers[:id]
  end
  @listeners[subscription_id]
end
parse_hosts(url) click to toggle source

Parse a stomp URL.

# File lib/client/utils.rb, line 39
def parse_hosts(url)
  hosts = []

  host_match = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/
  url.scan(host_match).each do |match|
    host = {}
    host[:ssl] = !match[0].nil?
    host[:login] =  match[2] || ""
    host[:passcode] = match[3] || ""
    host[:host] = match[4]
    host[:port] = match[5].to_i

    hosts << host
  end

  hosts
end
register_receipt_listener(listener) click to toggle source

Register a receipt listener.

# File lib/client/utils.rb, line 23
def register_receipt_listener(listener)
  id = -1
  @id_mutex.synchronize do
    id = @ids.to_s
    @ids = @ids.succ
  end
  @receipt_listeners[id] = listener
  id
end
set_subscription_id_if_missing(destination, headers) click to toggle source

Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.github.com/

# File lib/client/utils.rb, line 15
def set_subscription_id_if_missing(destination, headers)
  headers[:id] = headers[:id] ? headers[:id] : headers['id']
  if headers[:id] == nil
    headers[:id] = Digest::SHA1.hexdigest(destination)
  end
end
start_listeners() click to toggle source

Start a single listener thread. Misnamed I think.

# File lib/client/utils.rb, line 91
def start_listeners()
  @listeners = {}
  @receipt_listeners = {}
  @replay_messages_by_txn = {}

  @listener_thread = Thread.start do
    while true
      message = @connection.receive
      if message.command == Stomp::CMD_MESSAGE
        if listener = find_listener(message)
          listener.call(message)
        end
      elsif message.command == Stomp::CMD_RECEIPT
        if listener = @receipt_listeners[message.headers['receipt-id']]
          listener.call(message)
        end
      end
    end # while true
  end
end
url_regex() click to toggle source

#url_regex defines a regex for e.g. login:passcode@host:port or host:port

# File lib/client/utils.rb, line 34
def url_regex
  '(([\w\.\-]*):(\w*)@)?([\w\.\-]+):(\d+)'
end