Package flumotion :: Package component :: Package combiners :: Package switch :: Module switch
[hide private]

Source Code for Module flumotion.component.combiners.switch.switch

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  from twisted.internet import defer, reactor 
 26   
 27  from flumotion.common import errors, messages, log, python 
 28  from flumotion.common.i18n import N_, gettexter 
 29  from flumotion.common.planet import moods 
 30  from flumotion.component import feedcomponent 
 31  from flumotion.component.base import scheduler 
 32  from flumotion.component.padmonitor import PadMonitor 
 33  from flumotion.component.plugs import base 
 34  from flumotion.worker.checks import check 
 35   
 36  __version__ = "$Rev: 7993 $" 
 37  T_ = gettexter() 
 38   
 39   
40 -class SwitchMedium(feedcomponent.FeedComponentMedium):
41
42 - def remote_switchToMaster(self):
43 return self.comp.switch_to("master")
44
45 - def remote_switchToBackup(self):
46 return self.comp.switch_to("backup")
47
48 - def remote_switchTo(self, logicalFeed):
49 return self.comp.switch_to(logicalFeed)
50 51
52 -class ICalSwitchPlug(base.ComponentPlug):
53 logCategory = "ical-switch" 54
55 - def start(self, component):
56 self._sid = None 57 self.sched = None 58 try: 59 60 def eventStarted(eventInstance): 61 self.debug("event started %r", eventInstance.event.uid) 62 component.switch_to("backup")
63 64 def eventEnded(eventInstance): 65 self.debug("event ended %r", eventInstance.event.uid) 66 component.switch_to("master")
67 68 # if an event starts, semantics are to switch to backup 69 # if an event ends, semantics are to switch to master 70 filename = self.args['properties']['ical-schedule'] 71 self.sched = scheduler.ICalScheduler(open(filename, 'r')) 72 self._sid = self.sched.subscribe(eventStarted, eventEnded) 73 if self.sched.getCalendar().getActiveEventInstances(): 74 component.idealFeed = "backup" 75 except ValueError: 76 fmt = N_("Error parsing ical file %s, so not scheduling " 77 "any events.") 78 component.addWarning("error-parsing-ical", fmt, filename) 79 except ImportError, e: 80 fmt = N_("An ical file has been specified for scheduling, " 81 "but the necessary modules are not installed.") 82 component.addWarning("error-parsing-ical", fmt, debug=e.message) 83
84 - def stop(self, component):
85 if self.sched: 86 self.sched.unsubscribe(self._sid)
87 88
89 -class Switch(feedcomponent.MultiInputParseLaunchComponent):
90 logCategory = 'switch' 91 componentMediumClass = SwitchMedium 92
93 - def init(self):
94 self.uiState.addKey("active-eater") 95 self.icalScheduler = None 96 97 # This structure maps logical feeds to sets of eaters. For 98 # example, "master" and "backup" could be logical feeds, and 99 # would be the keys in this dict, mapping to lists of eater 100 # aliases corresponding to those feeds. The lengths of those 101 # lists is equal to the number of feeders that the element has, 102 # which is the number of individual streams in a logical feed. 103 # 104 # For example, {"master": ["audio-master", "video-master"], 105 # "backup": ["audio-backup", "video-backup"]} 106 # logical feed name -> [eater alias] 107 self.logicalFeeds = {} 108 # logical feed names in order of preference 109 self.feedsByPriority = [] 110 111 # eater alias -> (sink pad, switch element) 112 self.switchPads = {} 113 114 # Two variables form the state of the switch component. 115 # idealFeed 116 # The feed that we would like to provide, as chosen by 117 # the user, either by the UI, an ical file, a pattern 118 # detection, etc. 119 # activeFeed 120 # The feed currently being provided 121 self.idealFeed = None 122 self.activeFeed = None 123 124 # store of new segment events consumed on switch pads 125 # due to them having gone inactive 126 # eater alias -> event 127 self.newSegmentEvents = {} 128 129 # probe ids 130 # pad -> probe handler id 131 self.eventProbeIds = {} 132 self.bufferProbeIds = {} 133 134 # pad monitors for switch sink pads 135 self._padMonitors = {}
136
137 - def addWarning(self, id, format, *args, **kwargs):
138 self.warning(format, *args) 139 m = messages.Message(messages.WARNING, T_(format, *args), 140 mid=id, **kwargs) 141 self.addMessage(m)
142
143 - def clearWarning(self, id):
144 for m in self.state.get('messages')[:]: 145 if m.id == id: 146 self.state.remove('messages', m)
147
148 - def do_check(self):
149 150 def checkSignal(fact): 151 fact = fact.load() 152 signals = gobject.signal_list_names(fact.get_element_type()) 153 return 'block' in signals
154 155 def cb(result): 156 for m in result.messages: 157 self.addMessage(m) 158 return result.value
159 160 self.debug("checking for input-selector element") 161 d = check.checkPlugin('selector', 'gst-plugins-bad', 162 (0, 10, 5, 2), 'input-selector', checkSignal) 163 d.addCallback(cb) 164 return d 165
166 - def do_setup(self):
167 ical = self.config['properties'].get('ical-schedule', None) 168 if ical: 169 args = {'properties': {'ical-schedule': ical}} 170 self.icalScheduler = ICalSwitchPlug(args) 171 self.icalScheduler.start(self)
172
173 - def create_pipeline(self):
174 for name, aliases in self.get_logical_feeds(): 175 assert name not in self.logicalFeeds 176 for alias in aliases: 177 assert alias in self.eaters 178 self.logicalFeeds[name] = aliases 179 if self.idealFeed is None: 180 self.debug("idealFeed being set to %s", name) 181 self.idealFeed = name 182 self.feedsByPriority.append(name) 183 184 return feedcomponent.MultiInputParseLaunchComponent.create_pipeline( 185 self)
186
187 - def get_logical_feeds(self):
188 raise errors.NotImplementedError('subclasses should implement ' 189 'get_logical_feeds')
190
191 - def configure_pipeline(self, pipeline, properties):
192 193 def getDownstreamElement(e): 194 for pad in e.pads(): 195 if pad.get_direction() is gst.PAD_SRC: 196 peer = pad.get_peer() 197 return peer, peer.get_parent() 198 raise AssertionError('failed to find the switch')
199 200 switchElements = self.get_switch_elements(pipeline) 201 for alias in self.eaters: 202 e = pipeline.get_by_name(self.eaters[alias].elementName) 203 pad = None 204 while e not in switchElements: 205 self.log("Element: %s", e.get_name()) 206 pad, e = getDownstreamElement(e) 207 self.debug('eater %s maps to pad %s', alias, pad) 208 self.switchPads[alias] = pad, e 209 210 # set active pad correctly on each of the switch elements 211 # (pad, switch) 212 pairs = [self.switchPads[alias] 213 for alias in self.logicalFeeds[self.idealFeed]] 214 215 for p, s in pairs: 216 s.set_property('active-pad', p) 217 self.activeFeed = self.idealFeed 218 self.uiState.set("active-eater", self.idealFeed) 219 220 self.install_logical_feed_watches() 221 222 self.do_switch() 223 224 # add pad monitors on switch sink pads before we set them eaters active 225
226 - def install_logical_feed_watches(self):
227 228 def eaterSetActive(eaterAlias): 229 for feed, aliases in self.logicalFeeds.items(): 230 if eaterAlias in aliases: 231 if feed not in activeFeeds: 232 activeFeeds.append(feed) 233 self.feedSetActive(feed) 234 return
235 236 def eaterSetInactive(eaterAlias): 237 for feed, aliases in self.logicalFeeds.items(): 238 if eaterAlias in aliases and feed in activeFeeds: 239 activeFeeds.remove(feed) 240 self.feedSetInactive(feed) 241 # add an event and buffer probe to the switch pad 242 # so we can rewrite the newsegment that comes 243 # when eater is active again 244 # Not rewriting it causes the pad running time 245 # to be wrong due to the new segment having a start 246 # time being much lower than any subsequent buffers. 247 pad = self.switchPads[eaterAlias][0] 248 self.eventProbeIds[pad] = \ 249 pad.add_event_probe(self._eventProbe) 250 self.bufferProbeIds[pad] = \ 251 pad.add_buffer_probe(self._bufferProbe) 252 return 253 254 activeFeeds = [] 255 for alias in self.eaters: 256 self._padMonitors[alias] = PadMonitor(self.switchPads[alias][0], 257 alias, eaterSetActive, eaterSetInactive) 258
259 - def _eventProbe(self, pad, event):
260 # called from GStreamer threads 261 ret = True 262 if event.type == gst.EVENT_NEWSEGMENT: 263 ret = False 264 self.newSegmentEvents[pad] = event 265 if self.eventProbeIds[pad]: 266 pad.remove_event_probe(self.eventProbeIds[pad]) 267 del self.eventProbeIds[pad] 268 return ret
269
270 - def _bufferProbe(self, pad, buffer):
271 # called from GStreamer threads 272 ts = buffer.timestamp 273 if pad in self.newSegmentEvents: 274 parsed = self.newSegmentEvents[pad].parse_new_segment() 275 newEvent = gst.event_new_new_segment(parsed[0], parsed[1], 276 parsed[2], ts, parsed[4], parsed[5]) 277 pad.push_event(newEvent) 278 del self.newSegmentEvents[pad] 279 if pad in self.bufferProbeIds: 280 pad.remove_buffer_probe(self.bufferProbeIds[pad]) 281 del self.bufferProbeIds[pad] 282 return True
283
284 - def get_switch_elements(self, pipeline):
285 raise errors.NotImplementedError('subclasses should implement ' 286 'get_switch_elements')
287
288 - def is_active(self, feed):
289 return python.all([self.eaters[alias].isActive() 290 for alias in self.logicalFeeds[feed]])
291
292 - def feedSetActive(self, feed):
293 self.debug('feed %r is now active', feed) 294 if feed == self.idealFeed: 295 self.do_switch()
296
297 - def feedSetInactive(self, feed):
298 self.debug('feed %r is now inactive', feed)
299 300 # this function is used by the watchdogs 301
302 - def auto_switch(self):
303 allFeeds = self.feedsByPriority[:] 304 feed = None 305 while allFeeds: 306 feed = allFeeds.pop(0) 307 if self.is_active(feed): 308 self.debug('autoswitch selects feed %r', feed) 309 self.do_switch(feed) 310 break 311 else: 312 self.debug("could not select feed %r because not active", feed) 313 if feed is None: 314 feed = self.feedsByPriority.get(0, None) 315 self.debug('no feeds active during autoswitch, choosing %r', 316 feed) 317 self.do_switch(feed)
318 319 # switch_to should only be called when the ideal feed is requested to be 320 # changed, so not by watchdog reasons. 321
322 - def switch_to(self, feed):
323 """ 324 @param feed: a logical feed 325 """ 326 if feed not in self.logicalFeeds: 327 self.warning("unknown logical feed: %s", feed) 328 return None 329 330 self.debug('scheduling switch to feed %s', feed) 331 self.idealFeed = feed 332 # here we should bump this feed above others in feedsByPriority 333 self.feedsByPriority = [feed] 334 for name, aliases in self.get_logical_feeds(): 335 if name != feed: 336 self.feedsByPriority.append(name) 337 338 if not self.pipeline: 339 return 340 341 if self.is_active(feed): 342 self.do_switch() 343 else: 344 fmt = N_("Tried to switch to %s, but feed is unavailable. " 345 "Will retry when the feed is back.") 346 self.addWarning("temporary-switch-problem", fmt, feed)
347 348 # Switching multiple eaters is easy. The only trick is that we have 349 # to close the previous segment at the same running time, on both 350 # switch elements, and open the new segment at the same running 351 # time. The block()/switch() signal API on switch elements lets us 352 # do this. See the docs for switch's `block' and `switch' signals 353 # for more information. 354
355 - def do_switch(self, feed=None):
356 if feed == None: 357 feed = self.idealFeed 358 359 self.clearWarning('temporary-switch-problem') 360 if feed == self.activeFeed: 361 self.debug("already streaming from feed %r", feed) 362 return 363 if feed not in self.logicalFeeds: 364 self.warning("unknown logical feed: %s", feed) 365 return 366 367 # (pad, switch) 368 pairs = [self.switchPads[alias] 369 for alias in self.logicalFeeds[feed]] 370 371 stop_times = [e.emit('block') for p, e in pairs] 372 start_times = [p.get_property('running-time') for p, e in pairs] 373 374 stop_time = max(stop_times) 375 self.debug('stop time = %d', stop_time) 376 self.debug('stop time = %s', gst.TIME_ARGS(stop_time)) 377 378 if stop_time != gst.CLOCK_TIME_NONE: 379 diff = float(max(stop_times) - min(stop_times)) 380 if diff > gst.SECOND * 10: 381 fmt = N_("When switching to %s, feed timestamps out" 382 " of sync by %us") 383 self.addWarning('large-timestamp-difference', fmt, 384 feed, diff / gst.SECOND, priority=40) 385 386 start_time = min(start_times) 387 self.debug('start time = %s', gst.TIME_ARGS(start_time)) 388 389 self.debug('switching from %r to %r', self.activeFeed, feed) 390 for p, e in pairs: 391 self.debug("switching to pad %r", p) 392 e.emit('switch', p, stop_time, start_time) 393 394 self.activeFeed = feed 395 self.uiState.set("active-eater", feed)
396 397
398 -class SingleSwitch(Switch):
399 logCategory = "single-switch" 400
401 - def get_logical_feeds(self):
402 return [('master', ['master']), 403 ('backup', ['backup'])]
404
405 - def get_muxer_string(self, properties):
406 return ("input-selector name=muxer ! " 407 "identity silent=true single-segment=true name=iden ")
408
409 - def get_switch_elements(self, pipeline):
410 return [pipeline.get_by_name('muxer')]
411 412
413 -class AVSwitch(Switch):
414 logCategory = "av-switch" 415
416 - def init(self):
417 # property name -> caps property name 418 self.vparms = {'video-width': 'width', 'video-height': 'height', 419 'video-framerate': 'framerate', 420 'video-pixel-aspect-ratio': 'par'} 421 self.aparms = {'audio-channels': 'channels', 422 'audio-samplerate': 'samplerate'}
423
424 - def get_logical_feeds(self):
425 return [('master', ['video-master', 'audio-master']), 426 ('backup', ['video-backup', 'audio-backup'])]
427
428 - def get_switch_elements(self, pipeline):
429 # these have to be in the same order as the lists in 430 # get_logical_feeds 431 return [pipeline.get_by_name('vswitch'), 432 pipeline.get_by_name('aswitch')]
433
434 - def addError(self, id, format, *args, **kwargs):
435 self.warning(format, *args) 436 m = messages.Message(messages.ERROR, T_(format, *args), 437 id=id, **kwargs) 438 self.addMessage(m) 439 raise errors.ComponentSetupHandledError()
440
441 - def do_check(self):
442 propkeys = python.set(self.config['properties'].keys()) 443 vparms = python.set(self.vparms.keys()) 444 aparms = python.set(self.aparms.keys()) 445 446 for kind, parms in ('Video', vparms), ('Audio', aparms): 447 missing = parms - (propkeys & parms) 448 if missing and missing != parms: 449 fmt = N_("%s parameter(s) were specified but not all. " 450 "Missing parameters are: %r") 451 self.addError("video-params-not-specified", fmt, kind, 452 list(missing))
453
454 - def get_pipeline_string(self, properties):
455 456 def i420caps(framerate, par, width, height): 457 return ("video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d," 458 "pixel-aspect-ratio=%d/%d,format=(fourcc)I420" 459 % (width, height, framerate[0], framerate[1], 460 par[0], par[1]))
461 462 def audiocaps(channels, samplerate): 463 return ("audio/x-raw-int,channels=%d,samplerate=%d,width=16," 464 "depth=16,signed=true" % (channels, samplerate))
465 466 def props2caps(proc, parms, prefix, suffix=' ! '): 467 kw = dict([(parms[prop], properties[prop]) 468 for prop in properties if prop in parms]) 469 if kw: 470 return prefix + proc(**kw) + suffix 471 else: 472 return '' 473 474 vforce = props2caps(i420caps, self.vparms, 475 "ffmpegcolorspace ! videorate ! videoscale " 476 "! capsfilter caps=") 477 aforce = props2caps(audiocaps, self.aparms, 478 "audioconvert ! audioconvert ! capsfilter caps=") 479 480 pipeline = ("input-selector name=vswitch" 481 " ! identity silent=true single-segment=true" 482 " ! @feeder:video@ " 483 "input-selector name=aswitch" 484 " ! identity silent=true single-segment=true" 485 " ! @feeder:audio@ ") 486 for alias in self.eaters: 487 if "video" in alias: 488 pipeline += '@eater:%s@ ! %s vswitch. ' % (alias, vforce) 489 elif "audio" in alias: 490 pipeline += '@eater:%s@ ! %s aswitch. ' % (alias, aforce) 491 else: 492 raise AssertionError() 493 494 return pipeline 495