Package flumotion :: Package component :: Package misc :: Package httpserver :: Module httpserver
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpserver

  1  # -*- test-case-name: flumotion.test.test_component_httpserver -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import string 
 24  import time 
 25   
 26  from twisted.web import server, http 
 27  from twisted.web.resource import Resource 
 28  from twisted.internet import defer, reactor, error 
 29  from twisted.cred import credentials 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import log, messages, errors, netutils, interfaces 
 33  from flumotion.common.i18n import N_, gettexter 
 34  from flumotion.component import component 
 35  from flumotion.component.base import http as httpbase 
 36  from flumotion.component.component import moods 
 37  from flumotion.component.misc.httpserver import httpfile, localprovider 
 38  from flumotion.component.misc.httpserver import serverstats 
 39  from flumotion.component.misc.porter import porterclient 
 40  from flumotion.twisted import fdserver 
 41   
 42  __version__ = "$Rev: 8058 $" 
 43  T_ = gettexter() 
 44   
 45  UPTIME_UPDATE_INTERVAL = 5 
 46   
 47   
48 -class CancellableRequest(server.Request):
49
50 - def __init__(self, channel, queued):
51 server.Request.__init__(self, channel, queued) 52 now = time.time() 53 self.lastTimeWritten = now # Used by HTTPFileStreamer for timeout 54 # we index some things by the fd, so we need to store it so we 55 # can still use it (in the connectionLost() handler and in 56 # finish()) after transport's fd has been closed 57 self.fd = self.transport.fileno() 58 59 self._component = channel.factory.component 60 self._transfer = None 61 self._provider = None 62 self._startTime = now 63 self._completionTime = None 64 self._rangeFirstByte = None 65 self._rangeLastByte = None 66 self._resourceSize = None 67 self._bytesWritten = 0L 68 69 # Create the request statistic handler 70 self.stats = serverstats.RequestStatistics(self._component.stats) 71 72 self._component.requestStarted(self)
73
74 - def setResponseRange(self, first, last, size):
75 self._rangeFirstByte = first 76 self._rangeLastByte = last 77 self._resourceSize = size
78
79 - def write(self, data):
80 server.Request.write(self, data) 81 size = len(data) 82 self._bytesWritten += size 83 self.lastTimeWritten = time.time() 84 # Update statistics 85 self.stats.onDataSent(size)
86
87 - def finish(self):
88 # it can happen that this method will be called with the 89 # transport's fd already closed (if the connection is lost 90 # early in the request handling) 91 server.Request.finish(self) 92 # We sent Connection: close, so we must close the connection 93 self.transport.loseConnection() 94 self.requestCompleted(self.fd)
95
96 - def connectionLost(self, reason):
97 # this is called _after_ the self.transport.fileno() is not 98 # valid anymore, so we use the stored fd number 99 server.Request.connectionLost(self, reason) 100 self.requestCompleted(self.fd)
101
102 - def requestCompleted(self, fd):
103 if self._completionTime is None: 104 self._completionTime = time.time() 105 # Update statistics 106 self.stats.onCompleted(self._resourceSize) 107 duration = self._completionTime - self._startTime 108 self._component.requestFinished(self, self.stats.bytesSent, 109 duration, fd)
110
111 - def getLogFields(self):
112 headers = self.getAllHeaders() 113 duration = (self._completionTime or time.time()) - self._startTime 114 requestFields = {'ip': self.getClientIP(), 115 'method': self.method, 116 'uri': self.uri, 117 'get-parameters': self.args, 118 'clientproto': self.clientproto, 119 'response': self.code, 120 'bytes-sent': self._bytesWritten, 121 'referer': headers.get('referer', None), 122 'user-agent': headers.get('user-agent', None), 123 'time-connected': duration, 124 'resource-size': self._resourceSize, 125 'range-first': self._rangeFirstByte, 126 'range-last': self._rangeLastByte} 127 if self._provider: 128 # The request fields have higher priority than provider fields 129 providerFields = self._provider.getLogFields() 130 providerFields.update(requestFields) 131 requestFields = providerFields 132 return requestFields
133 134
135 -class Site(server.Site):
136 requestFactory = CancellableRequest 137
138 - def __init__(self, resource, component):
139 server.Site.__init__(self, resource) 140 141 self.component = component
142 143
144 -class StatisticsUpdater(object):
145 """ 146 I wrap a statistics ui state entry, to allow updates. 147 """ 148
149 - def __init__(self, state, key):
150 self._state = state 151 self._key = key
152
153 - def update(self, name, value):
154 if value != self._state.get(self._key).get(name, None): 155 self._state.setitem(self._key, name, value)
156 157
158 -class HTTPFileMedium(component.BaseComponentMedium):
159
160 - def __init__(self, comp):
161 """ 162 @type comp: L{HTTPFileStreamer} 163 """ 164 component.BaseComponentMedium.__init__(self, comp)
165
166 - def authenticate(self, bouncerName, keycard):
167 """ 168 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None. 169 """ 170 return self.callRemote('authenticate', bouncerName, keycard)
171
172 - def keepAlive(self, bouncerName, issuerName, ttl):
173 """ 174 @rtype: L{twisted.internet.defer.Deferred} 175 """ 176 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
177
178 - def removeKeycardId(self, bouncerName, keycardId):
179 """ 180 @rtype: L{twisted.internet.defer.Deferred} 181 """ 182 return self.callRemote('removeKeycardId', bouncerName, keycardId)
183
184 - def remote_expireKeycard(self, keycardId):
185 return self.comp.httpauth.expireKeycard(keycardId)
186
187 - def remote_expireKeycards(self, keycardId):
188 return self.comp.httpauth.expireKeycards(keycardId)
189
190 - def remote_getStreamData(self):
191 return self.comp.getStreamData()
192
193 - def remote_getLoadData(self):
194 return self.comp.getLoadData()
195
196 - def remote_updatePorterDetails(self, path, username, password):
197 return self.comp.updatePorterDetails(path, username, password)
198
199 - def remote_rotateLog(self):
200 return self.comp.rotateLog()
201 202
203 -class HTTPFileStreamer(component.BaseComponent, log.Loggable):
204 implements(interfaces.IStreamingComponent) 205 206 componentMediumClass = HTTPFileMedium 207 208 REQUEST_TIMEOUT = 30 # Time out requests after this many seconds of 209 # inactivity 210
211 - def init(self):
212 self.mountPoint = None 213 self.type = None 214 self.port = None 215 self.hostname = None 216 self.stats = None 217 self._rateControlPlug = None 218 self._fileProviderPlug = None 219 self._metadataProviderPlug = None 220 self._loggers = [] 221 self._requestModifiers = [] 222 self._logfilter = None 223 self.httpauth = None 224 self._startTime = time.time() 225 self._uptimeCallId = None 226 227 self._description = 'On-Demand Flumotion Stream' 228 229 self._singleFile = False 230 self._connected_clients = {} # fd -> CancellableRequest 231 self._total_bytes_written = 0 232 233 self._pbclient = None 234 235 self._twistedPort = None 236 self._timeoutRequestsCallLater = None 237 238 self._pendingDisconnects = {} 239 self._rootResource = None 240 241 # FIXME: maybe we want to allow the configuration to specify 242 # additional mime -> File class mapping ? 243 self._mimeToResource = { 244 'video/x-flv': httpfile.FLVFile, 245 'video/mp4': httpfile.MP4File, 246 } 247 248 self.uiState.addKey('stream-url', None) 249 self.uiState.addKey('server-uptime', 0) 250 self.uiState.addKey('file-provider', None) 251 self.uiState.addDictKey('request-statistics') 252 self.uiState.addDictKey('provider-statistics')
253
254 - def do_check(self):
255 props = self.config['properties'] 256 self.fixRenamedProperties(props, [ 257 ('issuer', 'issuer-class'), 258 ('porter_socket_path', 'porter-socket-path'), 259 ('porter_username', 'porter-username'), 260 ('porter_password', 'porter-password'), 261 ('mount_point', 'mount-point')]) 262 263 if props.get('type', 'master') == 'slave': 264 for k in 'socket-path', 'username', 'password': 265 if not 'porter-' + k in props: 266 msg = 'slave mode, missing required property porter-%s' % k 267 return defer.fail(errors.ConfigError(msg)) 268 269 path = props.get('path', None) 270 if path is None: 271 return 272 if os.path.isfile(path): 273 self._singleFile = True 274 elif os.path.isdir(path): 275 self._singleFile = False 276 else: 277 msg = "the file or directory specified in 'path': %s does " \ 278 "not exist or is neither a file nor directory" % path 279 return defer.fail(errors.ConfigError(msg))
280
281 - def have_properties(self, props):
282 desc = props.get('description', None) 283 if desc: 284 self._description = desc 285 286 # always make sure the mount point starts with / 287 mountPoint = props.get('mount-point', '/') 288 if not mountPoint.startswith('/'): 289 mountPoint = '/' + mountPoint 290 self.mountPoint = mountPoint 291 self.hostname = props.get('hostname', None) 292 if not self.hostname: 293 self.hostname = netutils.guess_public_hostname() 294 295 self.type = props.get('type', 'master') 296 self.port = props.get('port', 8801) 297 if self.type == 'slave': 298 # already checked for these in do_check 299 self._porterPath = props['porter-socket-path'] 300 self._porterUsername = props['porter-username'] 301 self._porterPassword = props['porter-password'] 302 socket = 'flumotion.component.plugs.request.RequestLoggerPlug' 303 self._loggers = self.plugs.get(socket, []) 304 socket = \ 305 'flumotion.component.plugs.requestmodifier.RequestModifierPlug' 306 self._requestModifiers = self.plugs.get(socket, []) 307 308 self.httpauth = httpbase.HTTPAuthentication(self) 309 if 'avatarId' in self.config: 310 self.httpauth.setRequesterId(self.config['avatarId']) 311 if 'bouncer' in props: 312 self.httpauth.setBouncerName(props['bouncer']) 313 if 'issuer-class' in props: 314 self.httpauth.setIssuerClass(props['issuer-class']) 315 if 'ip-filter' in props: 316 logFilter = http.LogFilter() 317 for f in props['ip-filter']: 318 logFilter.addIPFilter(f) 319 self._logfilter = logFilter 320 socket = \ 321 'flumotion.component.misc.httpserver.ratecontrol.RateControllerPlug' 322 plugs = self.plugs.get(socket, []) 323 if plugs: 324 # Rate controller factory plug; only one supported. 325 path = props.get('path') 326 self._rateControlPlug = self.plugs[socket][-1] 327 328 socket = \ 329 'flumotion.component.misc.httpserver.fileprovider.FileProviderPlug' 330 plugs = self.plugs.get(socket, []) 331 if plugs: 332 # FileProvider factory plug; only one supported. 333 self._fileProviderPlug = plugs[-1] 334 else: 335 # Create a default local provider using path property 336 # Delegate the property checks to the plug 337 plugProps = {"properties": {"path": props.get('path', None)}} 338 self._fileProviderPlug = localprovider.FileProviderLocalPlug( 339 plugProps) 340 341 socket = ('flumotion.component.misc.httpserver' 342 '.metadataprovider.MetadataProviderPlug') 343 plugs = self.plugs.get(socket, []) 344 if plugs: 345 self._metadataProviderPlug = plugs[-1] 346 347 # Update uiState 348 self.uiState.set('stream-url', self.getUrl())
349
350 - def do_setup(self):
351 self.have_properties(self.config['properties']) 352 353 root = self._rootResource 354 if root is None: 355 root = self._getDefaultRootResource() 356 357 if root is None: 358 raise errors.WrongStateError( 359 "a resource or path property must be set") 360 361 site = Site(root, self) 362 self._timeoutRequestsCallLater = reactor.callLater( 363 self.REQUEST_TIMEOUT, self._timeoutRequests) 364 365 # Create statistics handler and start updating ui state 366 self.stats = serverstats.ServerStatistics() 367 updater = StatisticsUpdater(self.uiState, "request-statistics") 368 self.stats.startUpdates(updater) 369 updater = StatisticsUpdater(self.uiState, "provider-statistics") 370 self._fileProviderPlug.startStatsUpdates(updater) 371 self._updateUptime() 372 373 d = defer.Deferred() 374 if self.type == 'slave': 375 # Streamer is slaved to a porter. 376 if self._singleFile: 377 self._pbclient = porterclient.HTTPPorterClientFactory( 378 site, [self.mountPoint], d) 379 else: 380 self._pbclient = porterclient.HTTPPorterClientFactory( 381 site, [], d, 382 prefixes=[self.mountPoint]) 383 creds = credentials.UsernamePassword(self._porterUsername, 384 self._porterPassword) 385 self._pbclient.startLogin(creds, self._pbclient.medium) 386 self.debug("Starting porter login!") 387 # This will eventually cause d to fire 388 reactor.connectWith(fdserver.FDConnector, self._porterPath, 389 self._pbclient, 10, checkPID=False) 390 else: 391 # File Streamer is standalone. 392 try: 393 self.debug('Going to listen on port %d' % self.port) 394 iface = "" 395 # we could be listening on port 0, in which case we need 396 # to figure out the actual port we listen on 397 self._twistedPort = reactor.listenTCP(self.port, 398 site, interface=iface) 399 self.port = self._twistedPort.getHost().port 400 self.debug('Listening on port %d' % self.port) 401 except error.CannotListenError: 402 t = 'Port %d is not available.' % self.port 403 self.warning(t) 404 m = messages.Error(T_(N_( 405 "Network error: TCP port %d is not available."), 406 self.port)) 407 self.addMessage(m) 408 self.setMood(moods.sad) 409 return defer.fail(errors.ComponentSetupHandledError(t)) 410 # fire callback so component gets happy 411 d.callback(None) 412 # we are responsible for setting component happy 413 414 def setComponentHappy(result): 415 self.httpauth.scheduleKeepAlive() 416 self.setMood(moods.happy) 417 return result
418 d.addCallback(setComponentHappy) 419 return d
420
421 - def do_stop(self):
422 if self.stats: 423 self.stats.stopUpdates() 424 if self._fileProviderPlug: 425 self._fileProviderPlug.stopStatsUpdates() 426 if self.httpauth: 427 self.httpauth.stopKeepAlive() 428 if self._timeoutRequestsCallLater: 429 self._timeoutRequestsCallLater.cancel() 430 self._timeoutRequestsCallLater = None 431 if self._uptimeCallId: 432 self._uptimeCallId.cancel() 433 self._uptimeCallId = None 434 if self._twistedPort: 435 self._twistedPort.stopListening() 436 437 l = [self.remove_all_clients()] 438 if self.type == 'slave' and self._pbclient: 439 if self._singleFile: 440 l.append(self._pbclient.deregisterPath(self.mountPoint)) 441 else: 442 l.append(self._pbclient.deregisterPrefix(self.mountPoint)) 443 return defer.DeferredList(l)
444
445 - def updatePorterDetails(self, path, username, password):
446 """ 447 Provide a new set of porter login information, for when we're in slave 448 mode and the porter changes. 449 If we're currently connected, this won't disconnect - it'll just change 450 the information so that next time we try and connect we'll use the 451 new ones 452 @param path: new path 453 @param username: new username 454 @param password: new password 455 """ 456 if self.type != 'slave': 457 raise errors.WrongStateError( 458 "Can't specify porter details in master mode") 459 460 self._porterUsername = username 461 self._porterPassword = password 462 463 creds = credentials.UsernamePassword(self._porterUsername, 464 self._porterPassword) 465 self._pbclient.startLogin(creds, self.medium) 466 467 self._updatePath(path)
468
469 - def _updatePath(self, path):
470 # If we've changed paths, we must do some extra work. 471 if path == self._porterPath: 472 return 473 self._porterPath = path 474 475 # Stop trying to connect with the old connector. 476 self._pbclient.stopTrying() 477 478 self._pbclient.resetDelay() 479 reactor.connectWith(fdserver.FDConnector, self._porterPath, 480 self._pbclient, 10, checkPID=False)
481
482 - def _timeoutRequests(self):
483 self._timeoutRequestsCallLater = None 484 485 now = time.time() 486 for request in self._connected_clients.values(): 487 if now - request.lastTimeWritten > self.REQUEST_TIMEOUT: 488 self.debug("Timing out connection on request for [fd %5d]", 489 request.fd) 490 # Apparently this is private API. However, calling 491 # loseConnection is not sufficient - it won't drop the 492 # connection until the send queue is empty, which might never 493 # happen for an uncooperative client 494 request.channel.transport.connectionLost( 495 errors.TimeoutException()) 496 497 # FIXME: ideally, we shouldn't create another callLater if the 498 # component is shutting down, to leave the environment clean 499 # and tidy (right now, let's hope the process will be stopped 500 # eventually anyway) 501 self._timeoutRequestsCallLater = reactor.callLater( 502 self.REQUEST_TIMEOUT, self._timeoutRequests)
503
504 - def _getDefaultRootResource(self):
505 node = self._fileProviderPlug.getRootPath() 506 if node is None: 507 return None 508 509 self.debug('Starting with mount point "%s"' % self.mountPoint) 510 factory = httpfile.MimedFileFactory(self.httpauth, 511 mimeToResource=self._mimeToResource, 512 rateController=self._rateControlPlug, 513 requestModifiers=self._requestModifiers, 514 metadataProvider=self._metadataProviderPlug) 515 516 root = factory.create(node) 517 if self.mountPoint != '/': 518 root = self._createRootResourceForPath(self.mountPoint, root) 519 520 return root
521
522 - def _createRootResourceForPath(self, path, fileResource):
523 if path.endswith('/'): 524 path = path[:-1] 525 526 root = Resource() 527 children = string.split(path[1:], '/') 528 parent = root 529 for child in children[:-1]: 530 resource = Resource() 531 self.debug("Putting Resource at %s", child) 532 parent.putChild(child, resource) 533 parent = resource 534 self.debug("Putting resource %r at %r", fileResource, children[-1]) 535 parent.putChild(children[-1], fileResource) 536 return root
537
538 - def remove_client(self, fd):
539 """ 540 Remove a client when requested. 541 542 Used by keycard expiry. 543 """ 544 if fd in self._connected_clients: 545 request = self._connected_clients[fd] 546 self.debug("Removing client for fd %d", fd) 547 request.unregisterProducer() 548 request.channel.transport.loseConnection() 549 else: 550 self.debug("No client with fd %d found", fd)
551
552 - def remove_all_clients(self):
553 l = [] 554 for fd in self._connected_clients: 555 d = defer.Deferred() 556 self._pendingDisconnects[fd] = d 557 l.append(d) 558 559 request = self._connected_clients[fd] 560 request.unregisterProducer() 561 request.channel.transport.loseConnection() 562 563 self.debug("Waiting for %d clients to finish", len(l)) 564 return defer.DeferredList(l)
565
566 - def requestStarted(self, request):
567 # request does not yet have proto and uri 568 fd = request.transport.fileno() # ugly! 569 self._connected_clients[fd] = request 570 self.debug("[fd %5d] (ts %f) request %r started", 571 fd, time.time(), request)
572
573 - def requestFinished(self, request, bytesWritten, timeConnected, fd):
574 575 # PROBE: finishing request; see httpstreamer.resources 576 self.debug('[fd %5d] (ts %f) finishing request %r', 577 request.transport.fileno(), time.time(), request) 578 579 self.httpauth.cleanupAuth(fd) 580 ip = request.getClientIP() 581 if not self._logfilter or not self._logfilter.isInRange(ip): 582 fields = request.getLogFields() 583 fields.update({'time': time.gmtime(), 584 'username': '-'}) # FIXME: put the httpauth name 585 l = [] 586 for logger in self._loggers: 587 l.append(defer.maybeDeferred( 588 logger.event, 'http_session_completed', fields)) 589 d = defer.DeferredList(l) 590 else: 591 d = defer.succeed(None) 592 593 del self._connected_clients[fd] 594 595 self._total_bytes_written += bytesWritten 596 597 def firePendingDisconnect(_): 598 self.debug("Logging completed") 599 if fd in self._pendingDisconnects: 600 pending = self._pendingDisconnects.pop(fd) 601 self.debug("Firing pending disconnect deferred") 602 pending.callback(None) 603 604 # PROBE: finished request; see httpstreamer.resources 605 self.debug('[fd %5d] (ts %f) finished request %r', 606 fd, time.time(), request)
607 608 d.addCallback(firePendingDisconnect) 609
610 - def getDescription(self):
611 return self._description
612
613 - def getUrl(self):
614 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
615
616 - def getStreamData(self):
617 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug' 618 if self.plugs[socket]: 619 plug = self.plugs[socket][-1] 620 return plug.getStreamData() 621 else: 622 return {'protocol': 'HTTP', 623 'description': self._description, 624 'url': self.getUrl()}
625
626 - def getClients(self):
627 """ 628 Return the number of connected clients 629 """ 630 return len(self._connected_clients)
631
632 - def getBytesSent(self):
633 """ 634 Current Bandwidth 635 """ 636 bytesTransferred = self._total_bytes_written 637 for request in self._connected_clients.values(): 638 if request._transfer: 639 bytesTransferred += request._transfer.bytesWritten 640 return bytesTransferred
641
642 - def getLoadData(self):
643 """ 644 Return a tuple (deltaadded, deltaremoved, bytes_transferred, 645 current_clients, current_load) of our current bandwidth and 646 user values. The deltas and current_load are NOT currently 647 implemented here, we set them as zero. 648 """ 649 return (0, 0, self.getBytesSent(), self.getClients(), 0)
650
651 - def rotateLog(self):
652 """ 653 Close the logfile, then reopen using the previous logfilename 654 """ 655 for logger in self._loggers: 656 self.debug('rotating logger %r' % logger) 657 logger.rotate()
658
659 - def setRootResource(self, resource):
660 """Attaches a root resource to this component. The root resource is the 661 once which will be used when accessing the mount point. 662 This is normally called from a plugs start() method. 663 @param resource: root resource 664 @type resource: L{twisted.web.resource.Resource} 665 """ 666 rootResource = self._createRootResourceForPath( 667 self.getMountPoint(), resource) 668 669 self._rootResource = rootResource
670
671 - def getMountPoint(self):
672 """Get the mount point of this component 673 @returns: the mount point 674 """ 675 # This is called early, before do_setup() 676 return self.config['properties'].get('mount-point')
677
678 - def _updateUptime(self):
679 uptime = time.time() - self._startTime 680 self.uiState.set("server-uptime", uptime) 681 self._uptimeCallId = reactor.callLater(UPTIME_UPDATE_INTERVAL, 682 self._updateUptime)
683