Package flumotion :: Package component :: Module padmonitor
[hide private]

Source Code for Module flumotion.component.padmonitor

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gst 
 25  from twisted.internet import reactor, defer 
 26   
 27  from flumotion.common import log 
 28  from flumotion.common.poller import Poller 
 29   
 30  __version__ = "$Rev: 7532 $" 
 31   
 32   
class PadMonitor(log.Loggable):
    """
    Monitor data flow on a single pad.

    Buffer probes are installed on the pad from L{_check_timeout}; the time
    of the most recently probed buffer is compared against
    PAD_MONITOR_TIMEOUT in L{_watch_timeout}, and the registered watch
    callbacks are invoked with the monitor name whenever the pad turns
    active or inactive.

    @cvar PAD_MONITOR_PROBE_FREQUENCY: interval handed to the poller that
                                       installs the buffer probes.
    @cvar PAD_MONITOR_TIMEOUT:         seconds without data after which an
                                       active pad is reported inactive.
    """

    PAD_MONITOR_PROBE_FREQUENCY = 5.0
    PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5

    def __init__(self, pad, name, setActive, setInactive):
        """
        @param pad:         the pad to watch; buffer probes are added to
                            and removed from it.
        @param name:        name passed to the watch callbacks and used in
                            log messages.
        @param setActive:   callable taking the monitor name, invoked when
                            the pad turns active.
        @param setInactive: callable taking the monitor name, invoked when
                            the pad turns inactive.
        """
        self.name = name
        self._pad = pad

        # -1: no timeout check has run yet
        #  0: reported inactive, no data seen since
        # >0: time.time() of the last probed buffer
        self._last_data_time = -1
        self._active = False
        # True until the first buffer probe has fired; see logMessage()
        self._first = True
        # Cleared by detach()
        self._running = True

        self._doSetActive = []
        self._doSetInactive = []
        self.addWatch(setActive, setInactive)

        # The pending (deferred, probe id) pair is kept in a dict because
        # python's dict operations are atomic w.r.t. the GIL.
        self._probe_id = {}

        self.check_poller = Poller(self._check_timeout,
                                   self.PAD_MONITOR_PROBE_FREQUENCY,
                                   immediately=True)

        self.watch_poller = Poller(self._watch_timeout,
                                   self.PAD_MONITOR_TIMEOUT)

    def logMessage(self, message, *args):
        """
        Log at debug level until the first buffer probe has fired, and at
        log level from then on.
        """
        emit = self.log
        if self._first:
            emit = self.debug
        emit(message, *args)

    def isActive(self):
        """
        @returns: whether the pad is currently considered active.
        """
        return self._active

    def detach(self):
        """
        Stop both pollers and drop any buffer probe that is still pending,
        firing the deferred that belongs to it.
        """
        self.check_poller.stop()
        self.watch_poller.stop()
        self._running = False

        # Counterpart of the pop() done in the probe callback of
        # _check_timeout: whoever pops the entry first is the one that
        # removes the probe and fires the deferred.
        pending, handler = self._probe_id.pop("id", (None, None))
        if not handler:
            return

        self._pad.remove_buffer_probe(handler)
        pending.callback(None)

    def _check_timeout(self):
        """
        Install a buffer probe on the pad.

        @returns: a deferred that is fired once the probe has seen a
                  buffer, or by detach() if that happens first.
        """

        def on_buffer(probed_pad, buf):
            """
            Periodically scheduled buffer probe that makes sure data is
            actually flowing through the monitored pad.

            Called from GStreamer threads.

            @param probed_pad: the gst.Pad the probe was installed on.
            @param buf:        the gst.Buffer that arrived on that pad.
            """
            self._last_data_time = time.time()

            self.logMessage('buffer probe on %s has timestamp %s', self.name,
                            gst.TIME_ARGS(buf.timestamp))

            pending, handler = self._probe_id.pop("id", (None, None))
            if handler:
                # The entry is missing only if detach() already took it.
                self._pad.remove_buffer_probe(handler)

                # We are not in the reactor thread here, so hand both the
                # deferred and the immediate activity re-check over to it.
                reactor.callFromThread(pending.callback, None)
                reactor.callFromThread(self.watch_poller.run)

            self._first = False

            # Returning True lets the buffer pass.
            return True

        d = defer.Deferred()
        # FIXME: this is racy: the right-hand side is evaluated first, so
        # the probe may fire before the entry has been stored; needs a
        # mutex.
        self._probe_id['id'] = (d, self._pad.add_buffer_probe(on_buffer))
        return d

    def _watch_timeout(self):
        """
        Compare the time of the last probed buffer with the current time
        and flip the active state when PAD_MONITOR_TIMEOUT is crossed.
        """
        self.log('last buffer for %s at %r', self.name, self._last_data_time)

        now = time.time()

        if self._last_data_time < 0:
            # Nothing arrived during the very first timeout period.
            self._last_data_time = 0
            self.setInactive()
            return

        if self._last_data_time == 0:
            # Already reported inactive and still no data.
            return

        # Data was seen at some point; see how long ago that was.
        elapsed = now - self._last_data_time

        if self._active:
            if elapsed > self.PAD_MONITOR_TIMEOUT:
                self.info("No data received on pad %s for > %r seconds, "
                          "setting to hungry",
                          self.name, self.PAD_MONITOR_TIMEOUT)
                self.setInactive()
        elif elapsed < self.PAD_MONITOR_TIMEOUT:
            self.info("Receiving data again on pad %s, flow active",
                      self.name)
            self.setActive()

    def addWatch(self, setActive, setInactive):
        """
        Register an additional pair of callbacks, each taking the monitor
        name, to be invoked on activity changes.
        """
        self._doSetInactive.append(setInactive)
        self._doSetActive.append(setActive)

    def setInactive(self):
        """
        Mark the pad inactive and notify every registered watcher.
        """
        self._active = False
        for notify in self._doSetInactive:
            notify(self.name)

    def setActive(self):
        """
        Mark the pad active and notify every registered watcher.
        """
        self._active = True
        for notify in self._doSetActive:
            notify(self.name)
