Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  import tempfile 
 26   
 27  import gobject 
 28  import gst 
 29   
 30  from twisted.internet import reactor 
 31   
 32  from flumotion.component import feedcomponent 
 33  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 34  from flumotion.common import documentation, format 
 35  from flumotion.common import eventcalendar 
 36  from flumotion.common.i18n import N_, gettexter 
 37  from flumotion.common.mimetypes import mimeTypeToExtention 
 38  from flumotion.common.pygobject import gsignal 
 39  # proxy import 
 40  from flumotion.component.component import moods 
 41   
 42  __all__ = ['Disker'] 
 43  __version__ = "$Rev: 8162 $" 
 44  T_ = gettexter() 
 45   
 46  """ 
 47  Disker has a property 'ical-schedule'. This allows an ical file to be 
 48  specified in the config and have recordings scheduled based on events. 
 49  This file will be monitored for changes and events reloaded if this 
 50  happens. 
 51   
 52  The filename of a recording started from an ical file will be produced 
 53  via passing the ical event summary through strftime, so that an archive 
 54  can encode the date and time that it was begun. 
 55   
 56  The time that will be given to strftime will be given in the timezone of 
 57  the ical event. In practice this will either be UTC or the local time of 
 58  the machine running the disker, as the ical scheduler does not 
 59  understand arbitrary timezones. 
 60  """ 
 61   
 62   
class DiskerMedium(feedcomponent.FeedComponentMedium):
    """
    Medium through which the admin UI drives the disker component.

    Every remote_* method forwards to the component stored in self.comp.
    """

    def remote_stopRecording(self):
        """
        Called when the admin UI wants to stop recording.

        Call remote_changeFilename to restart.
        """
        self.comp.stopRecording()

    def remote_changeFilename(self, filenameTemplate=None):
        """
        Called when the admin UI wants to change filename (this starts
        recording if the disker isn't currently writing to disk).

        @param filenameTemplate: template passed on to the component's
                                 changeFilename; None is passed on as is
        """
        self.comp.changeFilename(filenameTemplate)

    def remote_scheduleRecordings(self, icalData):
        """
        Stop the current recording and schedule recordings from iCal data.

        The data is spooled into a temporary file because the component's
        scheduleRecordings takes a file object.

        @param icalData: contents of an iCalendar file
        """
        icalFile = tempfile.TemporaryFile()
        try:
            icalFile.write(icalData)
            icalFile.seek(0)

            self.comp.stopRecording()

            self.comp.scheduleRecordings(icalFile)
        finally:
            # release the temporary file even when stopping or scheduling
            # raises
            icalFile.close()

    def remote_notifyState(self):
        """
        Called when the admin UI wants updated state (current filename
        info).
        """
        self.comp.update_ui_state()
90 91
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    """
    Consumer component that records its input to files on disk.

    The pipeline is a single multifdsink; changeFilename opens an output
    file and adds its file descriptor to the sink, stopRecording removes
    it again.
    """
    componentMediumClass = DiskerMedium
    checkOffset = True
    pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
    # file object currently being written to, None when not recording
    file = None
    # value of the 'directory' property; output files are created in it
    directory = None
    # path of the most recently chosen output file
    location = None
    # caps stored by _notify_caps_cb; getMime derives the mime type from it
    caps = None

    _startFilenameTemplate = None # template to use when starting off recording
    _startTimeTuple = None # time tuple of event when starting
    # pending reactor call for time based rotation, see setTimeRotate
    _rotateTimeDelayedCall = None
    # values of the 'symlink-to-last-recording' and
    # 'symlink-to-current-recording' properties
    _symlinkToLastRecording = None
    _symlinkToCurrentRecording = None

    ### BaseComponent methods
