Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  import tempfile 
 26  import datetime as dt 
 27   
 28  import gobject 
 29  import gst 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.component import feedcomponent 
 34  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 35  from flumotion.common import documentation, format 
 36  from flumotion.common import eventcalendar, poller 
 37  from flumotion.common.i18n import N_, gettexter 
 38  from flumotion.common.mimetypes import mimeTypeToExtention 
 39  from flumotion.common.pygobject import gsignal 
 40   
 41  #   the flumotion.twisted.flavors is not bundled, and as we only need it for 
 42  #   the interface, we can skip doing the import and thus not create 
 43  #   incompatibilities with workers running old versions of flavors that will be 
 44  #   asked to create diskers importing the IStateCacheableListener from that 
 45  #   module 
 46  # from flumotion.twisted.flavors import IStateCacheableListener 
 47   
 48  # proxy import 
 49  from flumotion.component.component import moods 
 50   
 51  __all__ = ['Disker'] 
 52  __version__ = "$Rev$" 
 53  T_ = gettexter() 
 54   
 55  # Disk Usage polling frequency 
 56  DISKPOLL_FREQ = 60 
 57   
 58  # Maximum number of entries to keep in the filelist 
 59  FILELIST_SIZE = 100 
 60   
 61  """ 
 62  Disker has a property 'ical-schedule'. This allows an ical file to be 
 63  specified in the config and have recordings scheduled based on events. 
 64  This file will be monitored for changes and events reloaded if this 
 65  happens. 
 66   
 67  The filename of a recording started from an ical file will be produced 
 68  via passing the ical event summary through strftime, so that an archive 
 69  can encode the date and time that it was begun. 
 70   
 71  The time that will be given to strftime will be given in the timezone of 
 72  the ical event. In practice this will either be UTC or the local time of 
 73  the machine running the disker, as the ical scheduler does not 
 74  understand arbitrary timezones. 
 75  """ 
 76   
 77   
class DiskerMedium(feedcomponent.FeedComponentMedium):
    """
    Medium through which the admin UI drives the disker component.

    Every remote_* method forwards to the component held in self.comp.
    """

    def remote_stopRecording(self):
        """
        Called when the admin UI wants to stop recording.

        Call remote_changeFilename to restart.
        """
        self.comp.stopRecording()

    def remote_changeFilename(self, filenameTemplate=None):
        """
        Called when the admin UI wants to change filename (this starts
        recording if the disker isn't currently writing to disk).

        @param filenameTemplate: template passed on to the component's
                                 changeFilename
        """
        self.comp.changeFilename(filenameTemplate)

    def remote_scheduleRecordings(self, icalData):
        """
        Stop the current recording and schedule recordings from iCal data.

        @param icalData: content of an iCalendar file; it is spooled into
                         a temporary file that is handed to the component
        """
        icalFile = tempfile.TemporaryFile()
        # Release the temporary file even when stopping or scheduling
        # fails; previously an exception left it open.
        try:
            icalFile.write(icalData)
            icalFile.seek(0)

            self.comp.stopRecording()

            self.comp.scheduleRecordings(icalFile)
        finally:
            icalFile.close()

    def remote_notifyState(self):
        """
        Called when the admin UI wants updated state (current filename
        info).
        """
        self.comp.update_ui_state()
105 106
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    # component wiring
    componentMediumClass = DiskerMedium
    pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
    checkOffset = True

    # state of the recording in progress
    file = None
    location = None
    directory = None
    last_tstamp = None
    caps = None

    # template to use when starting off recording
    _startFilenameTemplate = None
    # time of event when starting
    _startTime = None

    # pending delayed calls: time based rotation and _pollDisk
    _rotateTimeDelayedCall = None
    _pollDiskDC = None

    _symlinkToCurrentRecording = None
    _symlinkToLastRecording = None

    # see the commented out import statement for IStateCacheableListener at
    # the beginning of this file
    # implements(IStateCacheableListener)

    ### BaseComponent methods