160 161
class EaterPadMonitor(PadMonitor):
    """
    Pad monitor that additionally drives a reconnect poller: the poller is
    started when the pad turns inactive and stopped when it turns active
    again or when the monitor is detached.
    """

    def __init__(self, pad, name, setActive, setInactive,
                 reconnectEater, *args):
        """
        @param reconnectEater: callable invoked by the reconnect poller.
        @param args:           positional arguments passed on to
                               reconnectEater.

        The remaining parameters are handed to PadMonitor.__init__
        unchanged.
        """
        PadMonitor.__init__(self, pad, name, setActive, setInactive)

        # Created stopped; only setInactive() starts it.
        self._reconnectPoller = Poller(lambda: reconnectEater(*args),
                                       self.PAD_MONITOR_TIMEOUT,
                                       start=False)

    def setInactive(self):
        """
        Notify the watchers and, unless the monitor got detached in the
        meantime, start trying to reconnect.
        """
        PadMonitor.setInactive(self)

        # The watchers called by PadMonitor.setInactive() may have
        # detached us; for example someone might stop the component as
        # soon as it goes hungry. The component then detaches this
        # monitor and the reconnect poller is stopped. Restarting it in
        # that case would leave the reactor in an unclean state.
        #
        # A prominent example of such a situation is
        # flumotion.test.test_component_disker, where the component
        # gets stopped right after it goes hungry.
        if not self._running:
            return

        # A buffer may have arrived just before the eater was marked as
        # disconnected, still within the check interval; the next timeout
        # check could then wrongly conclude the eater had reconnected.
        # Resetting the timestamp to 0 prevents that.
        self._last_data_time = 0

        self.debug('starting the reconnect poller')
        self._reconnectPoller.start(immediately=True)

    def setActive(self):
        """
        Notify the watchers and stop trying to reconnect.
        """
        PadMonitor.setActive(self)
        self.debug('stopping the reconnect poller')
        self._reconnectPoller.stop()

    def detach(self):
        """
        Detach the monitor from its pad and stop the reconnect poller.
        """
        PadMonitor.detach(self)
        self.debug('stopping the reconnect poller')
        self._reconnectPoller.stop()
206 207
class PadMonitorSet(dict, log.Loggable):
    """
    Dictionary of pad monitors, keyed by monitor name, that reports when
    the set as a whole turns inactive or active again.
    """

    def __init__(self, setActive, setInactive):
        """
        @param setActive:   callable without arguments, invoked when every
                            monitor is active again after the set had
                            turned inactive.
        @param setInactive: callable without arguments, invoked when a
                            monitor turns inactive while the set was
                            considered active.
        """
        self._doSetInactive = setInactive
        self._doSetActive = setActive
        # The set starts out being considered active, so that the first
        # monitor turning active does not trigger _doSetActive.
        self._wasActive = True

    def attach(self, pad, name, klass=PadMonitor, *args):
        """
        Watch for data flow through this pad periodically.
        If data flow ceases for too long, we turn hungry. If data flow
        resumes, we return to happy.

        @param pad:   the pad to monitor.
        @param name:  name for the new monitor; must not be in use yet.
        @param klass: monitor class to instantiate.
        @param args:  extra positional arguments for klass.
        """

        def monitorActive(padName):
            self.info('Pad data flow at %s is active', padName)
            # Only report a transition back to active after the set went
            # inactive earlier; this keeps _doSetActive from being called
            # for the initial activation. A curious interface.
            if not self.isActive() or self._wasActive:
                return
            self._wasActive = True
            self._doSetActive()

        def monitorInactive(padName):
            self.info('Pad data flow at %s is inactive', padName)
            # Report the set going inactive only once.
            if not self._wasActive:
                return
            self._doSetInactive()
            self._wasActive = False

        assert name not in self
        monitor = klass(pad, name, monitorActive, monitorInactive, *args)
        self[monitor.name] = monitor
        self.info("Added pad monitor %s", monitor.name)

    def remove(self, name):
        """
        Detach and forget the monitor registered under name; log a warning
        if there is none.
        """
        try:
            monitor = self.pop(name)
        except KeyError:
            self.warning("No pad monitor with name %s", name)
            return

        monitor.detach()

    def isActive(self):
        """
        @returns: True if every monitor in the set is active (also when
                  the set is empty), False otherwise.
        """
        for monitor in self.values():
            if monitor.isActive():
                continue
            return False
        return True
258