1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Transports are objects responsible for connecting to XMPP server and putting
20 data to wrapped sockets in desired form (SSL, TLS, TCP, for HTTP proxy,
21 for SOCKS5 proxy...)
22
23 Transports are not aware of XMPP stanzas and only responsible for low-level
24 connection handling.
25 """
26
27 from simplexml import ustr
28 from plugin import PlugIn
29 from idlequeue import IdleObject
30 import proxy_connectors
31 import tls_nb
32
33 import socket
34 import errno
35 import time
36 import traceback
37 import base64
38 import urlparse
39
40 import logging
41 log = logging.getLogger('nbxmpp.transports_nb')
42
def urisplit(uri):
    """
    Split a URI string into a tuple (protocol, host, port, path).

    e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns
    ('http', 'httpcm.jabber.org', 123, '/webclient').

    When the URI carries no usable port, 443 is returned for the 'https'
    protocol and 80 for anything else.

    :param uri: URI string to split
    :return: tuple (protocol, host, port, path)
    """
    parts = urlparse.urlsplit(uri)
    try:
        port = parts.port
    except ValueError:
        # the port component of the URI is not a valid number
        log.warning(
            'port cannot be extracted from BOSH URL %s, using default port',
            uri)
        port = None
    if not port:
        # no (or unusable) explicit port: fall back to the protocol default
        port = 443 if parts.scheme == 'https' else 80
    return parts.scheme, parts.hostname, port, parts.path
64
66 tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
67 proxy_type = proxy['type']
68 if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
69
70 proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri'])
71 else:
72
73
74 tcp_host, tcp_port = proxy['host'], proxy['port']
75 if proxy.get('useauth', False):
76 proxy_user, proxy_pass = proxy['user'], proxy['pass']
77 return tcp_host, tcp_port, proxy_user, proxy_pass
78
79
# read timeout (in seconds) registered on the socket before connect() is called
CONNECT_TIMEOUT_SECONDS = 30

# NOTE(review): not referenced in the visible part of the file - presumably
# the timeout used while disconnecting; confirm against start_disconnect()
DISCONNECT_TIMEOUT_SECONDS = 5

# maximum number of bytes requested by a single recv() call
RECV_BUFSIZE = 32768

# event names handed to the raise_event callback
DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
DATA_ERROR = 'DATA ERROR'

# transport states; set_state() accepts only members of STATES
DISCONNECTED = 'DISCONNECTED'
DISCONNECTING = 'DISCONNECTING'
CONNECTING = 'CONNECTING'
PROXY_CONNECTING = 'PROXY_CONNECTING'
CONNECTED = 'CONNECTED'
STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)
102
104 """
105 Abstract class representing a transport
106
107 Subclasses CAN have different constructor signature but connect method SHOULD
108 be the same.
109 """
110
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
             certs):
    """
    Each transport class can have a different constructor but it has to
    accept at least all the arguments of the NonBlockingTransport constructor

    :param raise_event: callback for monitoring of sent and received data
    :param on_disconnect: callback called on disconnection during runtime
    :param idlequeue: processing idlequeue
    :param estabilish_tls: boolean whether to establish TLS connection after
        TCP connection is done
    :param certs: tuple of (cacerts, mycerts) see constructor of
        tls_nb.NonBlockingTLS for more details
    """
    PlugIn.__init__(self)
    self.raise_event = raise_event
    self.on_disconnect = on_disconnect
    self.on_connect = None
    self.on_connect_failure = None
    self.idlequeue = idlequeue
    self.on_receive = None
    self.server = None
    self.port = None
    self.conn_5tuple = None
    self.set_state(DISCONNECTED)
    self.estabilish_tls = estabilish_tls
    self.certs = certs

    # compared against tls_nb.PYSTDLIB / tls_nb.PYOPENSSL when receiving;
    # None as long as no TLS library is in use
    self.ssl_lib = None
    self._exported_methods = [
        self.onreceive, self.set_send_timeout, self.set_send_timeout2,
        self.set_timeout, self.remove_timeout, self.start_disconnect]

    # timeouts (in seconds) re-armed by renew_send_timeout() and
    # renew_send_timeout2(); 0 means that no timeout is armed
    self.sendtimeout = 0
    self.sendtimeout2 = 0

    # callbacks invoked when the corresponding timeout expires
    self.on_timeout = None
    self.on_timeout2 = None
150
152 owner.Connection = self
153
155 self._owner.Connection = None
156 self._owner = None
157 self.disconnect(do_callback=False)
158
def connect(self, conn_5tuple, on_connect, on_connect_failure):
    """
    Creates and connects transport to server and port defined in conn_5tuple
    which should be item from list returned from getaddrinfo

    :param conn_5tuple: 5-tuple returned from getaddrinfo
    :param on_connect: callback called on successful connect to the server
    :param on_connect_failure: callback called on failure when connecting
    """
    self.on_connect = on_connect
    self.on_connect_failure = on_connect_failure
    # the sockaddr is the last item of the 5-tuple; only its first two
    # fields (host, port) are kept, whatever else it contains
    self.server, self.port = conn_5tuple[4][:2]
    self.conn_5tuple = conn_5tuple
172
def set_state(self, newstate):
    """
    Store the connection state of the transport

    :param newstate: new state, has to be one of STATES
    """
    # sanity check only - it is skipped when Python runs with -O
    assert newstate in STATES, 'unknown transport state: %r' % (newstate,)
    self.state = newstate
176
179
def _on_connect(self):
    """
    Precedes call of on_connect callback
    """
    # switch the state first so that the callback already sees CONNECTED
    self.set_state(CONNECTED)
    self.on_connect()
188
190 """
191 Preceeds call of on_connect_failure callback
192 """
193
194
195
196 self.disconnect(do_callback=False)
197 self.on_connect_failure(err_message=err_message)
198
199 - def send(self, raw_data, now=False):
203
209
def onreceive(self, recv_handler):
    """
    Set the on_receive callback.

    onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
    the default one that will decide what to do with received stanza based on
    its tag name and namespace.

    Do not confuse it with on_receive() method, which is the callback
    itself.

    :param recv_handler: callable taking the received data, or a false value
        to restore the default handler
    """
    if recv_handler:
        self.on_receive = recv_handler
        return
    if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'):
        self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
    else:
        log.warning('No Dispatcher plugged. Received data will not be processed')
        self.on_receive = None
229
232
234 """
235 Called when there's no response from server in defined timeout
236 """
237 if self.on_timeout:
238 self.on_timeout()
239 self.renew_send_timeout()
240
242 """
243 called when there's no response from server in defined timeout
244 """
245 if self.on_timeout2:
246 self.on_timeout2()
247 self.renew_send_timeout2()
248
def renew_send_timeout(self):
    """
    Re-arm the first timeout, provided that a callback and a positive
    timeout value are set
    """
    if self.on_timeout and self.sendtimeout > 0:
        self.set_timeout(self.sendtimeout)
252
def renew_send_timeout2(self):
    """
    Re-arm the second timeout, provided that a callback and a positive
    timeout value are set
    """
    if self.on_timeout2 and self.sendtimeout2 > 0:
        self.set_timeout2(self.sendtimeout2)
256
259
262
265
268
270 self.sendtimeout = timeout
271 if self.sendtimeout > 0:
272 self.on_timeout = on_timeout
273 else:
274 self.on_timeout = None
275
277 self.sendtimeout2 = timeout2
278 if self.sendtimeout2 > 0:
279 self.on_timeout2 = on_timeout2
280 else:
281 self.on_timeout2 = None
282
283
286
287
289 """
290 Non-blocking TCP socket wrapper
291
292 It is used for simple XMPP connection. Can be connected via proxy and can
293 establish TLS connection.
294 """
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
             certs, proxy_dict=None):
    """
    :param proxy_dict: dictionary with proxy data as loaded from config file

    The remaining parameters are handed to NonBlockingTransport.__init__
    unchanged.
    """
    NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
                                  estabilish_tls, certs)
    IdleObject.__init__(self)

    # queue of already encoded messages waiting to be sent
    self.sendqueue = []

    # unsent remainder of the message that is currently being sent
    self.sendbuff = ''

    self.proxy_dict = proxy_dict
    # invoked by _do_receive() when the remote side closes the connection
    self.on_remote_disconnect = self.disconnect

    # NOTE(review): presumably filled in during the TLS handshake - nothing
    # in the visible part of the file writes to these lists
    self.ssl_fingerprint_sha1 = []
    self.ssl_certificate = []
    self.ssl_errnum = []
    self.ssl_cert_pem = []
319
324
def connect(self, conn_5tuple, on_connect, on_connect_failure):
    """
    Create a non-blocking socket and start connecting it to the address
    given in conn_5tuple

    :param conn_5tuple: 5-tuple returned from getaddrinfo
    :param on_connect: callback called on successful connect to the server
    :param on_connect_failure: callback called on failure when connecting
    """
    NonBlockingTransport.connect(self, conn_5tuple, on_connect,
                                 on_connect_failure)
    log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
             (self.server, self.port))

    try:
        self._sock = socket.socket(*conn_5tuple[:3])
    except socket.error as exc:
        # not every socket error carries exactly (errno, message)
        if len(exc.args) == 2:
            errnum, errstr = exc.args
        else:
            errnum, errstr = 'unknown', exc.args
        self._on_connect_failure(
            'NonBlockingTCP Connect: Error while creating socket: %s %s' %
            (errnum, errstr))
        return

    self._send = self._sock.send
    self._recv = self._sock.recv
    self.fd = self._sock.fileno()

    # watch the descriptor for "send possible" only while connecting
    self._plug_idle(writable=True, readable=False)
    self.peerhost = None

    # values used when connect() below raises nothing at all
    errnum = 0
    errstr = str()

    # registered before connect() is attempted
    self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)

    try:
        self._sock.setblocking(False)
        self._sock.connect((self.server, self.port))
    except Exception as exc:
        if len(exc.args) == 2:
            errnum, errstr = exc.args
        else:
            errnum = 'unknown'
            errstr = exc.args

    if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
        # the connection attempt is in progress
        log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
                 (id(self), errstr))
        self._tcp_connecting_started()
        return

    # NOTE(review): a connect() that returns without raising (errnum == 0)
    # also ends up here and is reported as a failure - confirm this is
    # intended
    self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
                             (self.server, self.port, errnum, errstr))
377
392
405
407 """
408 Estabilishes TLS/SSL using this TCP connection by plugging a
409 NonBlockingTLS module
410 """
411 cacerts, mycerts = self.certs
412 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self)
413 if result:
414 on_succ()
415 else:
416 on_fail()
417
def pollin(self):
    """
    Called by idlequeue when receive on plugged socket is possible
    """
    log.info('pollin called, state == %s' % self.get_state())
    self._do_receive()
424
442
454
469
479
486
492
def send(self, raw_data, now=False):
    """
    Append raw_data to the queue of messages to be sent. If supplied data is
    unicode string, encode it to utf-8.

    :param raw_data: data to send
    :param now: when True, put the data at the head of the queue and try to
        send immediately instead of waiting for the next "send possible"
        event
    """
    NonBlockingTransport.send(self, raw_data, now)

    encoded = self.encode_stanza(raw_data)

    if now:
        self.sendqueue.insert(0, encoded)
        self._do_send()
    else:
        self.sendqueue.append(encoded)

    self._plug_idle(writable=True, readable=True)
509
def encode_stanza(self, stanza):
    """
    Encode str or unicode to utf-8

    :param stanza: unicode string, byte string or any other object; objects
        that are neither are converted with ustr() first
    :return: utf-8 encoded byte string (a str is returned unchanged)
    """
    if isinstance(stanza, str):
        return stanza
    if not isinstance(stanza, unicode):
        stanza = ustr(stanza)
    return stanza.encode('utf-8')
519
def _plug_idle(self, writable, readable):
    """
    Plug file descriptor of socket to Idlequeue

    Plugged socket will be watched for "send possible" or/and "recv possible"
    events. pollin() callback is invoked on "recv possible", pollout() on
    "send_possible".

    Plugged socket will always be watched for "error" event - in that case,
    pollend() is called.

    :param writable: watch the socket for "send possible" events
    :param readable: watch the socket for "recv possible" events
    """
    log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
    self.idlequeue.plug_idle(self, writable, readable)
533
def _do_send(self):
    """
    Called when send() to connected socket is possible. First message from
    sendqueue will be sent
    """
    if not self.sendbuff:
        if not self.sendqueue:
            log.warning('calling send on empty buffer and queue')
            self._plug_idle(writable=False, readable=True)
            return None
        self.sendbuff = self.sendqueue.pop(0)
    try:
        send_count = self._send(self.sendbuff)
        if send_count:
            sent_data = self.sendbuff[:send_count]
            # whatever was not accepted stays buffered for the next call
            self.sendbuff = self.sendbuff[send_count:]
            # keep watching for "send possible" only while data is pending
            self._plug_idle(
                writable=bool(self.sendqueue or self.sendbuff),
                readable=True)
            self.raise_event(DATA_SENT, sent_data)

    except Exception:
        log.error('_do_send:', exc_info=True)
        traceback.print_exc()
        self.disconnect()
559
def _do_receive(self):
    """
    Reads all pending incoming data. Will call owner's disconnected() method
    if appropriate
    """
    received = None
    errnum = 0
    errstr = 'No Error Set'

    try:
        # ask for at most RECV_BUFSIZE bytes
        received = self._recv(RECV_BUFSIZE)
    except socket.error as exc:
        log.info("_do_receive: got %s:" % received, exc_info=True)
        # not every socket error carries exactly (errno, message)
        if len(exc.args) == 2:
            errnum, errstr = exc.args
        else:
            errnum, errstr = 'unknown', exc.args
    except tls_nb.SSLWrapper.Error as e:
        log.info("_do_receive, caught SSL error, got %s:" % received,
                 exc_info=True)
        errnum, errstr = e.errno, e.strerror

    if received == '':
        errstr = 'zero bytes on recv'

    # NOTE(review): 8 and -1 are presumably the error numbers the respective
    # SSL library reports when the peer closes the connection - confirm
    if (self.ssl_lib is None and received == '') or \
            (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8) or \
            (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1):
        log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
        self.on_remote_disconnect()
        return

    if errnum:
        log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
                 errnum, errstr), exc_info=True)
        self.disconnect()
        return

    # nothing was read and no error was reported
    if received is None:
        return

    # data arrived: drop the pending timeout and re-arm the send timeouts
    self.remove_timeout()
    self.renew_send_timeout()
    self.renew_send_timeout2()

    if self.on_receive:
        self.raise_event(DATA_RECEIVED, received)
        self._on_receive(received)
    else:
        # nobody is registered to handle the data
        log.error('SOCKET %s Unhandled data received: %s' % (id(self),
                  received))
        self.disconnect()
617
619 """
620 Preceeds on_receive callback. It peels off and checks HTTP headers in
621 HTTP classes, in here it just calls the callback
622 """
623 self.on_receive(data)
624
625
627 """
628 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP
629 headers from incoming messages
630 """
631
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
             certs, on_http_request_possible, on_persistent_fallback, http_dict,
             proxy_dict=None):
    """
    :param on_http_request_possible: method to call when HTTP request to
        socket owned by transport is possible.
    :param on_persistent_fallback: callback called when server ends TCP
        connection. It doesn't have to be fatal for HTTP session.
    :param http_dict: dictionary with data for HTTP request and headers

    The remaining parameters are handed to NonBlockingTCP.__init__ unchanged.
    """
    NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
                            estabilish_tls, certs, proxy_dict)

    (self.http_protocol, self.http_host, self.http_port,
     self.http_path) = urisplit(http_dict['http_uri'])
    self.http_protocol = self.http_protocol or 'http'
    self.http_path = self.http_path or '/'
    self.http_version = http_dict['http_version']
    self.http_persistent = http_dict['http_persistent']
    self.add_proxy_headers = http_dict['add_proxy_headers']

    # proxy credentials are used only when both of them are supplied
    if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
        self.proxy_user = http_dict['proxy_user']
        self.proxy_pass = http_dict['proxy_pass']
    else:
        self.proxy_user, self.proxy_pass = None, None

    # buffer for a partially received HTTP response
    self.recvbuff = ''
    self.expected_length = 0
    self.pending_requests = 0
    self.on_http_request_possible = on_http_request_possible
    self.last_recv_time = 0
    self.close_current_connection = False
    # the end of the TCP connection is reported to the owner together with
    # this transport instead of going through disconnect()
    self.on_remote_disconnect = lambda: on_persistent_fallback(self)
667
670
def _on_receive(self, data):
    """
    Precedes passing received data to owner class. Gets rid of HTTP headers
    and checks them.

    :param data: data as received from the socket
    """
    if self.get_state() == PROXY_CONNECTING:
        # still talking to the proxy: hand the data over untouched
        NonBlockingTCP._on_receive(self, data)
        return

    # append the data to whatever part of the response is buffered already
    self.recvbuff = '%s%s' % (self.recvbuff or '', data)
    statusline, headers, httpbody, buffer_rest = self.parse_http_message(
        self.recvbuff)

    if not (statusline and headers and httpbody):
        log.debug('Received incomplete HTTP response')
        return

    if len(statusline) < 2 or statusline[1] != '200':
        # the reason phrase (or even the status code) may be missing
        log.error('HTTP Error: %s' % ' '.join(statusline[1:]))
        self.disconnect()
        return
    try:
        self.expected_length = int(headers['Content-Length'])
    except (KeyError, ValueError):
        # the end of the body cannot be determined without a valid length
        log.error('HTTP Error: missing or invalid Content-Length header')
        self.disconnect()
        return
    if 'Connection' in headers and headers['Connection'].strip() == 'close':
        self.close_current_connection = True

    if self.expected_length > len(httpbody):
        # the rest of the body is expected with one of the following recvs;
        # the buffer is kept until then
        log.info('not enough bytes in HTTP response - %d expected, got %d' %
                 (self.expected_length, len(httpbody)))
        return

    # the response is complete: keep only what follows it in the buffer
    self.recvbuff = buffer_rest
    self.expected_length = 0

    if not self.http_persistent or self.close_current_connection:
        # the connection is not going to be reused after this response
        self.disconnect(do_callback=False)
    self.close_current_connection = False
    self.last_recv_time = time.time()
    self.on_receive(data=httpbody, socket=self)
    self.on_http_request_possible()
717
def build_http_message(self, httpbody, method='POST'):
    """
    Builds http message with given body. Values for headers and status line
    fields are taken from class variables

    :param httpbody: body of the HTTP request
    :param method: HTTP method put on the request line
    :return: string with request line, headers, an empty line and the body
    """
    # Content-Length has to be the size of the body in bytes
    if isinstance(httpbody, bytes):
        body_length = len(httpbody)
    elif hasattr(httpbody, 'encode'):
        body_length = len(httpbody.encode('utf-8'))
    else:
        body_length = len(str(httpbody))

    absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host,
                                     self.http_port, self.http_path)
    headers = ['%s %s %s' % (method, absolute_uri, self.http_version),
               'Host: %s:%s' % (self.http_host, self.http_port),
               'User-Agent: Gajim',
               'Content-Type: text/xml; charset=utf-8',
               'Content-Length: %s' % body_length]
    if self.add_proxy_headers:
        headers.append('Proxy-Connection: keep-alive')
        headers.append('Pragma: no-cache')
        if self.proxy_user and self.proxy_pass:
            credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
            if not isinstance(credentials, bytes):
                credentials = credentials.encode('utf-8')
            # b64encode() never inserts line breaks, which would otherwise
            # end up in the middle of the header for long credentials
            credentials = base64.b64encode(credentials)
            if not isinstance(credentials, str):
                credentials = credentials.decode('ascii')
            headers.append('Proxy-Authorization: Basic %s' % credentials)
    else:
        headers.append('Connection: Keep-Alive')
    # the trailing item yields the empty line that ends the header section
    headers.append('\r\n')
    return '%s%s' % ('\r\n'.join(headers), httpbody)
742
def parse_http_message(self, message):
    """
    Split http message into a tuple:
      - (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
      - headers - dictionary of headers e.g. {'Content-Length': '604',
                  'Content-Type': 'text/xml; charset=utf-8'},
      - httpbody - string with http body)
      - http_rest - what is left in the message after a full HTTP header + body

    ('', '', '', message) is returned as long as the end of the header
    section has not been received.

    :param message: everything received for the response so far
    """
    delimiter = '\r\n\r\n'
    segments = message.split(delimiter)
    if len(segments) < 2:
        # the empty line that ends the header section has not arrived yet
        return ('', '', '', message)

    header_lines = segments[0].replace('\r', '').lstrip('\n').split('\n')
    statusline = header_lines[0].split(' ', 2)
    headers = {}
    for line in header_lines[1:]:
        name, colon, value = line.partition(':')
        if colon:
            # lines without a colon are not headers and are ignored
            headers[name] = value.strip()

    try:
        body_size = int(headers['Content-Length'])
    except (KeyError, ValueError):
        # size unknown: only the first segment is taken as the body
        body_size = 0

    httpbody = segments[1]
    remaining = segments[2:]
    # the body itself may contain the delimiter: put back the pieces that
    # were split off until the announced size is reached
    while len(httpbody) < body_size and remaining:
        httpbody = delimiter.join([httpbody, remaining.pop(0)])
    buffer_rest = delimiter.join(remaining)
    return (statusline, headers, httpbody, buffer_rest)
775
776
778 """
779 Class for BOSH HTTP connections. Slightly redefines HTTP transport by
780 calling bosh bodytag generating callback before putting data on wire
781 """
782
784 self.build_cb = build_cb
785
796