def init(self):
    """
    Work out whether scheduling is possible and register the UI state
    keys published by this component.
    """
    canSchedule = (eventcalendar.HAS_ICALENDAR and
                   eventcalendar.HAS_DATEUTIL)
    self._can_schedule = canSchedule

    initialState = (
        ('filename', None),
        ('recording', False),
        ('can-schedule', canSchedule),
        ('has-schedule', False),
    )
    for key, value in initialState:
        self.uiState.addKey(key, value)
    # list of (dt (in UTC, without tzinfo), which, content)
    self.uiState.addListKey('next-points')
118 119 ### ParseLaunchComponent methods 120
def get_pipeline_string(self, properties):
    """
    Validate the rotation properties, set up rotation and return the
    pipeline description.

    @param properties: component configuration properties; 'directory'
                       must be present, 'rotate-type' defaults to 'none'
    @returns: the pipe_template string
    @raises errors.ComponentSetupHandledError: when 'rotate-type' has an
        unknown value, or is 'size' or 'time' while the property of that
        name is missing; an error message is added to the component first
    """
    self.directory = properties['directory']

    self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])

    rotateType = properties.get('rotate-type', 'none')

    # validate rotate-type and size/time properties first
    if rotateType not in ('none', 'size', 'time'):
        m = messages.Error(T_(N_(
            "The configuration property 'rotate-type' should be set to "
            "'size', 'time', or 'none', not '%s'. "
            "Please fix the configuration."),
                rotateType), mid='rotate-type')
        self.addMessage(m)
        raise errors.ComponentSetupHandledError()

    # size and time types need the property specified; the property
    # carries the same name as the rotate type
    if rotateType in ('size', 'time'):
        if rotateType not in properties:
            m = messages.Error(T_(N_(
                "The configuration property '%s' should be set. "
                "Please fix the configuration."),
                    rotateType), mid='rotate-type')
            self.addMessage(m)
            raise errors.ComponentSetupHandledError()

    # now act on the properties
    if rotateType == 'size':
        self.setSizeRotate(properties['size'])
    elif rotateType == 'time':
        self.setTimeRotate(properties['time'])
        # FIXME: should add a way of saying "do first cycle at this time"

    return self.pipe_template
