Package nbxmpp :: Module transports_nb
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.transports_nb

  1  ##   transports_nb.py 
  2  ##       based on transports.py 
  3  ## 
  4  ##   Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov 
  5  ##       modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ##       modified by Tomas Karasek <tom.to.the.k@gmail.com> 
  7  ## 
  8  ##   This program is free software; you can redistribute it and/or modify 
  9  ##   it under the terms of the GNU General Public License as published by 
 10  ##   the Free Software Foundation; either version 2, or (at your option) 
 11  ##   any later version. 
 12  ## 
 13  ##   This program is distributed in the hope that it will be useful, 
 14  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  ##   GNU General Public License for more details. 
 17   
 18  """ 
 19  Transports are objects responsible for connecting to XMPP server and putting 
 20  data to wrapped sockets in in desired form (SSL, TLS, TCP, for HTTP proxy, 
 21  for SOCKS5 proxy...) 
 22   
 23  Transports are not aware of XMPP stanzas and only responsible for low-level 
 24  connection handling. 
 25  """ 
 26   
 27  from simplexml import ustr 
 28  from plugin import PlugIn 
 29  from idlequeue import IdleObject 
 30  import proxy_connectors 
 31  import tls_nb 
 32   
 33  import socket 
 34  import errno 
 35  import time 
 36  import traceback 
 37  import base64 
 38  import urlparse 
 39   
 40  import logging 
 41  log = logging.getLogger('nbxmpp.transports_nb') 
 42   