def init(self):
    """
    Register the UI state keys and create the disk poller (not started).
    """
    self._can_schedule = (eventcalendar.HAS_ICALENDAR and
                          eventcalendar.HAS_DATEUTIL)

    initialKeys = (
        ('filename', None),
        ('recording', False),
        ('can-schedule', self._can_schedule),
        ('has-schedule', False),
        ('rotate-type', None),
        ('disk-free', None),
    )
    for key, value in initialKeys:
        self.uiState.addKey(key, value)

    # 'next-points' holds (dt (in UTC, without tzinfo), which, content)
    for listKey in ('next-points', 'filelist'):
        self.uiState.addListKey(listKey)

    self._diskPoller = poller.Poller(
        self._pollDisk, DISKPOLL_FREQ, start=False)
146 147 ### uiState observer triggers 148
def observerAppend(self, observer, num):
    """
    Start disk polling when an observer starts watching the UI state.

    @param observer: passed through to ParseLaunchComponent.observerAppend
    @param num:      passed through to ParseLaunchComponent.observerAppend
    """
    self.debug("observer has started watching us, starting disk polling")

    alreadyPolling = self._diskPoller.running or self._pollDiskDC
    if not alreadyPolling:
        # PB may not have finished setting up its state and doing a
        # remoteCall immediately here may cause some problems to the other
        # side. To still send the initial disk usage value with no
        # noticeable delay, start the poller from a callLater with a
        # timeout value of 0.
        self._pollDiskDC = reactor.callLater(
            0, self._diskPoller.start, immediately=True)

    # Start the BaseComponent pollers
    feedcomponent.ParseLaunchComponent.observerAppend(self, observer, num)
162
def observerRemove(self, observer, num):
    """
    Shut down disk polling once num reaches 0.

    @param observer: passed through to ParseLaunchComponent.observerRemove
    @param num:      disk polling is stopped when this is 0
    """
    noObserversLeft = (num == 0)
    if noObserversLeft:
        # cancel delayed _pollDisk calls if there's any
        pendingCall = self._pollDiskDC
        if pendingCall:
            pendingCall.cancel()
            self._pollDiskDC = None

        self.debug("no more observers left, shutting down disk polling")
        self._diskPoller.stop()

    # Stop the BaseComponent pollers
    feedcomponent.ParseLaunchComponent.observerRemove(self, observer, num)
174
def check_properties(self, props, addMessage):
    """
    Validate the rotation related configuration properties.

    @param props:      not used as passed in; it is replaced right away
                       by self.config['properties']
    @param addMessage: callable that receives the error message before
                       errors.ConfigError is raised
    """
    # NOTE(review): the props argument is shadowed here - confirm whether
    # the passed-in value was meant to be used instead.
    props = self.config['properties']
    rotateType = props.get('rotate-type', 'none')

    def reject(message):
        # report the message, then abort with it
        addMessage(message)
        raise errors.ConfigError(message)

    if rotateType not in ('none', 'size', 'time'):
        # NOTE(review): the quote before time' is unbalanced in this
        # translatable string; left as is to keep the message id stable.
        reject(messages.Error(T_(N_(
            "The configuration property 'rotate-type' should be set to "
            "'size', time', or 'none', not '%s'. "
            "Please fix the configuration."),
                rotateType), mid='rotate-type'))

    if rotateType == 'none':
        return

    # for 'size' and 'time' a property of that same name holds the value
    if rotateType not in props:
        reject(messages.Error(T_(N_(
            "The configuration property '%s' should be set. "
            "Please fix the configuration."),
                rotateType), mid='rotate-type'))

    if props[rotateType] == 0:
        reject(messages.Error(T_(N_("Configuration error: "
            "'rotate-type' %s value cannot be set to 0."),
                rotateType), mid='rotate-type'))
203 204 ### ParseLaunchComponent methods 205
def get_pipeline_string(self, properties):
    """
    Record the target directory, set up rotation and return the pipeline.

    @param properties: component properties; 'directory' is required,
                       'size' or 'time' is read when 'rotate-type' asks
                       for it
    @returns: self.pipe_template
    """
    self.directory = properties['directory']

    self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])

    # now act on the properties
    rotateType = properties.get('rotate-type', 'none')
    if rotateType == 'size':
        maxSize = properties['size']
        self.setSizeRotate(maxSize)
        description = 'every %sB' % format.formatStorage(maxSize)
    elif rotateType == 'time':
        duration = properties['time']
        self.setTimeRotate(duration)
        description = 'every %s' % format.formatTime(duration)
    else:
        description = 'disabled'
    self.uiState.set('rotate-type', description)
    # FIXME: should add a way of saying "do first cycle at this time"

    return self.pipe_template