158
def configure_pipeline(self, pipeline, properties):
    """
    Read the recording properties, set up iCal scheduling when requested
    and connect to the fdsink element.

    @param pipeline:   not used by this method
    @param properties: component configuration properties
    @raises errors.ComponentSetupHandledError: when 'ical-schedule' is set
        but the icalendar or dateutil module is not available
    """
    self.debug('configure_pipeline for disker')
    self._symlinkToLastRecording = \
        properties.get('symlink-to-last-recording', None)
    self._symlinkToCurrentRecording = \
        properties.get('symlink-to-current-recording', None)
    self._recordAtStart = properties.get('start-recording', True)
    self._defaultFilenameTemplate = properties.get('filename',
        '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName())
    self._startFilenameTemplate = self._defaultFilenameTemplate
    icalfn = properties.get('ical-schedule')
    if self._can_schedule and icalfn:
        icalFile = open(icalfn, 'r')
        try:
            self.scheduleRecordings(icalFile)
        finally:
            # don't leak the descriptor; the medium's
            # remote_scheduleRecordings closes its file right after the
            # same call, so closing here is assumed to be equally safe
            icalFile.close()
    elif icalfn:
        # ical schedule is set, but self._can_schedule is False

        def missingModule(moduleName):
            m = messages.Error(T_(N_(
                "An iCal file has been specified for scheduling, "
                "but the '%s' module is not installed.\n"), moduleName),
                mid='error-python-%s' % moduleName)
            documentation.messageAddPythonInstall(m, moduleName)
            self.debug(m)
            self.addMessage(m)

        if not eventcalendar.HAS_ICALENDAR:
            missingModule('icalendar')
        if not eventcalendar.HAS_DATEUTIL:
            missingModule('dateutil')
        # self._can_schedule is False, so one of the above surely happened
        raise errors.ComponentSetupHandledError()

    sink = self.get_element('fdsink')
    sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
    # connect to client-removed so we can detect errors in file writing
    sink.connect('client-removed', self._client_removed_cb)

    # set event probe if we should react to video mark events
    react_to_marks = properties.get('react-to-stream-markers', False)
    if react_to_marks:
        pfx = properties.get('stream-marker-filename-prefix', '%03d.')
        self._markerPrefix = pfx
        sink.get_pad('sink').add_event_probe(self._markers_event_probe)
202 203 ### our methods 204
def setTimeRotate(self, time):
    """
    Start rotating the output file at a fixed interval, replacing any
    rotation call that is still stored on the component.

    @param time: duration of file (in seconds)
    """
    pending = self._rotateTimeDelayedCall
    if pending:
        pending.cancel()

    rotate = self._rotateTimeCallLater
    self._rotateTimeDelayedCall = reactor.callLater(time, rotate, time)
213
def setSizeRotate(self, size):
    """
    Start the periodic check that rotates the output file once it has
    grown beyond the given size.

    @param size: size of file (in bytes)
    """
    # seconds until the first size check
    firstCheckDelay = 5
    reactor.callLater(firstCheckDelay, self._rotateSizeCallLater, size)
219
def _rotateTimeCallLater(self, time):
    """
    Switch to a new output file and schedule the next rotation.

    @param time: interval between rotations (in seconds)
    """
    self.changeFilename()

    # reschedule ourselves indefinitely
    nextRotation = reactor.callLater(time, self._rotateTimeCallLater, time)
    self._rotateTimeDelayedCall = nextRotation
226
def _rotateSizeCallLater(self, size):
    """
    Rotate the output file if it has grown beyond size, then schedule the
    next check.

    A failure to stat the current file is logged instead of raised, so
    that the periodic check keeps running.

    @param size: size (in bytes) above which the file is rotated
    """
    if not self.location:
        self.warning('Cannot rotate file, no file location set')
    else:
        try:
            currentSize = os.stat(self.location).st_size
        except OSError as e:
            # e.g. the file was removed underneath us; skip this round
            self.warning('Cannot rotate file, failed to stat %s: %s',
                self.location, e)
        else:
            if currentSize > size:
                self.changeFilename()

    # Add a new one
    reactor.callLater(5, self._rotateSizeCallLater, size)
236
def getMime(self):
    """
    Return the name of the first structure of the stored caps.

    @returns: the structure name, or None when no caps have been stored
    """
    caps = self.caps
    if not caps:
        return None
    return caps.get_structure(0).get_name()
240 241 # FIXME: is this method used anywhere ? 242
def get_content_type(self):
    """
    Return the mime type from getMime, with a boundary parameter appended
    for multipart/x-mixed-replace.

    @returns: the content type, or whatever getMime returned otherwise
    """
    mime = self.getMime()
    if mime == 'multipart/x-mixed-replace':
        return mime + ";boundary=ThisRandomString"
    return mime
248
def scheduleRecordings(self, icalFile):
    """
    Create an iCal scheduler for the given file and subscribe to its
    event instances.

    When an event instance is already active, its content and start time
    are stored as the start template/time tuple and recording at start is
    enabled; otherwise recording at start is disabled.

    Parse errors (ValueError, IndexError, KeyError) are reported as a
    warning message on the component instead of being raised.

    @param icalFile: file object passed to scheduler.ICalScheduler
    """
    self.uiState.set('has-schedule', True)
    self.debug('Parsing iCalendar file %s' % icalFile)
    from flumotion.component.base import scheduler
    try:
        self.icalScheduler = scheduler.ICalScheduler(icalFile)
        self.icalScheduler.subscribe(self.eventInstanceStarted,
            self.eventInstanceEnded)
        # FIXME: this should be handled through the subscription
        # handlers; for that, we should subscribe before the calendar
        # gets added
        cal = self.icalScheduler.getCalendar()
        eventInstances = cal.getActiveEventInstances()
        if eventInstances:
            instance = eventInstances[0]
            content = instance.event.content
            self.info('Event %s is in progress, start recording' %
                content)
            self._startFilenameTemplate = content
            # NOTE(review): eventInstanceStarted passes
            # start.timetuple() rather than utctimetuple() -- confirm
            # which of the two is intended
            self._startTimeTuple = instance.start.utctimetuple()
            self._recordAtStart = True
        else:
            self.info('No events in progress')
            self._recordAtStart = False
        self._updateNextPoints()
    except (ValueError, IndexError, KeyError) as e:
        # hand the argument to T_ instead of formatting inside N_(), so
        # the untouched format string is what gets looked up for
        # translation; str() keeps the argument a plain string like the
        # original formatting produced
        m = messages.Warning(T_(N_(
            "Error parsing ical file %s, so not scheduling any"
            " events."), str(icalFile)),
            debug=log.getExceptionMessage(e), mid="error-parsing-ical")
        self.addMessage(m)
280
def changeFilename(self, filenameTemplate=None, timeTuple=None):
    """
    Stop the current recording and start writing to a new file.

    The file name is the strftime expansion of the template plus the
    extension for the current mime type, inside self.directory.  An
    existing file is only overwritten when a timeTuple is given and the
    file was last modified at or before it; otherwise a numeric suffix
    ('.1', '.2', ...) is appended until the name is free.

    If the file cannot be opened, an error message is added to the
    component and recording stays stopped.

    @param filenameTemplate: strftime format string to decide filename;
                             defaults to the component's default template
    @param timeTuple:        a valid time tuple to pass to strftime,
                             defaulting to time.localtime().
    """
    mime = self.getMime()
    ext = mimeTypeToExtention(mime)

    self.stopRecording()

    sink = self.get_element('fdsink')
    # NOTE(review): the gst 0.10 Python bindings document get_state() as
    # returning a (result, state, pending) tuple; if so this comparison
    # can never be true -- confirm
    if sink.get_state() == gst.STATE_NULL:
        sink.set_state(gst.STATE_READY)

    if not filenameTemplate:
        filenameTemplate = self._defaultFilenameTemplate
    filename = "%s.%s" % (format.strftime(filenameTemplate,
        timeTuple or time.localtime()), ext)
    self.location = os.path.join(self.directory, filename)

    # only overwrite existing files if it was last changed before the
    # start of this event; ie. if it is a recording of a previous event
    location = self.location
    i = 1
    while os.path.exists(location):
        # without an event start time nothing is known to be stale, so
        # never overwrite; checked explicitly rather than comparing a
        # time tuple against None
        if timeTuple is not None:
            mtimeTuple = time.gmtime(os.stat(location).st_mtime)
            if mtimeTuple <= timeTuple:
                self.info(
                    "Existing recording %s from previous event, "
                    "overwriting", location)
                break

        self.info(
            "Existing recording %s from current event, changing name",
            location)
        location = self.location + '.' + str(i)
        i += 1
    self.location = location

    self.info("Changing filename to %s", self.location)
    try:
        self.file = open(self.location, 'wb')
    except IOError as e:
        self.warning("Failed to open output file %s: %s",
                   self.location, log.getExceptionMessage(e))
        m = messages.Error(T_(N_(
            "Failed to open output file '%s' for writing. "
            "Check permissions on the file."), self.location))
        self.addMessage(m)
        return
    self._recordingStarted(self.file, self.location)
    sink.emit('add', self.file.fileno())
    self.uiState.set('filename', self.location)
    self.uiState.set('recording', True)

    if self._symlinkToCurrentRecording:
        self._updateSymlink(self.location,
                            self._symlinkToCurrentRecording)
341 365
def stopRecording(self):
    """
    Stop writing to the current file, if there is one.

    The file is flushed and its descriptor removed from the fdsink, the
    recordingStopped plugs are invoked, the UI state is reset and the
    'last recording' symlink is updated when configured.
    """
    sink = self.get_element('fdsink')
    # NOTE(review): the gst 0.10 Python bindings document get_state() as
    # returning a (result, state, pending) tuple; if so this comparison
    # can never be true -- confirm
    if sink.get_state() == gst.STATE_NULL:
        sink.set_state(gst.STATE_READY)

    recording = self.file
    if not recording:
        # nothing is being recorded
        return

    recording.flush()
    sink.emit('remove', recording.fileno())
    self._recordingStopped(recording, self.location)
    self.file = None
    for key, value in (('filename', None), ('recording', False)):
        self.uiState.set(key, value)

    linkName = self._symlinkToLastRecording
    if linkName:
        self._updateSymlink(self.location, linkName)
381
def _notify_caps_cb(self, pad, param):
    """
    Handler for notify::caps on the fdsink's sink pad.

    Stores the negotiated caps.  The first time caps arrive, and only if
    recording at start is enabled, a changeFilename call is scheduled
    with the start template and start time tuple.

    @param pad:   the pad whose caps changed
    @param param: not used
    """
    caps = pad.get_negotiated_caps()
    if caps is None:
        return

    caps_str = gstreamer.caps_repr(caps)
    self.debug('Got caps: %s' % caps_str)

    new = self.caps is None
    if not new:
        # report the caps being replaced; the new ones were logged above
        self.warning('Already had caps: %s, replacing' %
            gstreamer.caps_repr(self.caps))

    self.debug('Storing caps: %s' % caps_str)
    self.caps = caps

    if new and self._recordAtStart:
        # NOTE(review): if this handler can run outside the reactor
        # thread, callFromThread would be needed here, as
        # _client_removed_cb does -- confirm
        reactor.callLater(0, self.changeFilename,
            self._startFilenameTemplate, self._startTimeTuple)
401 402 # multifdsink::client-removed 403
def _client_removed_cb(self, element, arg0, client_status):
    """
    Handler for multifdsink::client-removed.

    @param element:       not used
    @param arg0:          not used
    @param client_status: status with which the client was removed
    """
    # treat as error if we were removed because of GST_CLIENT_STATUS_ERROR
    # FIXME: can we use the symbol instead of a numeric constant ?
    clientStatusError = 4
    removedOnError = (client_status == clientStatusError)
    if removedOnError:
        # since we get called from the streaming thread, hand off handling
        # to the reactor's thread
        reactor.callFromThread(self._client_error_cb)
411
def _client_error_cb(self):
    """
    Handle a write error reported for the output file: close the file,
    turn the component sad and add an error message.
    """
    # this runs via the reactor after the streaming thread noticed the
    # error; the file may have been dropped by stopRecording in between
    if self.file:
        self.file.close()
        self.file = None

    self.setMood(moods.sad)
    messageId = "error-writing-%s" % self.location
    m = messages.Error(T_(N_(
        "Error writing to file '%s'."), self.location),
        mid=messageId, priority=40)
    self.addMessage(m)
422
def eventInstanceStarted(self, eventInstance):
    """
    Scheduler callback: an event instance started, so start recording it.

    @param eventInstance: the instance that started; its event content is
                          used as filename template and its start as the
                          time tuple
    """
    template = eventInstance.event.content
    self.debug('starting recording of %s', template)

    startTuple = eventInstance.start.timetuple()
    self.changeFilename(template, startTuple)
    self._updateNextPoints()
428
def eventInstanceEnded(self, eventInstance):
    """
    Scheduler callback: an event instance ended, so stop recording.

    @param eventInstance: the instance that ended; only used for logging
    """
    finished = eventInstance.event.content
    self.debug('ending recording of %s', finished)

    self.stopRecording()
    self._updateNextPoints()
433
def _updateNextPoints(self):
    """
    Synchronise the 'next-points' UI state list with the points the
    scheduler currently reports.

    Each entry is a tuple (dt in UTC without tzinfo, which, expanded
    event content).
    """
    # query the scheduler for what the next points are in its window
    # and set it on the UI state

    previous = self.uiState.get('next-points')[:]
    points = self.icalScheduler.getPoints()

    # twisted says 'Currently can't jelly datetime objects with tzinfo',
    # so convert all to UTC then remove tzinfo.

    def _utcAndStripTZ(dt):
        from flumotion.common import eventcalendar
        return dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)

    upcoming = []
    for point in points:
        dtUTC = _utcAndStripTZ(point.dt)
        instance = point.eventInstance
        naiveStart = instance.start.replace(tzinfo=None)
        upcoming.append((dtUTC, point.which,
            format.strftime(instance.event.content,
                naiveStart.timetuple())))

    # drop what is no longer reported, then add what is new
    for stale in [t for t in previous if t not in upcoming]:
        self.debug('removing tuple %r from next-points', stale)
        self.uiState.remove('next-points', stale)

    for fresh in [t for t in upcoming if t not in previous]:
        self.debug('appending tuple %r to next-points', fresh)
        self.uiState.append('next-points', fresh)