43 -def urisplit(uri):
44 """ 45 Function for splitting URI string to tuple (protocol, host, port, path). 46 e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns ('http', 47 'httpcm.jabber.org', 123, '/webclient') return 443 as default port if proto 48 is https else 80 49 """ 50 splitted = urlparse.urlsplit(uri) 51 proto, host, path = splitted.scheme, splitted.hostname, splitted.path 52 try: 53 port = splitted.port 54 except ValueError: 55 log.warn('port cannot be extracted from BOSH URL %s, using default port' \ 56 % uri) 57 port = '' 58 if not port: 59 if proto == 'https': 60 port = 443 61 else: 62 port = 80 63 return proto, host, port, path
64
65 -def get_proxy_data_from_dict(proxy):
66 tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None 67 proxy_type = proxy['type'] 68 if proxy_type == 'bosh' and not proxy['bosh_useproxy']: 69 # with BOSH not over proxy we have to parse the hostname from BOSH URI 70 proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri']) 71 else: 72 # with proxy!=bosh or with bosh over HTTP proxy we're connecting to proxy 73 # machine 74 tcp_host, tcp_port = proxy['host'], proxy['port'] 75 if proxy.get('useauth', False): 76 proxy_user, proxy_pass = proxy['user'], proxy['pass'] 77 return tcp_host, tcp_port, proxy_user, proxy_pass
78 79 #: timeout to connect to the server socket, it doesn't include auth 80 CONNECT_TIMEOUT_SECONDS = 30 81 82 #: how long to wait for a disconnect to complete 83 DISCONNECT_TIMEOUT_SECONDS = 5 84 85 #: size of the buffer which reads data from server 86 # if lower, more stanzas will be fragmented and processed twice 87 RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty 88 # it's inefficient but should work. Problem is that connect machine makes wrong 89 # assumptions and that we only check for pending data in sockets but not in SSL 90 # buffer... 91 92 DATA_RECEIVED = 'DATA RECEIVED' 93 DATA_SENT = 'DATA SENT' 94 DATA_ERROR = 'DATA ERROR' 95 96 DISCONNECTED = 'DISCONNECTED' 97 DISCONNECTING = 'DISCONNECTING' 98 CONNECTING = 'CONNECTING' 99 PROXY_CONNECTING = 'PROXY_CONNECTING' 100 CONNECTED = 'CONNECTED' 101 STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING) 102
103 -class NonBlockingTransport(PlugIn):
104 """ 105 Abstract class representing a transport 106 107 Subclasses CAN have different constructor signature but connect method SHOULD 108 be the same. 109 """ 110
111 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 112 certs):
113 """ 114 Each trasport class can have different constructor but it has to have at 115 least all the arguments of NonBlockingTransport constructor 116 117 :param raise_event: callback for monitoring of sent and received data 118 :param on_disconnect: callback called on disconnection during runtime 119 :param idlequeue: processing idlequeue 120 :param estabilish_tls: boolean whether to estabilish TLS connection after 121 TCP connection is done 122 :param certs: tuple of (cacerts, mycerts) see constructor of 123 tls_nb.NonBlockingTLS for more details 124 """ 125 PlugIn.__init__(self) 126 self.raise_event = raise_event 127 self.on_disconnect = on_disconnect 128 self.on_connect = None 129 self.on_connect_failure = None 130 self.idlequeue = idlequeue 131 self.on_receive = None 132 self.server = None 133 self.port = None 134 self.conn_5tuple = None 135 self.set_state(DISCONNECTED) 136 self.estabilish_tls = estabilish_tls 137 self.certs = certs 138 # type of used ssl lib (if any) will be assigned to this member var 139 self.ssl_lib = None 140 self._exported_methods=[self.onreceive, self.set_send_timeout, 141 self.set_send_timeout2, self.set_timeout, self.remove_timeout, 142 self.start_disconnect] 143 144 # time to wait for SOME stanza to come and then send keepalive 145 self.sendtimeout = 0 146 147 # in case we want to something different than sending keepalives 148 self.on_timeout = None 149 self.on_timeout2 = None
150
151 - def plugin(self, owner):
152 owner.Connection = self
153
154 - def plugout(self):
155 self._owner.Connection = None 156 self._owner = None 157 self.disconnect(do_callback=False)
158
159 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
160 """ 161 Creates and connects transport to server and port defined in conn_5tuple 162 which should be item from list returned from getaddrinfo 163 164 :param conn_5tuple: 5-tuple returned from getaddrinfo 165 :param on_connect: callback called on successful connect to the server 166 :param on_connect_failure: callback called on failure when connecting 167 """ 168 self.on_connect = on_connect 169 self.on_connect_failure = on_connect_failure 170 self.server, self.port = conn_5tuple[4][:2] 171 self.conn_5tuple = conn_5tuple
172
173 - def set_state(self, newstate):
174 assert(newstate in STATES) 175 self.state = newstate
176
177 - def get_state(self):
178 return self.state
179
180 - def _on_connect(self):
181 """ 182 Preceeds call of on_connect callback 183 """ 184 # data is reference to socket wrapper instance. We don't need it in client 185 # because 186 self.set_state(CONNECTED) 187 self.on_connect()
188
189 - def _on_connect_failure(self, err_message):
190 """ 191 Preceeds call of on_connect_failure callback 192 """ 193 # In case of error while connecting we need to disconnect transport 194 # but we don't want to call DisconnectHandlers from client, 195 # thus the do_callback=False 196 self.disconnect(do_callback=False) 197 self.on_connect_failure(err_message=err_message)
198
199 - def send(self, raw_data, now=False):
200 if self.get_state() == DISCONNECTED: 201 log.error('Unable to send %s \n because state is %s.' % 202 (raw_data, self.get_state()))
203
204 - def disconnect(self, do_callback=True):
205 self.set_state(DISCONNECTED) 206 if do_callback: 207 # invoke callback given in __init__ 208 self.on_disconnect()
209
210 - def onreceive(self, recv_handler):
211 """ 212 Set the on_receive callback. 213 214 onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is 215 the default one that will decide what to do with received stanza based on 216 its tag name and namespace. 217 218 Do not confuse it with on_receive() method, which is the callback 219 itself. 220 """ 221 if not recv_handler: 222 if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'): 223 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking 224 else: 225 log.warn('No Dispatcher plugged. Received data will not be processed') 226 self.on_receive = None 227 return 228 self.on_receive = recv_handler
229
230 - def _tcp_connecting_started(self):
232
233 - def read_timeout(self):
234 """ 235 Called when there's no response from server in defined timeout 236 """ 237 if self.on_timeout: 238 self.on_timeout() 239 self.renew_send_timeout()
240
241 - def read_timeout2(self):
242 """ 243 called when there's no response from server in defined timeout 244 """ 245 if self.on_timeout2: 246 self.on_timeout2() 247 self.renew_send_timeout2()
248
249 - def renew_send_timeout(self):
250 if self.on_timeout and self.sendtimeout > 0: 251 self.set_timeout(self.sendtimeout)
252
253 - def renew_send_timeout2(self):
254 if self.on_timeout2 and self.sendtimeout2 > 0: 255 self.set_timeout2(self.sendtimeout2)
256
257 - def set_timeout(self, timeout):
258 self.idlequeue.set_read_timeout(self.fd, timeout)
259
260 - def set_timeout2(self, timeout2):
261 self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
262
263 - def get_fd(self):
264 pass
265
266 - def remove_timeout(self):
267 self.idlequeue.remove_timeout(self.fd)
268
269 - def set_send_timeout(self, timeout, on_timeout):
270 self.sendtimeout = timeout 271 if self.sendtimeout > 0: 272 self.on_timeout = on_timeout 273 else: 274 self.on_timeout = None
275
276 - def set_send_timeout2(self, timeout2, on_timeout2):
277 self.sendtimeout2 = timeout2 278 if self.sendtimeout2 > 0: 279 self.on_timeout2 = on_timeout2 280 else: 281 self.on_timeout2 = None
282 283 # FIXME: where and why does this need to be called
284 - def start_disconnect(self):
286 287
288 -class NonBlockingTCP(NonBlockingTransport, IdleObject):
289 """ 290 Non-blocking TCP socket wrapper 291 292 It is used for simple XMPP connection. Can be connected via proxy and can 293 estabilish TLS connection. 294 """
295 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 296 certs, proxy_dict=None):
297 """ 298 :param proxy_dict: dictionary with proxy data as loaded from config file 299 """ 300 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, 301 estabilish_tls, certs) 302 IdleObject.__init__(self) 303 304 # queue with messages to be send 305 self.sendqueue = [] 306 307 # bytes remained from the last send message 308 self.sendbuff = '' 309 310 self.proxy_dict = proxy_dict 311 self.on_remote_disconnect = self.disconnect 312 313 # ssl variables 314 self.ssl_fingerprint_sha1 = [] 315 self.ssl_certificate = [] 316 self.ssl_errnum = [] 317 self.ssl_cert_pem = []
318 319 # FIXME: transport should not be aware xmpp
320 - def start_disconnect(self):
321 NonBlockingTransport.start_disconnect(self) 322 self.send('</stream:stream>', now=True) 323 self.disconnect()
324
325 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
326 NonBlockingTransport.connect(self, conn_5tuple, on_connect, 327 on_connect_failure) 328 log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % 329 (self.server, self.port)) 330 331 try: 332 self._sock = socket.socket(*conn_5tuple[:3]) 333 except socket.error, (errnum, errstr): 334 self._on_connect_failure('NonBlockingTCP Connect: Error while creating\ 335 socket: %s %s' % (errnum, errstr)) 336 return 337 338 self._send = self._sock.send 339 self._recv = self._sock.recv 340 self.fd = self._sock.fileno() 341 342 # we want to be notified when send is possible to connected socket because 343 # it means the TCP connection is estabilished 344 self._plug_idle(writable=True, readable=False) 345 self.peerhost = None 346 347 # variable for errno symbol that will be found from exception raised 348 # from connect() 349 errnum = 0 350 errstr = str() 351 352 # set timeout for TCP connecting - if nonblocking connect() fails, pollend 353 # is called. If if succeeds pollout is called. 354 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS) 355 356 try: 357 self._sock.setblocking(False) 358 self._sock.connect((self.server, self.port)) 359 except Exception, exc: 360 if type(exc.args) == tuple: 361 errnum, errstr = exc.args 362 else: 363 errnum = 'unknown' 364 errstr = exc.args 365 366 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 367 # connecting in progress 368 log.info('After NB connect() of %s. "%s" raised => CONNECTING' % 369 (id(self), errstr)) 370 self._tcp_connecting_started() 371 return 372 373 # if there was some other exception, call failure callback and unplug 374 # transport which will also remove read_timeouts for descriptor 375 self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % 376 (self.server, self.port, errnum, errstr))
377
378 - def _connect_to_proxy(self):
379 self.set_state(PROXY_CONNECTING) 380 if self.proxy_dict['type'] == 'socks5': 381 proxyclass = proxy_connectors.SOCKS5Connector 382 elif self.proxy_dict['type'] == 'http' : 383 proxyclass = proxy_connectors.HTTPCONNECTConnector 384 proxyclass.get_instance( 385 send_method=self.send, 386 onreceive=self.onreceive, 387 old_on_receive=self.on_receive, 388 on_success=self._on_connect, 389 on_failure=self._on_connect_failure, 390 xmpp_server=self.proxy_dict['xmpp_server'], 391 proxy_creds=self.proxy_dict['credentials'])
392
393 - def _on_connect(self):
394 """ 395 Preceed invoking of on_connect callback. TCP connection is already 396 estabilished by this time 397 """ 398 if self.estabilish_tls: 399 self.tls_init( 400 on_succ = lambda: NonBlockingTransport._on_connect(self), 401 on_fail = lambda: self._on_connect_failure( 402 'error while estabilishing TLS')) 403 else: 404 NonBlockingTransport._on_connect(self)
405
406 - def tls_init(self, on_succ, on_fail):
407 """ 408 Estabilishes TLS/SSL using this TCP connection by plugging a 409 NonBlockingTLS module 410 """ 411 cacerts, mycerts = self.certs 412 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self) 413 if result: 414 on_succ() 415 else: 416 on_fail()
417
418 - def pollin(self):
419 """ 420 Called by idlequeu when receive on plugged socket is possible 421 """ 422 log.info('pollin called, state == %s' % self.get_state()) 423 self._do_receive()
424
425 - def pollout(self):
426 """ 427 Called by idlequeu when send to plugged socket is possible 428 """ 429 log.info('pollout called, state == %s' % self.get_state()) 430 431 if self.get_state() == CONNECTING: 432 log.info('%s socket wrapper connected' % id(self)) 433 self.idlequeue.remove_timeout(self.fd) 434 self._plug_idle(writable=False, readable=False) 435 self.peerhost = self._sock.getsockname() 436 if self.proxy_dict: 437 self._connect_to_proxy() 438 else: 439 self._on_connect() 440 else: 441 self._do_send()
442
443 - def pollend(self):
444 """ 445 Called by idlequeue on TCP connection errors 446 """ 447 log.info('pollend called, state == %s' % self.get_state()) 448 449 if self.get_state() == CONNECTING: 450 self._on_connect_failure('Error during connect to %s:%s' % 451 (self.server, self.port)) 452 else: 453 self.disconnect()
454
455 - def disconnect(self, do_callback=True):
456 if self.get_state() == DISCONNECTED: 457 return 458 self.set_state(DISCONNECTED) 459 self.idlequeue.unplug_idle(self.fd) 460 if 'NonBlockingTLS' in self.__dict__: 461 self.NonBlockingTLS.PlugOut() 462 try: 463 self._sock.shutdown(socket.SHUT_RDWR) 464 self._sock.close() 465 except socket.error, (errnum, errstr): 466 log.info('Error while disconnecting socket: %s' % errstr) 467 self.fd = -1 468 NonBlockingTransport.disconnect(self, do_callback)
469
470 - def read_timeout(self):
471 log.info('read_timeout called, state == %s' % self.get_state()) 472 if self.get_state() == CONNECTING: 473 # if read_timeout is called during connecting, connect() didn't end yet 474 # thus we have to call the tcp failure callback 475 self._on_connect_failure('Error during connect to %s:%s' % 476 (self.server, self.port)) 477 else: 478 NonBlockingTransport.read_timeout(self)
479
480 - def set_timeout(self, timeout):
481 if self.get_state() != DISCONNECTED and self.fd != -1: 482 NonBlockingTransport.set_timeout(self, timeout) 483 else: 484 log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % 485 (self.get_state(), self.fd))
486
487 - def remove_timeout(self):
488 if self.fd: 489 NonBlockingTransport.remove_timeout(self) 490 else: 491 log.warn('remove_timeout: no self.fd state is %s' % self.get_state())
492
493 - def send(self, raw_data, now=False):
494 """ 495 Append raw_data to the queue of messages to be send. If supplied data is 496 unicode string, encode it to utf-8. 497 """ 498 NonBlockingTransport.send(self, raw_data, now) 499 500 r = self.encode_stanza(raw_data) 501 502 if now: 503 self.sendqueue.insert(0, r) 504 self._do_send() 505 else: 506 self.sendqueue.append(r) 507 508 self._plug_idle(writable=True, readable=True)
509
510 - def encode_stanza(self, stanza):
511 """ 512 Encode str or unicode to utf-8 513 """ 514 if isinstance(stanza, unicode): 515 stanza = stanza.encode('utf-8') 516 elif not isinstance(stanza, str): 517 stanza = ustr(stanza).encode('utf-8') 518 return stanza
519
520 - def _plug_idle(self, writable, readable):
521 """ 522 Plug file descriptor of socket to Idlequeue 523 524 Plugged socket will be watched for "send possible" or/and "recv possible" 525 events. pollin() callback is invoked on "recv possible", pollout() on 526 "send_possible". 527 528 Plugged socket will always be watched for "error" event - in that case, 529 pollend() is called. 530 """ 531 log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable)) 532 self.idlequeue.plug_idle(self, writable, readable)
533
534 - def _do_send(self):
535 """ 536 Called when send() to connected socket is possible. First message from 537 sendqueue will be sent 538 """ 539 if not self.sendbuff: 540 if not self.sendqueue: 541 log.warn('calling send on empty buffer and queue') 542 self._plug_idle(writable=False, readable=True) 543 return None 544 self.sendbuff = self.sendqueue.pop(0) 545 try: 546 send_count = self._send(self.sendbuff) 547 if send_count: 548 sent_data = self.sendbuff[:send_count] 549 self.sendbuff = self.sendbuff[send_count:] 550 self._plug_idle( 551 writable=((self.sendqueue!=[]) or (self.sendbuff!='')), 552 readable=True) 553 self.raise_event(DATA_SENT, sent_data) 554 555 except Exception: 556 log.error('_do_send:', exc_info=True) 557 traceback.print_exc() 558 self.disconnect()
559
560 - def _do_receive(self):
561 """ 562 Reads all pending incoming data. Will call owner's disconnected() method 563 if appropriate 564 """ 565 received = None 566 errnum = 0 567 errstr = 'No Error Set' 568 569 try: 570 # get as many bites, as possible, but not more than RECV_BUFSIZE 571 received = self._recv(RECV_BUFSIZE) 572 except socket.error, (errnum, errstr): 573 log.info("_do_receive: got %s:" % received, exc_info=True) 574 except tls_nb.SSLWrapper.Error, e: 575 log.info("_do_receive, caught SSL error, got %s:" % received, 576 exc_info=True) 577 errnum, errstr = e.errno, e.strerror 578 579 if received == '': 580 errstr = 'zero bytes on recv' 581 582 if (self.ssl_lib is None and received == '') or \ 583 (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \ 584 (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ): 585 # 8 in stdlib: errstr == EOF occured in violation of protocol 586 # -1 in pyopenssl: errstr == Unexpected EOF 587 log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr)) 588 self.on_remote_disconnect() 589 return 590 591 if errnum: 592 log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port, 593 errnum, errstr), exc_info=True) 594 self.disconnect() 595 return 596 597 # this branch is for case of non-fatal SSL errors - None is returned from 598 # recv() but no errnum is set 599 if received is None: 600 return 601 602 # we have received some bytes, stop the timeout! 603 self.remove_timeout() 604 self.renew_send_timeout() 605 self.renew_send_timeout2() 606 # pass received data to owner 607 if self.on_receive: 608 self.raise_event(DATA_RECEIVED, received) 609 self._on_receive(received) 610 else: 611 # This should never happen, so we need the debug. 612 # (If there is no handler on receive specified, data is passed to 613 # Dispatcher.ProcessNonBlocking) 614 log.error('SOCKET %s Unhandled data received: %s' % (id(self), 615 received)) 616 self.disconnect()
617
618 - def _on_receive(self, data):
619 """ 620 Preceeds on_receive callback. It peels off and checks HTTP headers in 621 HTTP classes, in here it just calls the callback 622 """ 623 self.on_receive(data)
624 625
626 -class NonBlockingHTTP(NonBlockingTCP):
627 """ 628 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP 629 headers from incoming messages 630 """ 631
632 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 633 certs, on_http_request_possible, on_persistent_fallback, http_dict, 634 proxy_dict=None):
635 """ 636 :param on_http_request_possible: method to call when HTTP request to 637 socket owned by transport is possible. 638 :param on_persistent_fallback: callback called when server ends TCP 639 connection. It doesn't have to be fatal for HTTP session. 640 :param http_dict: dictionary with data for HTTP request and headers 641 """ 642 NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue, 643 estabilish_tls, certs, proxy_dict) 644 645 self.http_protocol, self.http_host, self.http_port, self.http_path = \ 646 urisplit(http_dict['http_uri']) 647 self.http_protocol = self.http_protocol or 'http' 648 self.http_path = self.http_path or '/' 649 self.http_version = http_dict['http_version'] 650 self.http_persistent = http_dict['http_persistent'] 651 self.add_proxy_headers = http_dict['add_proxy_headers'] 652 653 if 'proxy_user' in http_dict and 'proxy_pass' in http_dict: 654 self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[ 655 'proxy_pass'] 656 else: 657 self.proxy_user, self.proxy_pass = None, None 658 659 # buffer for partial responses 660 self.recvbuff = '' 661 self.expected_length = 0 662 self.pending_requests = 0 663 self.on_http_request_possible = on_http_request_possible 664 self.last_recv_time = 0 665 self.close_current_connection = False 666 self.on_remote_disconnect = lambda: on_persistent_fallback(self)
667
668 - def http_send(self, raw_data, now=False):
669 self.send(self.build_http_message(raw_data), now)
670
671 - def _on_receive(self, data):
672 """ 673 Preceeds passing received data to owner class. Gets rid of HTTP headers 674 and checks them. 675 """ 676 if self.get_state() == PROXY_CONNECTING: 677 NonBlockingTCP._on_receive(self, data) 678 return 679 680 # append currently received data to HTTP msg in buffer 681 self.recvbuff = '%s%s' % (self.recvbuff or '', data) 682 statusline, headers, httpbody, buffer_rest = self.parse_http_message( 683 self.recvbuff) 684 685 if not (statusline and headers and httpbody): 686 log.debug('Received incomplete HTTP response') 687 return 688 689 if statusline[1] != '200': 690 log.error('HTTP Error: %s %s' % (statusline[1], statusline[2])) 691 self.disconnect() 692 return 693 self.expected_length = int(headers['Content-Length']) 694 if 'Connection' in headers and headers['Connection'].strip()=='close': 695 self.close_current_connection = True 696 697 if self.expected_length > len(httpbody): 698 # If we haven't received the whole HTTP mess yet, let's end the thread. 699 # It will be finnished from one of following recvs on plugged socket. 700 log.info('not enough bytes in HTTP response - %d expected, got %d' % 701 (self.expected_length, len(httpbody))) 702 else: 703 # First part of buffer has been extraced and is going to be handled, 704 # remove it from buffer 705 self.recvbuff = buffer_rest 706 707 # everything was received 708 self.expected_length = 0 709 710 if not self.http_persistent or self.close_current_connection: 711 # not-persistent connections disconnect after response 712 self.disconnect(do_callback=False) 713 self.close_current_connection = False 714 self.last_recv_time = time.time() 715 self.on_receive(data=httpbody, socket=self) 716 self.on_http_request_possible()
717
718 - def build_http_message(self, httpbody, method='POST'):
719 """ 720 Builds http message with given body. Values for headers and status line 721 fields are taken from class variables 722 """ 723 absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host, 724 self.http_port, self.http_path) 725 headers = ['%s %s %s' % (method, absolute_uri, self.http_version), 726 'Host: %s:%s' % (self.http_host, self.http_port), 727 'User-Agent: Gajim', 728 'Content-Type: text/xml; charset=utf-8', 729 'Content-Length: %s' % len(str(httpbody))] 730 if self.add_proxy_headers: 731 headers.append('Proxy-Connection: keep-alive') 732 headers.append('Pragma: no-cache') 733 if self.proxy_user and self.proxy_pass: 734 credentials = '%s:%s' % (self.proxy_user, self.proxy_pass) 735 credentials = base64.encodestring(credentials).strip() 736 headers.append('Proxy-Authorization: Basic %s' % credentials) 737 else: 738 headers.append('Connection: Keep-Alive') 739 headers.append('\r\n') 740 headers = '\r\n'.join(headers) 741 return('%s%s' % (headers, httpbody))
742
743 - def parse_http_message(self, message):
744 """ 745 Split http message into a tuple: 746 - (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], 747 - headers - dictionary of headers e.g. {'Content-Length': '604', 748 'Content-Type': 'text/xml; charset=utf-8'}, 749 - httpbody - string with http body) 750 - http_rest - what is left in the message after a full HTTP header + body 751 """ 752 splitted = message.split('\r\n\r\n') 753 if len(splitted) < 2: 754 # no complete http message. Keep filling the buffer until we find one 755 buffer_rest = message 756 return ('', '', '', buffer_rest) 757 else: 758 (header, httpbody) = splitted[:2] 759 header = header.replace('\r', '') 760 header = header.lstrip('\n') 761 header = header.split('\n') 762 statusline = header[0].split(' ', 2) 763 header = header[1:] 764 headers = {} 765 for dummy in header: 766 row = dummy.split(' ', 1) 767 headers[row[0][:-1]] = row[1] 768 body_size = headers['Content-Length'] 769 rest_splitted = splitted[2:] 770 while (len(httpbody) < body_size) and rest_splitted: 771 # Complete httpbody until it has the announced size 772 httpbody = '\n\n'.join([httpbody, rest_splitted.pop(0)]) 773 buffer_rest = "\n\n".join(rest_splitted) 774 return (statusline, headers, httpbody, buffer_rest)
775 776
777 -class NonBlockingHTTPBOSH(NonBlockingHTTP):
778 """ 779 Class for BOSH HTTP connections. Slightly redefines HTTP transport by 780 calling bosh bodytag generating callback before putting data on wire 781 """ 782
783 - def set_stanza_build_cb(self, build_cb):
784 self.build_cb = build_cb
785
786 - def _do_send(self):
787 if self.state == PROXY_CONNECTING: 788 NonBlockingTCP._do_send(self) 789 return 790 if not self.sendbuff: 791 stanza = self.build_cb(socket=self) 792 stanza = self.encode_stanza(stanza) 793 stanza = self.build_http_message(httpbody=stanza) 794 self.sendbuff = stanza 795 NonBlockingTCP._do_send(self)
796