231
def configure_pipeline(self, pipeline, properties):
    """
    Read the recording properties, load the iCal schedule if one is
    configured, and hook up the fdsink element.

    @param pipeline:   not used by this method
    @param properties: component properties
    @raises errors.ComponentSetupHandledError: when 'ical-schedule' is
        set but scheduling is not possible (self._can_schedule is False)
    """
    self.debug('configure_pipeline for disker')
    self._symlinkToLastRecording = \
        properties.get('symlink-to-last-recording', None)
    self._symlinkToCurrentRecording = \
        properties.get('symlink-to-current-recording', None)
    self._recordAtStart = properties.get('start-recording', True)
    self._defaultFilenameTemplate = properties.get('filename',
        '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName())
    self._startFilenameTemplate = self._defaultFilenameTemplate
    icalfn = properties.get('ical-schedule')
    if self._can_schedule and icalfn:
        # Close the schedule file once it has been handed over, the same
        # way DiskerMedium.remote_scheduleRecordings does; it used to be
        # left open.
        icalFile = open(icalfn, 'r')
        try:
            self.scheduleRecordings(icalFile)
        finally:
            icalFile.close()
    elif icalfn:
        # ical schedule is set, but self._can_schedule is False

        def missingModule(moduleName):
            m = messages.Error(T_(N_(
                "An iCal file has been specified for scheduling, "
                "but the '%s' module is not installed.\n"), moduleName),
                mid='error-python-%s' % moduleName)
            documentation.messageAddPythonInstall(m, moduleName)
            self.debug(m)
            self.addMessage(m)

        if not eventcalendar.HAS_ICALENDAR:
            missingModule('icalendar')
        if not eventcalendar.HAS_DATEUTIL:
            missingModule('dateutil')
        # self._can_schedule is False, so one of the above surely happened
        raise errors.ComponentSetupHandledError()

    sink = self.get_element('fdsink')

    if gstreamer.element_factory_has_property('multifdsink',
                                              'resend-streamheader'):
        sink.set_property('resend-streamheader', False)
    else:
        self.debug("resend-streamheader property not available, "
                   "resending streamheader when it changes in the caps")
    sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
    # connect to client-removed so we can detect errors in file writing
    sink.connect('client-removed', self._client_removed_cb)

    # set event probe if we should react to video mark events
    react_to_marks = properties.get('react-to-stream-markers', False)
    if react_to_marks:
        pfx = properties.get('stream-marker-filename-prefix', '%03d.')
        self._markerPrefix = pfx
        sink.get_pad('sink').add_event_probe(self._markers_event_probe)
282 283 ### our methods 284
def _pollDisk(self):
    """
    Figure out the remaining disk space where the disker is saving files
    to, and publish it as 'disk-free' in the UI state when it changed.
    """
    self._pollDiskDC = None

    try:
        stats = os.statvfs(self.directory)
    except Exception as e:
        self.debug('failed to figure out disk space: %s',
                   log.getExceptionMessage(e))
        stats = None

    if stats:
        free = format.formatStorage(stats.f_frsize * stats.f_bavail)
    else:
        free = None

    if self.uiState.get('disk-free') == free:
        return
    self.debug("disk usage changed, reporting to observers")
    self.uiState.set('disk-free', free)
304
def setTimeRotate(self, time):
    """
    Schedule time based rotation, replacing a previously scheduled one.

    @param time: duration of file (in seconds)
    """
    pendingCall = self._rotateTimeDelayedCall
    if pendingCall:
        pendingCall.cancel()

    rotation = reactor.callLater(time, self._rotateTimeCallLater, time)
    self._rotateTimeDelayedCall = rotation
313
def setSizeRotate(self, size):
    """
    Schedule the first size check of the file being written.

    @param size: size of file (in bytes)
    """
    firstCheckDelay = 5
    reactor.callLater(firstCheckDelay, self._rotateSizeCallLater, size)
319
def _rotateTimeCallLater(self, time):
    """
    Switch to a new file and schedule the next time based rotation.

    @param time: delay passed to reactor.callLater for the next rotation
    """
    self.changeFilename()

    # reschedule ourselves indefinitely
    nextRotation = reactor.callLater(time, self._rotateTimeCallLater, time)
    self._rotateTimeDelayedCall = nextRotation
326
def _rotateSizeCallLater(self, size):
    """
    Rotate the file when it grew beyond size, then schedule the next
    check.

    @param size: size (as reported by os.stat) above which the file is
                 rotated
    """
    if not self.location:
        self.warning('Cannot rotate file, no file location set')
    else:
        try:
            currentSize = os.stat(self.location).st_size
        except OSError as e:
            # The file may have been removed or become unreadable. This
            # must not escape: the check below would never be scheduled
            # again and size based rotation would silently stop.
            self.warning('Cannot check size of %s: %s',
                         self.location, log.getExceptionMessage(e))
        else:
            if currentSize > size:
                self.changeFilename()

    # Add a new one
    reactor.callLater(5, self._rotateSizeCallLater, size)
336
def getMime(self):
    """
    @returns: the name of the first structure of the stored caps, or
              None when no caps are stored
    """
    if not self.caps:
        return None
    firstStructure = self.caps.get_structure(0)
    return firstStructure.get_name()
340 341 # FIXME: is this method used anywhere ? 342
def get_content_type(self):
    """
    @returns: the result of getMime(), with a boundary parameter appended
              when it is 'multipart/x-mixed-replace'
    """
    mime = self.getMime()
    if mime != 'multipart/x-mixed-replace':
        return mime
    return mime + ";boundary=ThisRandomString"
348
def scheduleRecordings(self, icalFile):
    """
    Create an ICalScheduler from the given file and subscribe to the
    start and end of its event instances.

    When an event instance is already active, its content and start are
    stored as start filename template and start time, and recording at
    start is enabled; otherwise recording at start is disabled.

    @param icalFile: file object handed to scheduler.ICalScheduler
    """
    self.uiState.set('has-schedule', True)
    self.debug('Parsing iCalendar file %s' % icalFile)
    from flumotion.component.base import scheduler
    try:
        self.icalScheduler = scheduler.ICalScheduler(icalFile)
        self.icalScheduler.subscribe(self.eventInstanceStarted,
                                     self.eventInstanceEnded)
        # FIXME: this should be handled through the subscription
        # handlers; for that, we should subscribe before the calendar
        # gets added
        cal = self.icalScheduler.getCalendar()
        eventInstances = cal.getActiveEventInstances()
        if eventInstances:
            instance = eventInstances[0]
            content = instance.event.content
            self.info('Event %s is in progress, start recording' %
                      content)
            self._startFilenameTemplate = content
            self._startTime = instance.start
            self._recordAtStart = True
        else:
            self.info('No events in progress')
            self._recordAtStart = False
        self._updateNextPoints()
    except (ValueError, IndexError, KeyError) as e:
        # The format string has to reach N_/T_ unformatted so it can be
        # looked up for translation; the value goes in as an argument,
        # like in the other messages of this file. str() yields the same
        # text the former "%s" interpolation did.
        m = messages.Warning(T_(N_(
            "Error parsing ical file %s, so not scheduling any"
            " events."), str(icalFile)),
            debug=log.getExceptionMessage(e), mid="error-parsing-ical")
        self.addMessage(m)
380
def changeFilename(self, filenameTemplate=None, datetime=None):
    """
    Stop the current recording and start writing to a new file.

    @param filenameTemplate: strftime format string to decide filename;
                             the default filename template is used when
                             not given
    @param datetime:         an aware datetime used for the filename and
                             to compare if an existing file needs to be
                             overwritten. defaulting to datetime.now().
    """
    mime = self.getMime()
    ext = mimeTypeToExtention(mime)

    # if the events comes from the calendar, datetime is aware and we can
    # deduce from it both the local and utc time.
    # in case datetime is None datetime.now() doesn't return an aware
    # datetime, so we need to get both the local time and the utc time.
    if datetime:
        localTime = utcTime = datetime
    else:
        localTime = dt.datetime.now()
        utcTime = dt.datetime.utcnow()

    self.stopRecording()

    sink = self.get_element('fdsink')
    if sink.get_state() == gst.STATE_NULL:
        sink.set_state(gst.STATE_READY)

    template = filenameTemplate or self._defaultFilenameTemplate
    # for the filename we want to use the local time
    baseName = format.strftime(template, localTime.timetuple())
    self.location = os.path.join(self.directory,
                                 "%s.%s" % (baseName, ext))

    # only overwrite existing files if it was last changed before the
    # start of this event; ie. if it is a recording of a previous event
    candidate = self.location
    suffix = 1
    while os.path.exists(candidate):
        # time.gmtime returns a time tuple in utc, so we compare against
        # the utc timetuple of the datetime
        modified = time.gmtime(os.stat(candidate).st_mtime)
        if modified <= utcTime.utctimetuple():
            self.info(
                "Existing recording %s from previous event, overwriting",
                candidate)
            break

        self.info(
            "Existing recording %s from current event, changing name",
            candidate)
        candidate = self.location + '.' + str(suffix)
        suffix += 1
    self.location = candidate

    self.info("Changing filename to %s", self.location)
    try:
        self.file = open(self.location, 'wb')
    except IOError as e:
        self.warning("Failed to open output file %s: %s",
                     self.location, log.getExceptionMessage(e))
        m = messages.Error(T_(N_(
            "Failed to open output file '%s' for writing. "
            "Check permissions on the file."), self.location))
        self.addMessage(m)
        return

    self._recordingStarted(self.file, self.location)
    sink.emit('add', self.file.fileno())
    self.last_tstamp = time.time()
    self.uiState.set('filename', self.location)
    self.uiState.set('recording', True)

    currentLink = self._symlinkToCurrentRecording
    if currentLink:
        self._updateSymlink(self.location, currentLink)
453 477
def stopRecording(self):
    """
    Remove the current file from the sink, if any, and add it to the
    'filelist' of the UI state.
    """
    sink = self.get_element('fdsink')
    if sink.get_state() == gst.STATE_NULL:
        sink.set_state(gst.STATE_READY)

    if not self.file:
        return

    self.file.flush()
    sink.emit('remove', self.file.fileno())
    self._recordingStopped(self.file, self.location)
    self.file = None
    self.uiState.set('filename', None)
    self.uiState.set('recording', False)

    try:
        size = format.formatStorage(os.stat(self.location).st_size)
    except EnvironmentError:
        # catch File not found, permission denied, disk problems
        size = "unknown"

    # Limit number of entries on filelist, remove the oldest entry
    entries = self.uiState.get('filelist', otherwise=[])
    if len(entries) == FILELIST_SIZE:
        self.uiState.remove('filelist', entries[0])

    finished = (self.last_tstamp, self.location, size)
    self.uiState.append('filelist', finished)

    lastLink = self._symlinkToLastRecording
    if lastLink:
        self._updateSymlink(self.location, lastLink)
508
def _notify_caps_cb(self, pad, param):
    """
    Store the caps negotiated on the pad; the first time caps arrive,
    start recording if recording at start is enabled.

    @param pad:   pad whose negotiated caps are read
    @param param: not used by this method
    """
    caps = pad.get_negotiated_caps()
    # identity test: equality against None would go through the caps
    # object's own comparison
    if caps is None:
        return

    caps_str = gstreamer.caps_repr(caps)
    self.debug('Got caps: %s' % caps_str)

    new = self.caps is None
    if not new:
        # report the caps being replaced; this used to print the new
        # caps, which the debug line above already shows
        self.warning('Already had caps: %s, replacing' %
                     gstreamer.caps_repr(self.caps))

    self.debug('Storing caps: %s' % caps_str)
    self.caps = caps

    if new and self._recordAtStart:
        reactor.callLater(0, self.changeFilename,
                          self._startFilenameTemplate, self._startTime)
528 529 # multifdsink::client-removed 530
def _client_removed_cb(self, element, arg0, client_status):
    """
    Handler for multifdsink::client-removed.

    @param element:       not used by this method
    @param arg0:          not used by this method
    @param client_status: status the client was removed with
    """
    # treat as error if we were removed because of GST_CLIENT_STATUS_ERROR
    # FIXME: can we use the symbol instead of a numeric constant ?
    removedOnError = (client_status == 4)
    if not removedOnError:
        return

    # since we get called from the streaming thread, hand off handling
    # to the reactor's thread
    reactor.callFromThread(self._client_error_cb)
538
def _client_error_cb(self):
    """
    Close the file being written, turn the component sad and add an
    error message about the failed write.
    """
    # This runs through reactor.callFromThread, so stopRecording may have
    # cleared self.file by the time we get here; closing None would raise
    # AttributeError and the error would never be reported.
    if self.file:
        self.file.close()
        self.file = None

    self.setMood(moods.sad)
    messageId = "error-writing-%s" % self.location
    m = messages.Error(T_(N_(
        "Error writing to file '%s'."), self.location),
        mid=messageId, priority=40)
    self.addMessage(m)
549
def eventInstanceStarted(self, eventInstance):
    """
    Start recording for an event instance that started.

    @param eventInstance: its event content is used as filename template
                          and its start as the datetime
    """
    content = eventInstance.event.content
    self.debug('starting recording of %s', content)
    self.changeFilename(content, eventInstance.start)
    self._updateNextPoints()
555
def eventInstanceEnded(self, eventInstance):
    """
    Stop recording for an event instance that ended.

    @param eventInstance: only its event content is read, for logging
    """
    content = eventInstance.event.content
    self.debug('ending recording of %s', content)
    self.stopRecording()
    self._updateNextPoints()
560
def _updateNextPoints(self):
    """
    Query the scheduler for what the next points are in its window and
    bring the 'next-points' list of the UI state in line with them.
    """
    current = self.uiState.get('next-points')[:]
    points = self.icalScheduler.getPoints()

    new = []
    for point in points:
        # twisted says 'Currently can't jelly datetime objects with
        # tzinfo', so convert all to UTC then remove tzinfo.
        dtUTC = point.dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)
        dtStart = point.eventInstance.start.replace(tzinfo=None)
        content = format.strftime(point.eventInstance.event.content,
                                  dtStart.timetuple())
        new.append((dtUTC, point.which, content))

    stale = [t for t in current if t not in new]
    for t in stale:
        self.debug('removing tuple %r from next-points', t)
        self.uiState.remove('next-points', t)

    fresh = [t for t in new if t not in current]
    for t in fresh:
        self.debug('appending tuple %r to next-points', t)
        self.uiState.append('next-points', t)
