Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
40 41 -class Task(Wrapper):
42 43 @staticmethod
44 - def wrap(impl):
45 if impl is None: 46 return None 47 else: 48 return Task(impl)
49
50 - def __init__(self, impl):
51 Wrapper.__init__(self, impl, pn_task_attachments)
52
53 - def _init(self):
54 pass
55
56 - def cancel(self):
57 pn_task_cancel(self._impl)
58
59 -class Acceptor(Wrapper):
60
61 - def __init__(self, impl):
62 Wrapper.__init__(self, impl)
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
67 - def close(self):
68 pn_acceptor_close(self._impl)
69
70 -class Reactor(Wrapper):
71 72 @staticmethod
73 - def wrap(impl):
74 if impl is None: 75 return None 76 else: 77 record = pn_reactor_attachments(impl) 78 attrs = pn_void2py(pn_record_get(record, PYCTX)) 79 if attrs and 'subclass' in attrs: 80 return attrs['subclass'](impl=impl) 81 else: 82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
85 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 86 for h in handlers: 87 self.handler.add(h)
88
89 - def _init(self):
90 self.errors = []
91
92 - def on_error(self, info):
93 self.errors.append(info) 94 self.yield_()
95
96 - def _get_global(self):
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
99 - def _set_global(self, handler):
100 impl = _chandler(handler, self.on_error) 101 pn_reactor_set_global_handler(self._impl, impl) 102 pn_decref(impl)
103 104 global_handler = property(_get_global, _set_global) 105
106 - def _get_timeout(self):
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
109 - def _set_timeout(self, secs):
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111 112 timeout = property(_get_timeout, _set_timeout) 113
114 - def yield_(self):
115 pn_reactor_yield(self._impl)
116
117 - def mark(self):
118 return pn_reactor_mark(self._impl)
119
120 - def _get_handler(self):
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
123 - def _set_handler(self, handler):
124 impl = _chandler(handler, self.on_error) 125 pn_reactor_set_handler(self._impl, impl) 126 pn_decref(impl)
127 128 handler = property(_get_handler, _set_handler) 129
130 - def run(self):
131 self.timeout = 3.14159265359 132 self.start() 133 while self.process(): pass 134 self.stop() 135 self.process() 136 self.global_handler = None 137 self.handler = None
138
139 - def wakeup(self):
140 n = pn_reactor_wakeup(self._impl) 141 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
142
143 - def start(self):
144 pn_reactor_start(self._impl)
145 146 @property
147 - def quiesced(self):
148 return pn_reactor_quiesced(self._impl)
149
150 - def _check_errors(self):
151 if self.errors: 152 for exc, value, tb in self.errors[:-1]: 153 traceback.print_exception(exc, value, tb) 154 exc, value, tb = self.errors[-1] 155 _compat.raise_(exc, value, tb)
156
157 - def process(self):
158 result = pn_reactor_process(self._impl) 159 self._check_errors() 160 return result
161
162 - def stop(self):
163 pn_reactor_stop(self._impl) 164 self._check_errors()
165
166 - def schedule(self, delay, task):
167 impl = _chandler(task, self.on_error) 168 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 169 pn_decref(impl) 170 return task
171
172 - def acceptor(self, host, port, handler=None):
173 impl = _chandler(handler, self.on_error) 174 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 175 pn_decref(impl) 176 if aimpl: 177 return Acceptor(aimpl) 178 else: 179 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
180
181 - def connection(self, handler=None):
182 """Deprecated: use connection_to_host() instead 183 """ 184 impl = _chandler(handler, self.on_error) 185 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 186 if impl: pn_decref(impl) 187 return result
188
189 - def connection_to_host(self, host, port, handler=None):
190 """Create an outgoing Connection that will be managed by the reactor. 191 The reator's pn_iohandler will create a socket connection to the host 192 once the connection is opened. 193 """ 194 conn = self.connection(handler) 195 self.set_connection_host(conn, host, port) 196 return conn
197
198 - def set_connection_host(self, connection, host, port):
199 """Change the address used by the connection. The address is 200 used by the reactor's iohandler to create an outgoing socket 201 connection. This must be set prior to opening the connection. 202 """ 203 pn_reactor_set_connection_host(self._impl, 204 connection._impl, 205 unicode2utf8(str(host)), 206 unicode2utf8(str(port)))
207
208 - def get_connection_address(self, connection):
209 """This may be used to retrieve the remote peer address. 210 @return: string containing the address in URL format or None if no 211 address is available. Use the proton.Url class to create a Url object 212 from the returned value. 213 """ 214 _url = pn_reactor_get_connection_address(self._impl, connection._impl) 215 return utf82unicode(_url)
216
217 - def selectable(self, handler=None):
218 impl = _chandler(handler, self.on_error) 219 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 220 if impl: 221 record = pn_selectable_attachments(result._impl) 222 pn_record_set_handler(record, impl) 223 pn_decref(impl) 224 return result
225
226 - def update(self, sel):
227 pn_reactor_update(self._impl, sel._impl)
228
229 - def push_event(self, obj, etype):
230 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
231 232 from proton import wrappers as _wrappers 233 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 234 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
235 236 237 -class EventInjector(object):
238 """ 239 Can be added to a reactor to allow events to be triggered by an 240 external thread but handled on the event thread associated with 241 the reactor. An instance of this class can be passed to the 242 Reactor.selectable() method of the reactor in order to activate 243 it. The close() method should be called when it is no longer 244 needed, to allow the event loop to end if needed. 245 """
246 - def __init__(self):
247 self.queue = Queue.Queue() 248 self.pipe = os.pipe() 249 self._closed = False
250
251 - def trigger(self, event):
252 """ 253 Request that the given event be dispatched on the event thread 254 of the reactor to which this EventInjector was added. 255 """ 256 self.queue.put(event) 257 os.write(self.pipe[1], _compat.str2bin("!"))
258
259 - def close(self):
260 """ 261 Request that this EventInjector be closed. Existing events 262 will be dispctahed on the reactors event dispactch thread, 263 then this will be removed from the set of interest. 264 """ 265 self._closed = True 266 os.write(self.pipe[1], _compat.str2bin("!"))
267
268 - def fileno(self):
269 return self.pipe[0]
270
271 - def on_selectable_init(self, event):
272 sel = event.context 273 sel.fileno(self.fileno()) 274 sel.reading = True 275 event.reactor.update(sel)
276
277 - def on_selectable_readable(self, event):
278 os.read(self.pipe[0], 512) 279 while not self.queue.empty(): 280 requested = self.queue.get() 281 event.reactor.push_event(requested.context, requested.type) 282 if self._closed: 283 s = event.context 284 s.terminate() 285 event.reactor.update(s)
286
287 288 -class ApplicationEvent(EventBase):
289 """ 290 Application defined event, which can optionally be associated with 291 an engine object and or an arbitrary subject 292 """
293 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
294 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 295 self.connection = connection 296 self.session = session 297 self.link = link 298 self.delivery = delivery 299 if self.delivery: 300 self.link = self.delivery.link 301 if self.link: 302 self.session = self.link.session 303 if self.session: 304 self.connection = self.session.connection 305 self.subject = subject
306
307 - def __repr__(self):
308 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 309 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
310
311 -class Transaction(object):
312 """ 313 Class to track state of an AMQP 1.0 transaction. 314 """
315 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
316 self.txn_ctrl = txn_ctrl 317 self.handler = handler 318 self.id = None 319 self._declare = None 320 self._discharge = None 321 self.failed = False 322 self._pending = [] 323 self.settle_before_discharge = settle_before_discharge 324 self.declare()
325
326 - def commit(self):
327 self.discharge(False)
328
329 - def abort(self):
330 self.discharge(True)
331
332 - def declare(self):
333 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
334
335 - def discharge(self, failed):
336 self.failed = failed 337 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
338
339 - def _send_ctrl(self, descriptor, value):
340 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 341 delivery.transaction = self 342 return delivery
343
344 - def send(self, sender, msg, tag=None):
345 dlv = sender.send(msg, tag=tag) 346 dlv.local.data = [self.id] 347 dlv.update(0x34) 348 return dlv
349
350 - def accept(self, delivery):
351 self.update(delivery, PN_ACCEPTED) 352 if self.settle_before_discharge: 353 delivery.settle() 354 else: 355 self._pending.append(delivery)
356
357 - def update(self, delivery, state=None):
358 if state: 359 delivery.local.data = [self.id, Described(ulong(state), [])] 360 delivery.update(0x34)
361
362 - def _release_pending(self):
363 for d in self._pending: 364 d.update(Delivery.RELEASED) 365 d.settle() 366 self._clear_pending()
367
368 - def _clear_pending(self):
369 self._pending = []
370
371 - def handle_outcome(self, event):
372 if event.delivery == self._declare: 373 if event.delivery.remote.data: 374 self.id = event.delivery.remote.data[0] 375 self.handler.on_transaction_declared(event) 376 elif event.delivery.remote_state == Delivery.REJECTED: 377 self.handler.on_transaction_declare_failed(event) 378 else: 379 logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 380 self.handler.on_transaction_declare_failed(event) 381 elif event.delivery == self._discharge: 382 if event.delivery.remote_state == Delivery.REJECTED: 383 if not self.failed: 384 self.handler.on_transaction_commit_failed(event) 385 self._release_pending() # make this optional? 386 else: 387 if self.failed: 388 self.handler.on_transaction_aborted(event) 389 self._release_pending() 390 else: 391 self.handler.on_transaction_committed(event) 392 self._clear_pending()
393
394 -class LinkOption(object):
395 """ 396 Abstract interface for link configuration options 397 """
398 - def apply(self, link):
399 """ 400 Subclasses will implement any configuration logic in this 401 method 402 """ 403 pass
404 - def test(self, link):
405 """ 406 Subclasses can override this to selectively apply an option 407 e.g. based on some link criteria 408 """ 409 return True
410
411 -class AtMostOnce(LinkOption):
412 - def apply(self, link):
414
415 -class AtLeastOnce(LinkOption):
416 - def apply(self, link):
419
420 -class SenderOption(LinkOption):
421 - def apply(self, sender): pass
422 - def test(self, link): return link.is_sender
423
424 -class ReceiverOption(LinkOption):
425 - def apply(self, receiver): pass
426 - def test(self, link): return link.is_receiver
427
428 -class DynamicNodeProperties(LinkOption):
429 - def __init__(self, props={}):
430 self.properties = {} 431 for k in props: 432 if isinstance(k, symbol): 433 self.properties[k] = props[k] 434 else: 435 self.properties[symbol(k)] = props[k]
436
437 - def apply(self, link):
442
443 -class Filter(ReceiverOption):
444 - def __init__(self, filter_set={}):
445 self.filter_set = filter_set
446
447 - def apply(self, receiver):
448 receiver.source.filter.put_dict(self.filter_set)
449
450 -class Selector(Filter):
451 """ 452 Configures a link with a message selector filter 453 """
454 - def __init__(self, value, name='selector'):
455 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
456
457 -class DurableSubscription(ReceiverOption):
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469 477
478 -def _create_session(connection, handler=None):
479 session = connection.session() 480 session.open() 481 return session
482
483 484 -def _get_attr(target, name):
485 if hasattr(target, name): 486 return getattr(target, name) 487 else: 488 return None
489
490 -class SessionPerConnection(object):
491 - def __init__(self):
492 self._default_session = None
493
494 - def session(self, connection):
495 if not self._default_session: 496 self._default_session = _create_session(connection) 497 self._default_session.context = self 498 return self._default_session
499
500 - def on_session_remote_close(self, event):
501 event.connection.close() 502 self._default_session = None
503
504 -class GlobalOverrides(object):
505 """ 506 Internal handler that triggers the necessary socket connect for an 507 opened connection. 508 """
509 - def __init__(self, base):
510 self.base = base
511
512 - def on_unhandled(self, name, event):
513 if not self._override(event): 514 event.dispatch(self.base)
515
516 - def _override(self, event):
517 conn = event.connection 518 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
519
520 -class Connector(Handler):
521 """ 522 Internal handler that triggers the necessary socket connect for an 523 opened connection. 524 """
525 - def __init__(self, connection):
526 self.connection = connection 527 self.address = None 528 self.heartbeat = None 529 self.reconnect = None 530 self.ssl_domain = None 531 self.allow_insecure_mechs = True 532 self.allowed_mechs = None 533 self.sasl_enabled = True 534 self.user = None 535 self.password = None 536 self.virtual_host = None
537
538 - def _connect(self, connection, reactor):
539 assert(reactor is not None) 540 url = self.address.next() 541 reactor.set_connection_host(connection, url.host, str(url.port)) 542 # if virtual-host not set, use host from address as default 543 if self.virtual_host is None: 544 connection.hostname = url.host 545 logging.debug("connecting to %s..." % url) 546 547 transport = Transport() 548 if self.sasl_enabled: 549 sasl = transport.sasl() 550 sasl.allow_insecure_mechs = self.allow_insecure_mechs 551 if url.username: 552 connection.user = url.username 553 elif self.user: 554 connection.user = self.user 555 if url.password: 556 connection.password = url.password 557 elif self.password: 558 connection.password = self.password 559 if self.allowed_mechs: 560 sasl.allowed_mechs(self.allowed_mechs) 561 transport.bind(connection) 562 if self.heartbeat: 563 transport.idle_timeout = self.heartbeat 564 if url.scheme == 'amqps': 565 if not self.ssl_domain: 566 raise SSLUnavailable("amqps: SSL libraries not found") 567 self.ssl = SSL(transport, self.ssl_domain) 568 self.ssl.peer_hostname = url.host
569
570 - def on_connection_local_open(self, event):
571 self._connect(event.connection, event.reactor)
572
573 - def on_connection_remote_open(self, event):
574 logging.debug("connected to %s" % event.connection.hostname) 575 if self.reconnect: 576 self.reconnect.reset() 577 self.transport = None
578
579 - def on_transport_tail_closed(self, event):
580 self.on_transport_closed(event)
581
582 - def on_transport_closed(self, event):
583 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: 584 if self.reconnect: 585 event.transport.unbind() 586 delay = self.reconnect.next() 587 if delay == 0: 588 logging.info("Disconnected, reconnecting...") 589 self._connect(self.connection, event.reactor) 590 else: 591 logging.info("Disconnected will try to reconnect after %s seconds" % delay) 592 event.reactor.schedule(delay, self) 593 else: 594 logging.debug("Disconnected") 595 self.connection = None
596
597 - def on_timer_task(self, event):
598 self._connect(self.connection, event.reactor)
599
600 - def on_connection_remote_close(self, event):
601 self.connection = None
602
603 -class Backoff(object):
604 """ 605 A reconnect strategy involving an increasing delay between 606 retries, up to a maximum or 10 seconds. 607 """
608 - def __init__(self):
609 self.delay = 0
610
611 - def reset(self):
612 self.delay = 0
613
614 - def next(self):
615 current = self.delay 616 if current == 0: 617 self.delay = 0.1 618 else: 619 self.delay = min(10, 2*current) 620 return current
621
622 -class Urls(object):
623 - def __init__(self, values):
624 self.values = [Url(v) for v in values] 625 self.i = iter(self.values)
626
627 - def __iter__(self):
628 return self
629
630 - def next(self):
631 try: 632 return next(self.i) 633 except StopIteration: 634 self.i = iter(self.values) 635 return next(self.i)
636
637 -class SSLConfig(object):
638 - def __init__(self):
639 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 640 self.server = SSLDomain(SSLDomain.MODE_SERVER)
641
642 - def set_credentials(self, cert_file, key_file, password):
643 self.client.set_credentials(cert_file, key_file, password) 644 self.server.set_credentials(cert_file, key_file, password)
645
646 - def set_trusted_ca_db(self, certificate_db):
647 self.client.set_trusted_ca_db(certificate_db) 648 self.server.set_trusted_ca_db(certificate_db)
649
650 651 -class Container(Reactor):
652 """A representation of the AMQP concept of a 'container', which 653 lossely speaking is something that establishes links to or from 654 another container, over which messages are transfered. This is 655 an extension to the Reactor class that adds convenience methods 656 for creating connections and sender- or receiver- links. 657 """
658 - def __init__(self, *handlers, **kwargs):
659 super(Container, self).__init__(*handlers, **kwargs) 660 if "impl" not in kwargs: 661 try: 662 self.ssl = SSLConfig() 663 except SSLUnavailable: 664 self.ssl = None 665 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 666 self.trigger = None 667 self.container_id = str(generate_uuid()) 668 self.allow_insecure_mechs = True 669 self.allowed_mechs = None 670 self.sasl_enabled = True 671 self.user = None 672 self.password = None 673 Wrapper.__setattr__(self, 'subclass', self.__class__)
674
675 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
676 """ 677 Initiates the establishment of an AMQP connection. Returns an 678 instance of proton.Connection. 679 680 @param url: URL string of process to connect to 681 682 @param urls: list of URL strings of process to try to connect to 683 684 Only one of url or urls should be specified. 685 686 @param reconnect: A value of False will prevent the library 687 form automatically trying to reconnect if the underlying 688 socket is disconnected before the connection has been closed. 689 690 @param heartbeat: A value in milliseconds indicating the 691 desired frequency of heartbeats used to test the underlying 692 socket is alive. 693 694 @param ssl_domain: SSL configuration in the form of an 695 instance of proton.SSLdomain. 696 697 @param handler: a connection scoped handler that will be 698 called to process any events in the scope of this connection 699 or its child links 700 701 @param kwargs: sasl_enabled, which determines whether a sasl layer is 702 used for the connection; allowed_mechs an optional list of SASL 703 mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag 704 indicating whether insecure mechanisms, such as PLAIN over a 705 non-encrypted socket, are allowed; 'virtual_host' the hostname to set 706 in the Open performative used by peer to determine the correct 707 back-end service for the client. If 'virtual_host' is not supplied the 708 host field from the URL is used instead." 709 710 """ 711 conn = self.connection(handler) 712 conn.container = self.container_id or str(generate_uuid()) 713 conn.offered_capabilities = kwargs.get('offered_capabilities') 714 conn.desired_capabilities = kwargs.get('desired_capabilities') 715 conn.properties = kwargs.get('properties') 716 717 connector = Connector(conn) 718 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) 719 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) 720 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) 721 connector.user = kwargs.get('user', self.user) 722 connector.password = kwargs.get('password', self.password) 723 connector.virtual_host = kwargs.get('virtual_host') 724 if connector.virtual_host: 725 # only set hostname if virtual-host is a non-empty string 726 conn.hostname = connector.virtual_host 727 728 conn._overrides = connector 729 if url: connector.address = Urls([url]) 730 elif urls: connector.address = Urls(urls) 731 elif address: connector.address = address 732 else: raise ValueError("One of url, urls or address required") 733 if heartbeat: 734 connector.heartbeat = heartbeat 735 if reconnect: 736 connector.reconnect = reconnect 737 elif reconnect is None: 738 connector.reconnect = Backoff() 739 # use container's default client domain if none specified. This is 740 # only necessary of the URL specifies the "amqps:" scheme 741 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 742 conn._session_policy = SessionPerConnection() #todo: make configurable 743 conn.open() 744 return conn
745
746 - def _get_id(self, container, remote, local):
747 if local and remote: "%s-%s-%s" % (container, remote, local) 748 elif local: return "%s-%s" % (container, local) 749 elif remote: return "%s-%s" % (container, remote) 750 else: return "%s-%s" % (container, str(generate_uuid()))
751
752 - def _get_session(self, context):
753 if isinstance(context, Url): 754 return self._get_session(self.connect(url=context)) 755 elif isinstance(context, Session): 756 return context 757 elif isinstance(context, Connection): 758 if hasattr(context, '_session_policy'): 759 return context._session_policy.session(context) 760 else: 761 return _create_session(context) 762 else: 763 return context.session()
764
765 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
766 """ 767 Initiates the establishment of a link over which messages can 768 be sent. Returns an instance of proton.Sender. 769 770 There are two patterns of use. (1) A connection can be passed 771 as the first argument, in which case the link is established 772 on that connection. In this case the target address can be 773 specified as the second argument (or as a keyword 774 argument). The source address can also be specified if 775 desired. (2) Alternatively a URL can be passed as the first 776 argument. In this case a new connection will be establised on 777 which the link will be attached. If a path is specified and 778 the target is not, then the path of the URL is used as the 779 target address. 780 781 The name of the link may be specified if desired, otherwise a 782 unique name will be generated. 783 784 Various LinkOptions can be specified to further control the 785 attachment. 786 """ 787 if isinstance(context, _compat.STRING_TYPES): 788 context = Url(context) 789 if isinstance(context, Url) and not target: 790 target = context.path 791 session = self._get_session(context) 792 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 793 if source: 794 snd.source.address = source 795 if target: 796 snd.target.address = target 797 if handler != None: 798 snd.handler = handler 799 if tags: 800 snd.tag_generator = tags 801 _apply_link_options(options, snd) 802 snd.open() 803 return snd
804
805 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
806 """ 807 Initiates the establishment of a link over which messages can 808 be received (aka a subscription). Returns an instance of 809 proton.Receiver. 810 811 There are two patterns of use. (1) A connection can be passed 812 as the first argument, in which case the link is established 813 on that connection. In this case the source address can be 814 specified as the second argument (or as a keyword 815 argument). The target address can also be specified if 816 desired. (2) Alternatively a URL can be passed as the first 817 argument. In this case a new connection will be establised on 818 which the link will be attached. If a path is specified and 819 the source is not, then the path of the URL is used as the 820 target address. 821 822 The name of the link may be specified if desired, otherwise a 823 unique name will be generated. 824 825 Various LinkOptions can be specified to further control the 826 attachment. 827 """ 828 if isinstance(context, _compat.STRING_TYPES): 829 context = Url(context) 830 if isinstance(context, Url) and not source: 831 source = context.path 832 session = self._get_session(context) 833 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 834 if source: 835 rcv.source.address = source 836 if dynamic: 837 rcv.source.dynamic = True 838 if target: 839 rcv.target.address = target 840 if handler != None: 841 rcv.handler = handler 842 _apply_link_options(options, rcv) 843 rcv.open() 844 return rcv
845
846 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
847 if not _get_attr(context, '_txn_ctrl'): 848 class InternalTransactionHandler(OutgoingMessageHandler): 849 def __init__(self): 850 super(InternalTransactionHandler, self).__init__(auto_settle=True)
851 852 def on_settled(self, event): 853 if hasattr(event.delivery, "transaction"): 854 event.transaction = event.delivery.transaction 855 event.delivery.transaction.handle_outcome(event)
856 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 857 context._txn_ctrl.target.type = Terminus.COORDINATOR 858 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 859 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 860
861 - def listen(self, url, ssl_domain=None):
862 """ 863 Initiates a server socket, accepting incoming AMQP connections 864 on the interface and port specified. 865 """ 866 url = Url(url) 867 acceptor = self.acceptor(url.host, url.port) 868 ssl_config = ssl_domain 869 if not ssl_config and url.scheme == 'amqps': 870 # use container's default server domain 871 if self.ssl: 872 ssl_config = self.ssl.server 873 else: 874 raise SSLUnavailable("amqps: SSL libraries not found") 875 if ssl_config: 876 acceptor.set_ssl_domain(ssl_config) 877 return acceptor
878
879 - def do_work(self, timeout=None):
880 if timeout: 881 self.timeout = timeout 882 return self.process()
883