class Qpid::Proton::Reactor::Container
A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.
This is an extension to the Reactor classthat adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.
@example
Attributes
Public Class Methods
# File lib/reactor/container.rb, line 58 def initialize(handlers, options = {}) super(handlers, options) # only do the following if we're creating a new instance if !options.has_key?(:impl) @ssl = SSLConfig.new if options[:global_handler] self.global_handler = GlobalOverrides.new(options[:global_handler]) else # very ugly, but using self.global_handler doesn't work in the constructor ghandler = Reactor.instance_method(:global_handler).bind(self).call ghandler = GlobalOverrides.new(ghandler) Reactor.instance_method(:global_handler=).bind(self).call(ghandler) end @trigger = nil @container_id = generate_uuid end end
Public Instance Methods
# File lib/reactor/container.rb, line 256 def _apply_link_options(options, link) if !options.nil? && !options.empty? if !options.is_a?(::List) options = [Options].flatten end options.each {|option| o.apply(link) if o.test(link)} end end
# File lib/reactor/container.rb, line 110 def _session(context) if context.is_a?(Qpid::Proton::URL) return self._session(self.connect(:url => context)) elsif context.is_a?(Qpid::Proton::Session) return context elsif context.is_a?(Qpid::Proton::Connection) if context.session_policy? return context.session_policy.session(context) else return self.create_session(context) end else return context.session end end
Initiates the establishment of an AMQP connection.
@param options [Hash] A hash of named arguments.
# File lib/reactor/container.rb, line 81 def connect(options = {}) conn = self.connection(options[:handler]) conn.container = self.container_id || generate_uuid connector = Connector.new(conn) conn.overrides = connector if !options[:url].nil? connector.address = URLs.new([options[:url]]) elsif !options[:urls].nil? connector.address = URLs.new(options[:urls]) elsif !options[:address].nil? connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])]) else raise ::ArgumentError.new("either :url or :urls or :address required") end connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil? if !options[:reconnect].nil? connector.reconnect = options[:reconnect] else connector.reconnect = Backoff.new() end connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable conn.open return conn end
Initiates the establishment of a link over which messages can be received.
There are two accepted arguments for the context
1. If a Connection is supplied then the link is established using that
object. The source, and optionally the target, address can be supplied
2. If it is a String or a URL then a new Connection is created on which
the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the target address.
The name will be generated for the link if one is not specified.
@param context [Connection, URL, String] The connection or the address. @param opts [Hash] Additional otpions. @option opts [String, Qpid::Proton::URL] The source address. @option opts [String] :target The target address @option opts [String] :name The link name. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Hash] :options Additional link options.
@return [Receiver
# File lib/reactor/container.rb, line 185 def create_receiver(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end source = opts[:source] if context.is_a?(Qpid::Proton::URL) && source.nil? source = context.path end session = self._session(context) receiver = session.receiver(opts[:name] || id(session.connection.container, source, opts[:target])) receiver.source.address = source if source receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] receiver.target.address = opts[:target] if !opts[:target].nil? receiver.handler = opts[:handler] if !opts[:handler].nil? self._apply_link_options(opts[:options], receiver) receiver.open return receiver end
Initiates the establishment of a link over which messages can be sent.
@param context [String, URL] The context. @param opts [Hash] Additional options. @param opts [String, Qpid::Proton::URL] The target address. @param opts [String] :source The source address. @param opts [Boolean] :dynamic @param opts [Object] :handler @param opts [Object] :tag_generator The tag generator. @param opts [Hash] :options Addtional link options
@return [Sender] The sender.
# File lib/reactor/container.rb, line 139 def create_sender(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end target = opts[:target] if context.is_a?(Qpid::Proton::URL) && target.nil? target = context.path end session = self._session(context) sender = session.sender(opts[:name] || id(session.connection.container, target, opts[:source])) sender.source.address = opts[:source] if !opts[:source].nil? sender.target.address = target if target sender.handler = opts[:handler] if !opts[:handler].nil? sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? self._apply_link_options(opts[:options], sender) sender.open return sender end
# File lib/reactor/container.rb, line 209 def declare_transaction(context, handler = nil, settle_before_discharge = false) if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? class << context attr_accessor :txn_ctl end context.txn_ctl = self.create_sender(context, nil, "txn-ctl", InternalTransactionHandler.new()) end return Transaction.new(context.txn_ctl, handler, settle_before_discharge) end
# File lib/reactor/container.rb, line 239 def do_work(timeout = nil) self.timeout = timeout unless timeout.nil? self.process end
# File lib/reactor/container.rb, line 244 def id(container, remote, local) if !local.nil? && !remote.nil? "#{container}-#{remote}-#{local}" elsif !local.nil? "#{container}-#{local}" elsif !remote.nil? "#{container}-#{remote}" else "#{container}-#{generate_uuid}" end end
Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.
@param url [] @param ssl_domain []
# File lib/reactor/container.rb, line 226 def listen(url, ssl_domain = nil) url = Qpid::Proton::URL.new(url) acceptor = self.acceptor(url.host, url.port) ssl_config = ssl_domain if ssl_config.nil? && (url.scheme == 'amqps') && @ssl ssl_config = @ssl.server end if !ssl_config.nil? acceptor.ssl_domain(ssl_config) end return acceptor end
# File lib/reactor/container.rb, line 266 def to_s "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" end