Source code for jeepney.io.blocking

"""Synchronous IO wrappers around jeepney
"""
from collections import deque
from errno import ECONNRESET
import functools
from itertools import count
import os
from selectors import DefaultSelector, EVENT_READ
import socket
import time
from typing import Optional

from jeepney import Parser, Message, MessageType, HeaderFields
from jeepney.auth import SASLParser, make_auth_external, BEGIN, AuthenticationError
from jeepney.bus import get_bus
from jeepney.wrappers import ProxyBase, unwrap_msg
from jeepney.routing import Router
from jeepney.bus_messages import message_bus
from .common import MessageFilters, FilterHandle, check_replyable


class _Future:
    def __init__(self):
        self._result = None

    def done(self):
        return bool(self._result)

    def set_exception(self, exception):
        self._result = (False, exception)

    def set_result(self, result): 
        self._result = (True, result)

    def result(self):
        success, value = self._result
        if success:
            return value
        raise value


def timeout_to_deadline(timeout):
    if timeout is not None:
        return time.monotonic() + timeout
    return None

def deadline_to_timeout(deadline):
    if deadline is not None:
        return deadline - time.monotonic()
    return None


[docs]class DBusConnection: def __init__(self, sock): self.sock = sock self.parser = Parser() self.outgoing_serial = count(start=1) self.selector = DefaultSelector() self.select_key = self.selector.register(sock, EVENT_READ) self._unwrap_reply = False # Message routing machinery self.router = Router(_Future) # Old interface, for backwards compat self._filters = MessageFilters() # Say Hello, get our unique name self.bus_proxy = Proxy(message_bus, self) hello_reply = self.bus_proxy.Hello() self.unique_name = hello_reply[0] def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False
[docs] def send(self, message: Message, serial=None): """Serialise and send a :class:`~.Message` object""" if serial is None: serial = next(self.outgoing_serial) data = message.serialise(serial=serial) self.sock.sendall(data)
send_message = send # Backwards compatibility
[docs] def receive(self, *, timeout=None) -> Message: """Return the next available message from the connection If the data is ready, this will return immediately, even if timeout<=0. Otherwise, it will wait for up to timeout seconds, or indefinitely if timeout is None. If no message comes in time, it raises TimeoutError. """ deadline = timeout_to_deadline(timeout) while True: msg = self.parser.get_next_message() if msg is not None: return msg b = self._read_some_data(timeout=deadline_to_timeout(deadline)) self.parser.add_data(b)
def _read_some_data(self, timeout=None): for key, ev in self.selector.select(timeout): if key == self.select_key: return unwrap_read(self.sock.recv(4096)) raise TimeoutError
[docs] def recv_messages(self, *, timeout=None): """Receive one message and apply filters See :meth:`filter`. Returns nothing. """ msg = self.receive(timeout=timeout) self.router.incoming(msg) for filter in self._filters.matches(msg): filter.queue.append(msg)
[docs] def send_and_get_reply(self, message, *, timeout=None, unwrap=None): """Send a message, wait for the reply and return it Filters are applied to other messages received before the reply - see :meth:`add_filter`. """ check_replyable(message) deadline = timeout_to_deadline(timeout) if unwrap is None: unwrap = self._unwrap_reply serial = next(self.outgoing_serial) self.send_message(message, serial=serial) while True: msg_in = self.receive(timeout=deadline_to_timeout(deadline)) reply_to = msg_in.header.fields.get(HeaderFields.reply_serial, -1) if reply_to == serial: if unwrap: return unwrap_msg(msg_in) return msg_in # Not the reply self.router.incoming(msg_in) for filter in self._filters.matches(msg_in): filter.queue.append(msg_in)
[docs] def filter(self, rule, *, queue: Optional[deque] =None, bufsize=1): """Create a filter for incoming messages Usage:: with conn.filter(rule) as matches: # matches is a deque containing matched messages matching_msg = conn.recv_until_filtered(matches) :param jeepney.MatchRule rule: Catch messages matching this rule :param collections.deque queue: Matched messages will be added to this :param int bufsize: If no deque is passed in, create one with this size """ return FilterHandle(self._filters, rule, queue or deque(maxlen=bufsize))
[docs] def recv_until_filtered(self, queue, *, timeout=None) -> Message: """Process incoming messages until one is filtered into queue Pops the message from queue and returns it, or raises TimeoutError if the optional timeout expires. Without a timeout, this is equivalent to:: while len(queue) == 0: conn.recv_messages() return queue.popleft() In the other I/O modules, there is no need for this, because messages are placed in queues by a separate task. :param collections.deque queue: A deque connected by :meth:`filter` :param float timeout: Maximum time to wait in seconds """ deadline = timeout_to_deadline(timeout) while len(queue) == 0: self.recv_messages(timeout=deadline_to_timeout(deadline)) return queue.popleft()
[docs] def close(self): """Close this connection""" self.selector.close() self.sock.close()
[docs]class Proxy(ProxyBase): """A blocking proxy for calling D-Bus methods You can call methods on the proxy object, such as ``bus_proxy.Hello()`` to make a method call over D-Bus and wait for a reply. It will either return a tuple of returned data, or raise :exc:`.DBusErrorResponse`. The methods available are defined by the message generator you wrap. You can set a time limit on a call by passing ``_timeout=`` in the method call, or set a default when creating the proxy. The ``_timeout`` argument is not passed to the message generator. All timeouts are in seconds, and :exc:`TimeoutErrror` is raised if it expires before a reply arrives. :param msggen: A message generator object :param ~blocking.DBusConnection connection: Connection to send and receive messages :param float timeout: Default seconds to wait for a reply, or None for no limit """ def __init__(self, msggen, connection, *, timeout=None): super().__init__(msggen) self._connection = connection self._timeout = timeout def __repr__(self): extra = '' if (self._timeout is None) else f', timeout={self._timeout}' return f"Proxy({self._msggen}, {self._connection}{extra})" def _method_call(self, make_msg): @functools.wraps(make_msg) def inner(*args, **kwargs): timeout = kwargs.pop('_timeout', self._timeout) msg = make_msg(*args, **kwargs) assert msg.header.message_type is MessageType.method_call return self._connection.send_and_get_reply( msg, timeout=timeout, unwrap=True ) return inner
def unwrap_read(b): """Raise ConnectionResetError from an empty read. Sometimes the socket raises an error itself, sometimes it gives no data. I haven't worked out when it behaves each way. """ if not b: raise ConnectionResetError(ECONNRESET, os.strerror(ECONNRESET)) return b
[docs]def open_dbus_connection(bus='SESSION') -> DBusConnection: """Connect to a D-Bus message bus""" bus_addr = get_bus(bus) sock = socket.socket(family=socket.AF_UNIX) sock.connect(bus_addr) sock.sendall(b'\0' + make_auth_external()) auth_parser = SASLParser() while not auth_parser.authenticated: auth_parser.feed(unwrap_read(sock.recv(1024))) if auth_parser.error: raise AuthenticationError(auth_parser.error) sock.sendall(BEGIN) conn = DBusConnection(sock) conn.parser.add_data(auth_parser.buffer) return conn
if __name__ == '__main__': conn = open_dbus_connection() print("Unique name:", conn.unique_name)