Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """base classes for PB client-side mediums. 
 23  """ 
 24   
 25  import time 
 26   
 27  from twisted.spread import pb 
 28  from twisted.internet import defer 
 29  from zope.interface import implements 
 30   
 31  from flumotion.common import log, interfaces, bundleclient, errors 
 32  from flumotion.common import messages 
 33  from flumotion.common.netutils import addressGetHost 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36  from flumotion.twisted.compat import reactor 
 37   
 38  __version__ = "$Rev$" 
 39   
 40   
41 -class BaseMedium(fpb.Referenceable):
42 """ 43 I am a base interface for PB clients interfacing with PB server-side 44 avatars. 45 Used by admin/worker/component to talk to manager's vishnu, 46 and by job to talk to worker's brain. 47 48 @ivar remote: a remote reference to the server-side object on 49 which perspective_(methodName) methods can be called 50 @type remote: L{twisted.spread.pb.RemoteReference} 51 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader} 52 """ 53 54 # subclasses will need to set this to the specific medium type 55 # tho... 56 implements(interfaces.IMedium) 57 logCategory = "basemedium" 58 remoteLogName = "baseavatar" 59 60 remote = None 61 bundleLoader = None 62
63 - def setRemoteReference(self, remoteReference):
64 """ 65 Set the given remoteReference as the reference to the server-side 66 avatar. 67 68 @param remoteReference: L{twisted.spread.pb.RemoteReference} 69 """ 70 self.debug('%r.setRemoteReference: %r' % (self, remoteReference)) 71 self.remote = remoteReference 72 73 def nullRemote(x): 74 self.debug('%r: disconnected from %r' % (self, self.remote)) 75 self.remote = None
76 self.remote.notifyOnDisconnect(nullRemote) 77 78 self.bundleLoader = bundleclient.BundleLoader(self.callRemote) 79 80 # figure out connection addresses if it's an internet address 81 tarzan = None 82 jane = None 83 try: 84 transport = remoteReference.broker.transport 85 tarzan = transport.getHost() 86 jane = transport.getPeer() 87 except Exception, e: 88 self.debug("could not get connection info, reason %r" % e) 89 if tarzan and jane: 90 self.debug("connection is from me on %s to remote on %s" % ( 91 addressGetHost(tarzan), 92 addressGetHost(jane)))
93
94 - def hasRemoteReference(self):
95 """ 96 Does the medium have a remote reference to a server-side avatar ? 97 """ 98 return self.remote != None
99
100 - def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
101 """ 102 Call the given method with the given arguments remotely on the 103 server-side avatar. 104 105 Gets serialized to server-side perspective_ methods. 106 107 @param level: the level we should log at (log.DEBUG, log.INFO, etc) 108 @type level: int 109 @param stackDepth: the number of stack frames to go back to get 110 file and line information, negative or zero. 111 @type stackDepth: non-positive int 112 @param name: name of the remote method 113 @type name: str 114 """ 115 if level is not None: 116 debugClass = str(self.__class__).split(".")[-1].upper() 117 startArgs = [self.remoteLogName, debugClass, name] 118 format, debugArgs = log.getFormatArgs( 119 '%s --> %s: callRemote(%s, ', startArgs, 120 ')', (), args, kwargs) 121 logKwArgs = self.doLog(level, stackDepth - 1, 122 format, *debugArgs) 123 124 if not self.remote: 125 self.warning('Tried to callRemote(%s), but we are disconnected' 126 % name) 127 return defer.fail(errors.NotConnectedError()) 128 129 def callback(result): 130 format, debugArgs = log.getFormatArgs( 131 '%s <-- %s: callRemote(%s, ', startArgs, 132 '): %s', (log.ellipsize(result), ), args, kwargs) 133 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 134 return result
135 136 def errback(failure): 137 format, debugArgs = log.getFormatArgs( 138 '%s <-- %s: callRemote(%s, ', startArgs, 139 '): %r', (failure, ), args, kwargs) 140 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 141 return failure 142 143 d = self.remote.callRemote(name, *args, **kwargs) 144 if level is not None: 145 d.addCallbacks(callback, errback) 146 return d 147
148 - def callRemote(self, name, *args, **kwargs):
149 """ 150 Call the given method with the given arguments remotely on the 151 server-side avatar. 152 153 Gets serialized to server-side perspective_ methods. 154 """ 155 return self.callRemoteLogging(log.DEBUG, -1, name, *args, 156 **kwargs)
157
158 - def getBundledFunction(self, module, function):
159 """ 160 Returns the given function in the given module, loading the 161 module from a bundle. 162 163 If we can't find the bundle for the given module, or if the 164 given module does not contain the requested function, we will 165 raise L{flumotion.common.errors.RemoteRunError} (perhaps a 166 poorly chosen error). If importing the module raises an 167 exception, that exception will be passed through unmodified. 168 169 @param module: module the function lives in 170 @type module: str 171 @param function: function to run 172 @type function: str 173 174 @returns: a callable, the given function in the given module. 175 """ 176 177 def gotModule(mod): 178 if hasattr(mod, function): 179 return getattr(mod, function) 180 else: 181 msg = 'No procedure named %s in module %s' % (function, 182 module) 183 self.warning('%s', msg) 184 raise errors.RemoteRunError(msg)
185 186 def gotModuleError(failure): 187 failure.trap(errors.NoBundleError) 188 msg = 'Failed to find bundle for module %s' % module 189 self.warning('%s', msg) 190 raise errors.RemoteRunError(msg) 191 192 d = self.bundleLoader.loadModule(module) 193 d.addCallbacks(gotModule, gotModuleError) 194 return d 195
196 - def runBundledFunction(self, module, function, *args, **kwargs):
197 """ 198 Runs the given function in the given module with the given 199 arguments. 200 201 This method calls getBundledFunction and then invokes the 202 function. Any error raised by getBundledFunction or by invoking 203 the function will be passed through unmodified. 204 205 Callers that expect to return their result over a PB connection 206 should catch nonserializable exceptions so as to prevent nasty 207 backtraces in the logs. 208 209 @param module: module the function lives in 210 @type module: str 211 @param function: function to run 212 @type function: str 213 214 @returns: the return value of the given function in the module. 215 """ 216 self.debug('runBundledFunction(%r, %r)', module, function) 217 218 def gotFunction(proc): 219 220 def invocationError(failure): 221 self.warning('Exception raised while calling ' 222 '%s.%s(*args=%r, **kwargs=%r): %s', 223 module, function, args, kwargs, 224 log.getFailureMessage(failure)) 225 return failure
226 227 self.debug('calling %s.%s(%r, %r)', module, function, args, 228 kwargs) 229 d = defer.maybeDeferred(proc, *args, **kwargs) 230 d.addErrback(invocationError) 231 return d 232 233 d = self.getBundledFunction(module, function) 234 d.addCallback(gotFunction) 235 return d 236 237
238 -class PingingMedium(BaseMedium):
239 _pingInterval = configure.heartbeatInterval 240 _pingCheckInterval = (configure.heartbeatInterval * 241 configure.pingTimeoutMultiplier) 242 _pingDC = None 243 _clock = reactor 244
245 - def startPinging(self, disconnect):
246 """ 247 @param disconnect: a method to call when we do not get ping replies 248 @type disconnect: callable 249 """ 250 self.debug('startPinging') 251 self._lastPingback = self._clock.seconds() 252 if self._pingDC: 253 self.debug("Cannot start pinging, already pinging") 254 return 255 self._pingDisconnect = disconnect 256 self._ping() 257 self._pingCheck()
258
259 - def _ping(self):
260 261 def pingback(result): 262 self._lastPingback = self._clock.seconds() 263 self.log('pinged, pingback at %r' % self._lastPingback)
264 265 def pingFailed(failure): 266 # ignoring the connection failures so they don't end up in 267 # the logs - we'll notice the lack of pingback eventually 268 failure.trap(pb.PBConnectionLost) 269 self.log('ping failed: %s' % log.getFailureMessage(failure))
270 271 if self.remote: 272 self.log('pinging') 273 d = self.callRemoteLogging(log.LOG, 0, 'ping') 274 d.addCallbacks(pingback, pingFailed) 275 else: 276 self.info('tried to ping, but disconnected yo') 277 278 self._pingDC = self._clock.callLater(self._pingInterval, 279 self._ping) 280
281 - def remoteMessageReceived(self, broker, message, args, kw):
282 self._lastPingback = self._clock.seconds() 283 return BaseMedium.remoteMessageReceived( 284 self, broker, message, args, kw)
285
286 - def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
287 d = BaseMedium.callRemoteLogging( 288 self, level, stackDepth, name, *args, **kwargs) 289 290 def cb(result): 291 self._lastPingback = self._clock.seconds() 292 return result
293 d.addCallback(cb) 294 return d 295
296 - def _pingCheck(self):
297 self._pingCheckDC = None 298 if (self.remote and 299 ((self._clock.seconds() - self._lastPingback) > 300 self._pingCheckInterval)): 301 self.info('no pingback in %f seconds, closing connection', 302 self._pingCheckInterval) 303 self._pingDisconnect() 304 else: 305 self._pingCheckDC = self._clock.callLater(self._pingCheckInterval, 306 self._pingCheck)
307
308 - def stopPinging(self):
309 if self._pingCheckDC: 310 self._pingCheckDC.cancel() 311 self._pingCheckDC = None 312 313 if self._pingDC: 314 self._pingDC.cancel() 315 self._pingDC = None
316
317 - def _disconnect(self):
318 if self.remote: 319 self.remote.broker.transport.loseConnection()
320
321 - def setRemoteReference(self, remote, clock=reactor):
322 self._clock = clock 323 324 BaseMedium.setRemoteReference(self, remote) 325 326 def stopPingingCb(x): 327 self.debug('stop pinging') 328 self.stopPinging()
329 self.remote.notifyOnDisconnect(stopPingingCb) 330 331 self.startPinging(self._disconnect) 332
333 - def remote_writeFluDebugMarker(self, level, marker):
334 """ 335 Sets a marker that will be prefixed to the log strings. Setting this 336 marker to multiple elements at a time helps debugging. 337 @param marker: A string to prefix all the log strings. 338 @type marker: str 339 """ 340 self.writeMarker(marker, level)
341