class Qpid::Proton::Reactor::Reactor

Constants

PROTON_METHOD_PREFIX

@private

Attributes

errors[R]

Public Class Methods

new(handlers, options = {}) click to toggle source
# File lib/reactor/reactor.rb, line 53
# Builds a reactor around an existing native reactor or a freshly created one.
#
# @param handlers [Object, Array, nil] A handler or (possibly nested) list of
#   handlers; each one is added to the reactor's current handler.
# @param options [Hash] When +:impl+ is present (non-nil) it is used as the
#   underlying reactor; otherwise Cproton.pn_reactor supplies a new one.
def initialize(handlers, options = {})
  @impl = options[:impl]
  @impl = Cproton.pn_reactor if @impl.nil?

  unless handlers.nil?
    # Accept a single handler as well as arbitrarily nested arrays of them.
    [handlers].flatten.each do |item|
      self.handler.add(item)
    end
  end

  @errors = []
  @handlers = []
  # Register this wrapper under the reactor's attachments key, the same key
  # that Reactor.wrap uses for its lookup.
  self.class.store_instance(self, :pn_reactor_attachments)
end
wrap(impl) click to toggle source
# File lib/reactor/reactor.rb, line 47
# Returns the Reactor associated with a native reactor.
#
# @param impl [Object, nil] The underlying reactor object.
# @return [Reactor, nil] nil when +impl+ is nil; otherwise the instance found
#   via fetch_instance, or a new Reactor built around +impl+ when the lookup
#   yields nothing.
def self.wrap(impl)
  unless impl.nil?
    known = self.fetch_instance(impl, :pn_reactor_attachments)
    known || Reactor.new(nil, :impl => impl)
  end
end

Public Instance Methods

acceptor(host, port, handler = nil) click to toggle source
# File lib/reactor/reactor.rb, line 155
# Asks the reactor for an acceptor on the given host and port.
#
# @param host [String] The host passed through to pn_reactor_acceptor.
# @param port [Object] The port; it is passed to the native call as a string.
# @param handler [Object, nil] Optional handler, wrapped via chandler with
#   on_error as its error callback.
# @return [Acceptor] A wrapper around the native acceptor.
# @raise [IOError] When the native call yields nil; the message carries the
#   reactor's I/O error text plus the host and port.
def acceptor(host, port, handler = nil)
  impl = chandler(handler, method(:on_error))
  aimpl = Cproton.pn_reactor_acceptor(@impl, host, port.to_s, impl)
  # Drop the local reference to the wrapped handler once it has been handed
  # over (presumably the reactor keeps its own reference -- TODO confirm).
  Cproton.pn_decref(impl)
  return Acceptor.new(aimpl) unless aimpl.nil?

  # Look the error text up once and reuse it in the message.
  io_error = Cproton.pn_io_error(Cproton.pn_reactor_io(@impl))
  error_text = Cproton.pn_error_text(io_error)
  raise IOError, "(#{error_text} (#{host}:#{port}))"
end
connection(handler = nil) click to toggle source
# File lib/reactor/reactor.rb, line 170
# Creates a connection through the reactor.
#
# @param handler [Object, nil] Optional handler, wrapped via chandler with
#   on_error as its error callback.
# @return [Object] Whatever Qpid::Proton::Connection.wrap returns for the
#   native connection.
def connection(handler = nil)
  c_handler = chandler(handler, method(:on_error))
  native = Cproton.pn_reactor_connection(@impl, c_handler)
  wrapped = Qpid::Proton::Connection.wrap(native)
  # Release the local reference to the wrapped handler after handing it over.
  Cproton.pn_decref(c_handler)
  wrapped
end
global_handler() click to toggle source
# File lib/reactor/reactor.rb, line 79
# Fetches the reactor's global handler.
#
# @return [Object] The result of WrappedHandler.wrap for the native global
#   handler, with on_error supplied as the error callback.
def global_handler
  Qpid::Proton::Handler::WrappedHandler.wrap(
    Cproton.pn_reactor_get_global_handler(@impl),
    method(:on_error)
  )