593 - def _recordingStarted(self, file, location):
594 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 595 # make sure plugs are configured with our socket, see #732 596 if socket not in self.plugs: 597 return 598 for plug in self.plugs[socket]: 599 self.debug('invoking recordingStarted on ' 600 'plug %r on socket %s', plug, socket) 601 plug.recordingStarted(file, location)
602
603 - def _recordingStopped(self, file, location):
604 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 605 # make sure plugs are configured with our socket, see #732 606 if socket not in self.plugs: 607 return 608 for plug in self.plugs[socket]: 609 self.debug('invoking recordingStopped on ' 610 'plug %r on socket %s', plug, socket) 611 plug.recordingStopped(file, location)
612 613 ### marker methods 614
def _markers_event_probe(self, element, event):
    """
    Event probe reacting to 'FluStreamMark' custom downstream events.

    @param element: not used by this method
    @param event:   the event seen by the probe
    @returns: always True
    """
    if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
        return True

    evt_struct = event.get_structure()
    if evt_struct.get_name() != 'FluStreamMark':
        return True

    action = evt_struct['action']
    if action == 'start':
        self._onMarkerStart(evt_struct['prog_id'])
    elif action == 'stop':
        self._onMarkerStop()
    return True
624
625 - def _onMarkerStop(self):
626 self.stopRecording()
627
628 - def _onMarkerStart(self, data):
629 tmpl = self._defaultFilenameTemplate 630 if self._markerPrefix: 631 try: 632 tmpl = '%s%s' % (self._markerPrefix % data, 633 self._defaultFilenameTemplate) 634 except TypeError, err: 635 m = messages.Warning(T_(N_('Failed expanding filename prefix: ' 636 '%r <-- %r.'), 637 self._markerPrefix, data), 638 mid='expand-marker-prefix') 639 self.addMessage(m) 640 self.warning('Failed expanding filename prefix: ' 641 '%r <-- %r; %r' % 642 (self._markerPrefix, data, err)) 643 self.changeFilename(tmpl)
644
def do_stop(self):
    """
    Cancel a pending delayed _pollDisk call and stop the disk poller.
    """
    pendingCall = self._pollDiskDC
    if pendingCall:
        pendingCall.cancel()
        self._pollDiskDC = None
    self._diskPoller.stop()
650