def _recordingStarted(self, file, location):
    """
    Tell every plug on the disker plug socket that a recording started.

    @param file:     file object being recorded to
    @param location: path of that file
    """
    socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
    # make sure plugs are configured with our socket, see #732
    if socket in self.plugs:
        for diskerPlug in self.plugs[socket]:
            self.debug('invoking recordingStarted on '
                       'plug %r on socket %s', diskerPlug, socket)
            diskerPlug.recordingStarted(file, location)
475
def _recordingStopped(self, file, location):
    """
    Tell every plug on the disker plug socket that a recording stopped.

    @param file:     file object that was recorded to
    @param location: path of that file
    """
    socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
    # make sure plugs are configured with our socket, see #732
    if socket in self.plugs:
        for diskerPlug in self.plugs[socket]:
            self.debug('invoking recordingStopped on '
                       'plug %r on socket %s', diskerPlug, socket)
            diskerPlug.recordingStopped(file, location)
485 486 ### marker methods 487
def _markers_event_probe(self, element, event):
    """
    Event probe on the fdsink's sink pad reacting to 'FluStreamMark'
    custom downstream events: action 'start' starts a recording for the
    event's prog_id, action 'stop' stops recording.

    @param element: not used
    @param event:   the event passing the pad
    @returns: always True
    """
    if event.type == gst.EVENT_CUSTOM_DOWNSTREAM:
        structure = event.get_structure()
        if structure.get_name() == 'FluStreamMark':
            action = structure['action']
            if action == 'start':
                self._onMarkerStart(structure['prog_id'])
            elif action == 'stop':
                self._onMarkerStop()
    return True
497
def _onMarkerStop(self):
    """
    Called by _markers_event_probe for a stream marker with action
    'stop'; stops the current recording.
    """
    self.stopRecording()
500
def _onMarkerStart(self, data):
    """
    Called by _markers_event_probe for a stream marker with action
    'start'; starts a new recording.

    The filename template is the default template, prefixed with the
    marker prefix expanded with data.  When the prefix cannot be expanded
    a warning is added and the default template is used unprefixed.

    @param data: value the marker prefix is %-formatted with
    """
    tmpl = self._defaultFilenameTemplate
    if self._markerPrefix:
        try:
            tmpl = '%s%s' % (self._markerPrefix % data,
                             self._defaultFilenameTemplate)
        except (TypeError, ValueError) as err:
            # TypeError: data does not fit the format;
            # ValueError: the prefix itself is not a valid format string
            m = messages.Warning(T_(N_('Failed expanding filename prefix: '
                                       '%r <-- %r.'),
                                    self._markerPrefix, data),
                                 mid='expand-marker-prefix')
            self.addMessage(m)
            self.warning('Failed expanding filename prefix: '
                         '%r <-- %r; %r' %
                         (self._markerPrefix, data, err))
    self.changeFilename(tmpl)
517