end
global_handler=(handler) click to toggle source
# File lib/reactor/reactor.rb, line 84
# Replaces the reactor's global handler.
#
# @param handler [Object] The handler to install; it is wrapped via chandler
#   with on_error as its error callback.
def global_handler=(handler)
  c_handler = chandler(handler, method(:on_error))
  Cproton.pn_reactor_set_global_handler(@impl, c_handler)
  # Release the local reference to the wrapped handler after installing it.
  Cproton.pn_decref(c_handler)
end
handler() click to toggle source
# File lib/reactor/reactor.rb, line 106
# Fetches the reactor's handler.
#
# @return [Object] The result of WrappedHandler.wrap for the native handler,
#   with on_error supplied as the error callback.
def handler
  Qpid::Proton::Handler::WrappedHandler.wrap(
    Cproton.pn_reactor_get_handler(@impl),
    method(:on_error)
  )
end
handler=(handler) click to toggle source
# File lib/reactor/reactor.rb, line 111
# Replaces the reactor's handler.
#
# @param handler [Object] The handler to install; it is wrapped via chandler
#   with this reactor's on_error method as its error callback.
def handler=(handler)
  # The callback must come from this object (self); the sibling setters
  # (global_handler=, schedule, ...) all pass method(:on_error) the same way.
  impl = chandler(handler, method(:on_error))
  Cproton.pn_reactor_set_handler(@impl, impl)
  # Release the local reference to the wrapped handler after installing it.
  Cproton.pn_decref(impl)
end
on_error(info) click to toggle source
# File lib/reactor/reactor.rb, line 74
# Error callback handed to wrapped handlers: records the error, then calls
# this reactor's +yield+ method.
#
# @param info [Object] The error information to append to +errors+.
def on_error(info)
  self.errors.push(info)
  # +yield+ is a Ruby keyword, so the explicit receiver is required to call
  # the method of that name.
  self.yield
end
process() click to toggle source
# File lib/reactor/reactor.rb, line 136
# Runs one processing step on the underlying reactor.
#
# If any errors were recorded (see on_error), the backtrace of each one is
# printed and the most recent error is raised.
#
# @return [Object] The result of Cproton.pn_reactor_process.
# @raise [Object] The last recorded error, when +errors+ is not empty.
def process
  result = Cproton.pn_reactor_process(@impl)
  recorded = errors
  unless recorded.nil? || recorded.empty?
    recorded.each do |error|
      # An exception that was never raised has no backtrace (nil).
      backtrace = error.backtrace
      print backtrace.join("\n") if backtrace
    end
    raise recorded.last
  end
  result
end
push_event(obj, etype) click to toggle source
# File lib/reactor/reactor.rb, line 192
# Puts an event for +obj+ on the reactor's collector.
#
# @param obj [Object] The object the event refers to.
# @param etype [#number] The event type; its +number+ is passed to the
#   native call.
# @return [Object] The result of Cproton.pn_collector_put.
def push_event(obj, etype)
  collector = Cproton.pn_reactor_collector(@impl)
  context = Qpid::Proton::Util::RBCTX
  # NOTE(review): pn_py2void reads like a name from the Python binding --
  # confirm that the Ruby Cproton module actually provides it.
  payload = Cproton.pn_py2void(obj)
  Cproton.pn_collector_put(collector, context, payload, etype.number)
end
quiesced?() click to toggle source

Returns whether the reactor is quiesced, i.e. whether it has no unbuffered data.

@return [Boolean] True if there is no unbuffered data.

# File lib/reactor/reactor.rb, line 70
# Reports the reactor's quiesced state.
#
# @return [Object] The value returned by Cproton.pn_reactor_quiesced.
def quiesced?
  state = Cproton.pn_reactor_quiesced(@impl)
  state
