Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlistparser
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlistparser

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  from gst.extend import discoverer 
 24   
 25  import time 
 26  import calendar 
 27  from StringIO import StringIO 
 28   
 29  from xml.dom import Node 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.common import log, fxml 
 34   
 35  __version__ = "$Rev: 7849 $" 
 36   
 37   
38 -class PlaylistItem(object, log.Loggable):
39
40 - def __init__(self, piid, timestamp, uri, offset, duration):
41 self.id = piid 42 self.timestamp = timestamp 43 self.uri = uri 44 self.offset = offset 45 self.duration = duration 46 47 self.hasAudio = True 48 self.hasVideo = True 49 50 self.next = None 51 self.prev = None
52 53
54 -class Playlist(object, log.Loggable):
55 logCategory = 'playlist-list' 56
57 - def __init__(self, producer):
58 """ 59 Create an initially empty playlist 60 """ 61 self.items = None # PlaylistItem linked list 62 self._itemsById = {} 63 64 self.producer = producer
65
66 - def _findItem(self, timePosition):
67 # timePosition is the position in terms of the clock time 68 # Get the item that corresponds to timePosition, or None 69 cur = self.items 70 while cur: 71 if cur.timestamp < timePosition and \ 72 cur.timestamp + cur.duration > timePosition: 73 return cur 74 if cur.timestamp > timePosition: 75 return None # fail without having to iterate over everything 76 cur = cur.next 77 return None
78
79 - def _getCurrentItem(self):
80 position = self.producer.pipeline.get_clock().get_time() 81 item = self._findItem(position) 82 self.debug("Item %r found as current for playback position %d", 83 item, position) 84 return item
85
86 - def removeItems(self, piid):
87 current = self._getCurrentItem() 88 89 if piid not in self._itemsById: 90 return 91 92 items = self._itemsById[piid] 93 for item in items: 94 self.debug("removeItems: item %r ts: %d", item, item.timestamp) 95 if current: 96 self.debug("current ts: %d current dur: %d", 97 current.timestamp, current.duration) 98 if (current and item.timestamp < current.timestamp + 99 current.duration): 100 self.debug("Not removing current item!") 101 continue 102 self.unlinkItem(item) 103 self.producer.unscheduleItem(item) 104 105 del self._itemsById[piid]
106
107 - def addItem(self, piid, timestamp, uri, offset, duration, 108 hasAudio, hasVideo):
109 """ 110 Add an item to the playlist. 111 112 This may remove overlapping entries, or adjust timestamps/durations of 113 entries to make the new one fit. 114 """ 115 current = self._getCurrentItem() 116 if current and timestamp < current.timestamp + current.duration: 117 self.warning("New object at uri %s starts during current object, " 118 "cannot add") 119 return None 120 # We don't care about anything older than now; drop references to them 121 if current: 122 self.items = current 123 124 newitem = PlaylistItem(piid, timestamp, uri, offset, duration) 125 newitem.hasAudio = hasAudio 126 newitem.hasVideo = hasVideo 127 128 if piid in self._itemsById: 129 self._itemsById[piid].append(newitem) 130 else: 131 self._itemsById[piid] = [newitem] 132 133 # prev starts strictly before the new item 134 # next starts after the new item, and ends after the 135 # end of the new item 136 prev = next = None 137 item = self.items 138 while item: 139 if item.timestamp < newitem.timestamp: 140 prev = item 141 else: 142 break 143 item = item.next 144 145 if prev: 146 item = prev.next 147 while item: 148 if (item.timestamp > newitem.timestamp and 149 item.timestamp + item.duration > 150 newitem.timestamp + newitem.duration): 151 next = item 152 break 153 item = item.next 154 155 if prev: 156 # Then things between prev and next (next might be None) are to be 157 # deleted. Do so. 158 cur = prev.next 159 while cur != next: 160 self._itemsById[cur.id].remove(cur) 161 if not self._itemsById[cur.id]: 162 del self._itemsById[cur.id] 163 self.producer.unscheduleItem(cur) 164 cur = cur.next 165 166 # update links. 167 if prev: 168 prev.next = newitem 169 newitem.prev = prev 170 else: 171 self.items = newitem 172 173 if next: 174 newitem.next = next 175 next.prev = newitem 176 177 # Duration adjustments -> Reflect into gnonlin timeline 178 if prev and prev.timestamp + prev.duration > newitem.timestamp: 179 self.debug("Changing duration of previous item from %d to %d", 180 prev.duration, newitem.timestamp - prev.timestamp) 181 prev.duration = newitem.timestamp - prev.timestamp 182 self.producer.adjustItemScheduling(prev) 183 184 if next and newitem.timestamp + newitem.duration > next.timestamp: 185 self.debug("Changing timestamp of next item from %d to %d to fit", 186 newitem.timestamp, newitem.timestamp + newitem.duration) 187 ts = newitem.timestamp + newitem.duration 188 duration = next.duration - (ts - next.timestamp) 189 next.duration = duration 190 next.timestamp = ts 191 self.producer.adjustItemScheduling(next) 192 193 # Then we need to actually add newitem into the gnonlin timeline 194 if not self.producer.scheduleItem(newitem): 195 self.debug("Failed to schedule item, unlinking") 196 # Failed to schedule it. 197 self.unlinkItem(newitem) 198 return None 199 200 return newitem
201
202 - def unlinkItem(self, item):
203 if item.prev: 204 item.prev.next = item.next 205 else: 206 self.items = item.next 207 208 if item.next: 209 item.next.prev = item.prev
210 211
212 -class PlaylistParser(object, log.Loggable):
213 logCategory = 'playlist-parse' 214
215 - def __init__(self, playlist):
216 self.playlist = playlist 217 218 self._pending_items = [] 219 self._discovering = False 220 self._discovering_blocked = 0 221 222 self._baseDirectory = None
223
224 - def setBaseDirectory(self, baseDir):
225 if not baseDir.endswith('/'): 226 baseDir = baseDir + '/' 227 self._baseDirectory = baseDir
228
229 - def blockDiscovery(self):
230 """ 231 Prevent playlist parser from running discoverer on any pending 232 playlist entries. Multiple subsequent invocations will require 233 the same corresponding number of calls to L{unblockDiscovery} 234 to resume discovery. 235 """ 236 self._discovering_blocked += 1 237 self.debug(' blocking discovery: %d' % self._discovering_blocked)
238
239 - def unblockDiscovery(self):
240 """ 241 Resume discovering of any pending playlist entries. If 242 L{blockDiscovery} was called multiple times multiple 243 invocations of unblockDiscovery will be required to unblock 244 the discoverer. 245 """ 246 if self._discovering_blocked > 0: 247 self._discovering_blocked -= 1 248 self.debug('unblocking discovery: %d' % self._discovering_blocked) 249 if self._discovering_blocked < 1: 250 self.startDiscovery()
251
252 - def startDiscovery(self, doSort=True):
253 """ 254 Initiate discovery of any pending playlist entries. 255 256 @param doSort: should the pending entries be ordered 257 chronologically before initiating discovery 258 @type doSort: bool 259 """ 260 self.log('startDiscovery: discovering: %s, block: %d, pending: %d' % 261 (self._discovering, self._discovering_blocked, 262 len(self._pending_items))) 263 if not self._discovering and self._discovering_blocked < 1 \ 264 and self._pending_items: 265 if doSort: 266 self._sortPending() 267 self._discoverPending()
268
269 - def _sortPending(self):
270 self.debug('sort pending: %d' % len(self._pending_items)) 271 if not self._pending_items: 272 return 273 sortlist = [(elt[1], elt) for elt in self._pending_items] 274 sortlist.sort() 275 self._pending_items = [elt for (ts, elt) in sortlist]
276
277 - def _discoverPending(self):
278 279 def _discovered(disc, is_media): 280 self.debug("Discovered! is media: %d mime type %s", is_media, 281 disc.mimetype) 282 reactor.callFromThread(_discoverer_done, disc, is_media)
283 284 def _discoverer_done(disc, is_media): 285 if is_media: 286 self.debug("Discovery complete, media found") 287 # FIXME: does item exist because it is popped below ? 288 # if so, that's ugly and non-obvious and should be fixed 289 uri = "file://" + item[0] 290 timestamp = item[1] 291 duration = item[2] 292 offset = item[3] 293 piid = item[4] 294 295 hasA = disc.is_audio 296 hasV = disc.is_video 297 durationDiscovered = 0 298 if hasA and hasV: 299 durationDiscovered = min(disc.audiolength, 300 disc.videolength) 301 elif hasA: 302 durationDiscovered = disc.audiolength 303 elif hasV: 304 durationDiscovered = disc.videolength 305 if not duration or duration > durationDiscovered: 306 duration = durationDiscovered 307 308 if duration + offset > durationDiscovered: 309 offset = 0 310 311 if duration > 0: 312 self.playlist.addItem(piid, timestamp, uri, 313 offset, duration, hasA, hasV) 314 else: 315 self.warning("Duration of item is zero, not adding") 316 else: 317 self.warning("Discover failed to find media in %s", item[0]) 318 319 # We don't want to burn too much cpu discovering all the files; 320 # this throttles the discovery rate to a reasonable level 321 self.debug("Continuing on to next file in one second") 322 reactor.callLater(1, self._discoverPending)
323 324 if not self._pending_items: 325 self.debug("No more files to discover") 326 self._discovering = False 327 return 328 329 if self._discovering_blocked > 0: 330 self.debug("Discovering blocked: %d" % self._discovering_blocked) 331 self._discovering = False 332 return 333 334 self._discovering = True 335 336 item = self._pending_items.pop(0) 337 338 self.debug("Discovering file %s", item[0]) 339 disc = discoverer.Discoverer(item[0]) 340 341 disc.connect('discovered', _discovered) 342 disc.discover() 343
344 - def addItemToPlaylist(self, filename, timestamp, duration, offset, piid):
345 # We only want to add it if it's plausibly schedulable. 346 end = timestamp 347 if duration is not None: 348 end += duration 349 if end < time.time() * gst.SECOND: 350 self.debug("Early-out: ignoring add for item in past") 351 return 352 353 if filename[0] != '/' and self._baseDirectory: 354 filename = self._baseDirectory + filename 355 356 self._pending_items.append((filename, timestamp, duration, offset, 357 piid)) 358 359 # Now launch the discoverer for any pending items 360 self.startDiscovery()
361 362
363 -class PlaylistXMLParser(PlaylistParser):
364 logCategory = 'playlist-xml' 365
366 - def parseData(self, data):
367 """ 368 Parse playlist XML document data 369 """ 370 fileHandle = StringIO(data) 371 self.parseFile(fileHandle)
372
373 - def replaceFile(self, file, piid):
374 self.playlist.removeItems(piid) 375 self.parseFile(file, piid)
376
377 - def parseFile(self, file, piid=None):
378 """ 379 Parse a playlist file. Adds the contents of the file to the existing 380 playlist, overwriting any existing entries for the same time period. 381 """ 382 parser = fxml.Parser() 383 384 root = parser.getRoot(file) 385 386 node = root.documentElement 387 self.debug("Parsing playlist from file %s", file) 388 if node.nodeName != 'playlist': 389 raise fxml.ParserError("Root node is not 'playlist'") 390 391 self.blockDiscovery() 392 try: 393 for child in node.childNodes: 394 if child.nodeType == Node.ELEMENT_NODE and \ 395 child.nodeName == 'entry': 396 self.debug("Parsing entry") 397 self._parsePlaylistEntry(parser, child, piid) 398 finally: 399 self.unblockDiscovery()
400 401 # A simplified private version of this code from fxml without the 402 # undesirable unicode->str conversions. 403
404 - def _parseAttributes(self, node, required, optional):
405 out = [] 406 for k in required: 407 if node.hasAttribute(k): 408 out.append(node.getAttribute(k)) 409 else: 410 raise fxml.ParserError("Missing required attribute %s" % k) 411 412 for k in optional: 413 if node.hasAttribute(k): 414 out.append(node.getAttribute(k)) 415 else: 416 out.append(None) 417 return out
418
419 - def _parsePlaylistEntry(self, parser, entry, piid):
420 mandatory = ['filename', 'time'] 421 optional = ['duration', 'offset'] 422 423 (filename, timestamp, duration, offset) = self._parseAttributes( 424 entry, mandatory, optional) 425 426 if duration is not None: 427 duration = int(float(duration) * gst.SECOND) 428 if offset is None: 429 offset = 0 430 offset = int(offset) * gst.SECOND 431 432 timestamp = self._parseTimestamp(timestamp) 433 434 # Assume UTF-8 filesystem. 435 filename = filename.encode("UTF-8") 436 437 self.addItemToPlaylist(filename, timestamp, duration, offset, piid)
438
439 - def _parseTimestamp(self, ts):
440 # Take TS in YYYY-MM-DDThh:mm:ss.ssZ format, return timestamp in 441 # nanoseconds since the epoch 442 443 # time.strptime() doesn't handle the fractional seconds part. We ignore 444 # it entirely, after verifying that it has the right format. 445 tsmain, trailing = ts[:-4], ts[-4:] 446 if trailing[0] != '.' or trailing[3] != 'Z' or \ 447 not trailing[1].isdigit() or not trailing[2].isdigit(): 448 raise fxml.ParserError("Invalid timestamp %s" % ts) 449 format = "%Y-%m-%dT%H:%M:%S" 450 451 try: 452 timestruct = time.strptime(tsmain, format) 453 return int(calendar.timegm(timestruct) * gst.SECOND) 454 except ValueError: 455 raise fxml.ParserError("Invalid timestamp %s" % ts)
456