class Qpid::Proton::Reactor::Container
A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.
This is an extension to the Reactor classthat adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.
Attributes
Public Class Methods
# File lib/reactor/container.rb, line 55 def initialize(handlers, options = {}) super(handlers, options) # only do the following if we're creating a new instance if !options.has_key?(:impl) @ssl = SSLConfig.new if options[:global_handler] self.global_handler = GlobalOverrides.new(options[:global_handler]) else # very ugly, but using self.global_handler doesn't work in the constructor ghandler = Reactor.instance_method(:global_handler).bind(self).call ghandler = GlobalOverrides.new(ghandler) Reactor.instance_method(:global_handler=).bind(self).call(ghandler) end @trigger = nil @container_id = generate_uuid end end
Public Instance Methods
Initiate an AMQP connection.
@param url [String] Connect to URL host:port, using user:password@ if present @param opts [Hash] Named options
For backwards compatibility, can be called with a single parameter opts.
@option opts [String] :url Connect to URL host:port using user:password@ if present. @option opts [String] :user user name for authentication if not given by URL @option opts [String] :password password for authentication if not given by URL @option opts [Numeric] :idle_timeout seconds before closing an idle connection,
can be a fractional value.
@option opts [Boolean] :sasl_enabled Enable or disable SASL. @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text
passwords, even over an insecure connection. By default, such mechanisms are only allowed when SSL is enabled.
@option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.
@option opts [String] :address deprecated use the :url option @option opts [Numeric] :heartbeat milliseconds before closing an idle connection.
*deprecated* use :idle_timeout => heartbeat/1000
@return [Connection] the new connection
# File lib/reactor/container.rb, line 97 def connect(url, opts = {}) # Backwards compatible with old connect(options) if url.is_a? Hash and opts.empty? opts = url url = nil end conn = self.connection(opts[:handler]) conn.container = self.container_id || generate_uuid connector = Connector.new(conn, url, opts) return conn end
Initiates the establishment of a link over which messages can be received.
There are two accepted arguments for the context
1. If a Connection is supplied then the link is established using that
object. The source, and optionally the target, address can be supplied
2. If it is a String or a URL then a new Connection is created on which
the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the target address.
The name will be generated for the link if one is not specified.
@param context [Connection, URL, String] The connection or the address. @param opts [Hash] Additional otpions. @option opts [String, Qpid::Proton::URL] The source address. @option opts [String] :target The target address @option opts [String] :name The link name. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Hash] :options Additional link options.
@return [Receiver
# File lib/reactor/container.rb, line 186 def create_receiver(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end source = opts[:source] if context.is_a?(Qpid::Proton::URL) && source.nil? source = context.path end session = _session(context) receiver = session.receiver(opts[:name] || id(session.connection.container, source, opts[:target])) receiver.source.address = source if source receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] receiver.target.address = opts[:target] if !opts[:target].nil? receiver.handler = opts[:handler] if !opts[:handler].nil? _apply_link_options(opts[:options], receiver) receiver.open return receiver end
Initiates the establishment of a link over which messages can be sent.
@param context [String, URL] The context. @param opts [Hash] Additional options. @option opts [String, Qpid::Proton::URL] The target address. @option opts [String] :source The source address. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Object] :tag_generator The tag generator. @option opts [Hash] :options Addtional link options
@return [Sender] The sender.
# File lib/reactor/container.rb, line 140 def create_sender(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end target = opts[:target] if context.is_a?(Qpid::Proton::URL) && target.nil? target = context.path end session = _session(context) sender = session.sender(opts[:name] || id(session.connection.container, target, opts[:source])) sender.source.address = opts[:source] if !opts[:source].nil? sender.target.address = target if target sender.handler = opts[:handler] if !opts[:handler].nil? sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? _apply_link_options(opts[:options], sender) sender.open return sender end
# File lib/reactor/container.rb, line 210 def declare_transaction(context, handler = nil, settle_before_discharge = false) if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? class << context attr_accessor :txn_ctl end context.txn_ctl = self.create_sender(context, nil, "txn-ctl", InternalTransactionHandler.new()) end return Transaction.new(context.txn_ctl, handler, settle_before_discharge) end
Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.
@param url [] @param ssl_domain []
# File lib/reactor/container.rb, line 227 def listen(url, ssl_domain = nil) url = Qpid::Proton::URL.new(url) acceptor = self.acceptor(url.host, url.port) ssl_config = ssl_domain if ssl_config.nil? && (url.scheme == 'amqps') && @ssl ssl_config = @ssl.server end if !ssl_config.nil? acceptor.ssl_domain(ssl_config) end return acceptor end
Private Instance Methods
# File lib/reactor/container.rb, line 254 def _apply_link_options(options, link) if !options.nil? && !options.empty? if !options.is_a?(::List) options = [Options].flatten end options.each {|option| o.apply(link) if o.test(link)} end end
# File lib/reactor/container.rb, line 110 def _session(context) if context.is_a?(Qpid::Proton::URL) return _session(self.connect(:url => context)) elsif context.is_a?(Qpid::Proton::Session) return context elsif context.is_a?(Qpid::Proton::Connection) if context.session_policy? return context.session_policy.session(context) else return self.create_session(context) end else return context.session end end
# File lib/reactor/container.rb, line 242 def id(container, remote, local) if !local.nil? && !remote.nil? "#{container}-#{remote}-#{local}" elsif !local.nil? "#{container}-#{local}" elsif !remote.nil? "#{container}-#{remote}" else "#{container}-#{generate_uuid}" end end
# File lib/reactor/container.rb, line 264 def to_s "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" end