end
run() { || ... } click to toggle source
# File lib/reactor/reactor.rb, line 117
# Drives the reactor until +process+ returns a falsy value.
#
# Sets the timeout to 3.14159265359, calls +start+, then calls +process+
# repeatedly, yielding to the given block (if any) after each truthy result,
# and finally calls +stop+.
#
# @yield Called with no arguments after every truthy +process+ result.
# @return [Object] The result of +stop+.
def run(&block)
  self.timeout = 3.14159265359
  start
  while process
    yield if block_given?
  end
  stop
end
schedule(delay, task) click to toggle source
# File lib/reactor/reactor.rb, line 148
# Schedules a task on the reactor.
#
# @param delay [Object] The delay; converted with sec_to_millis before being
#   passed to the native call.
# @param task [Object] The handler for the task, wrapped via chandler with
#   on_error as its error callback.
# @return [Object] The result of Task.wrap for the native task.
def schedule(delay, task)
  c_handler = chandler(task, method(:on_error))
  native = Cproton.pn_reactor_schedule(@impl, sec_to_millis(delay), c_handler)
  scheduled = Task.wrap(native)
  # Release the local reference to the wrapped handler after handing it over.
  Cproton.pn_decref(c_handler)
  scheduled
end
selectable(handler = nil) click to toggle source
# File lib/reactor/reactor.rb, line 177
# Creates a selectable on the reactor, optionally attaching a handler to it.
#
# @param handler [Object, nil] Optional handler, wrapped via chandler with
#   on_error as its error callback.
# @return [Object] The result of Selectable.wrap for the native selectable.
def selectable(handler = nil)
  c_handler = chandler(handler, method(:on_error))
  wrapped = Selectable.wrap(Cproton.pn_reactor_selectable(@impl))
  # Nothing to attach when chandler produced no wrapped handler.
  return wrapped if c_handler.nil?

  attachments = Cproton.pn_selectable_attachments(wrapped.impl)
  Cproton.pn_record_set_handler(attachments, c_handler)
  Cproton.pn_decref(c_handler)
  wrapped
end
timeout() click to toggle source

Returns the timeout period.

@return [Fixnum] The timeout period, in seconds.

# File lib/reactor/reactor.rb, line 94
# Reads the reactor's timeout.
#
# @return [Object] The native timeout value after conversion by
#   millis_to_timeout.
def timeout
  millis = Cproton.pn_reactor_get_timeout(@impl)
  millis_to_timeout(millis)
end
timeout=(timeout) click to toggle source

Sets the timeout period.

@param timeout [Fixnum] The timeout, in seconds.

# File lib/reactor/reactor.rb, line 102
# Sets the reactor's timeout.
#
# @param timeout [Object] The timeout; converted with timeout_to_millis
#   before being passed to the native call.
def timeout=(timeout)
  millis = timeout_to_millis(timeout)
  Cproton.pn_reactor_set_timeout(@impl, millis)
end
update(sel) click to toggle source
# File lib/reactor/reactor.rb, line 188
# Passes a selectable to the reactor's native update call.
#
# @param sel [#impl] The selectable wrapper; its +impl+ is handed to
#   Cproton.pn_reactor_update.
# @return [Object] The result of Cproton.pn_reactor_update.
def update(sel)
  native = sel.impl
  Cproton.pn_reactor_update(@impl, native)
end
wakeup() click to toggle source
# File lib/reactor/reactor.rb, line 128
# Calls the reactor's native wakeup.
#
# @return [nil] When the native call returns zero.
# @raise [IOError] When the native call returns a non-zero value; the message
#   is the reactor's I/O error text.
def wakeup
  status = Cproton.pn_reactor_wakeup(@impl)
  return if status.zero?

  io = Cproton.pn_reactor_io(@impl)
  # Convert the error object to text, as acceptor does, instead of handing
  # the raw error object to IOError as its message.
  raise IOError, Cproton.pn_error_text(Cproton.pn_io_error(io))
end