Package flumotion :: Package admin :: Module multi
[hide private]

Source Code for Module flumotion.admin.multi

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """admin model used to connect to multiple managers""" 
 23   
 24  from twisted.internet import defer 
 25   
 26  from flumotion.common import log, planet, errors, startset, watched 
 27  from flumotion.admin import admin 
 28   
 29  __version__ = "$Rev$" 
 30   
 31   
32 -def get_admin_for_object(object):
33 import warnings 34 warnings.warn('Use getAdminForObject', DeprecationWarning, stacklevel=2) 35 return getAdminForObject(object)
36 37
38 -def getAdminForObject(object):
39 if object.get('parent'): 40 return get_admin_for_object(object.get('parent')) 41 else: 42 return object.admin
43 44
45 -class MultiAdminModel(log.Loggable):
46 logCategory = 'multiadmin' 47
48 - def __init__(self):
49 self.admins = watched.WatchedDict() # {managerId: AdminModel} 50 51 self._listeners = [] 52 self._reconnectHandlerIds = {} # managerId => [disconnect, id..] 53 self._startSet = startset.StartSet(self.admins.has_key, 54 errors.AlreadyConnectingError, 55 errors.AlreadyConnectedError)
56 57 # Listener implementation 58
59 - def emit(self, signal_name, *args, **kwargs):
60 self.debug('emit %r %r %r' % (signal_name, args, kwargs)) 61 assert signal_name != 'handler' 62 for c in self._listeners: 63 if getattr(c, 'model_handler', None): 64 c.model_handler(c, signal_name, *args, **kwargs) 65 elif getattr(c, 'model_%s' % signal_name): 66 getattr(c, 'model_%s' % signal_name)(*args, **kwargs) 67 else: 68 s = 'No model_%s in %r and no model_handler' % (signal_name, c) 69 raise NotImplementedError(s)
70
71 - def addListener(self, obj):
72 assert not obj in self._listeners 73 self._listeners.append(obj)
74
75 - def removeListener(self, obj):
76 self._listeners.remove(obj)
77
78 - def _managerConnected(self, admin):
79 if admin.managerId not in self._reconnectHandlerIds: 80 # the first time a manager is connected to, start listening 81 # for reconnections; intertwingled with removeManager() 82 ids = [] 83 ids.append(admin.connect('connected', 84 self._managerConnected)) 85 ids.append(admin.connect('disconnected', 86 self._managerDisconnected)) 87 self._reconnectHandlerIds[admin.managerId] = admin, ids 88 89 adminplanet = admin.planet 90 self.info('Connected to manager %s (planet %s)', 91 admin.managerId, adminplanet.get('name')) 92 assert admin.managerId not in self.admins 93 self.admins[admin.managerId] = admin 94 self.emit('addPlanet', admin, adminplanet)
95
96 - def _managerDisconnected(self, admin):
97 if admin.managerId in self.admins: 98 self.emit('removePlanet', admin, admin.planet) 99 del self.admins[admin.managerId] 100 else: 101 self.warning('Could not find admin model %r', admin)
102
103 - def addManager(self, connectionInfo, tenacious=False, 104 writeConnection=True):
105 i = connectionInfo 106 managerId = str(i) 107 108 # This dance of deferreds is here so as to make sure that 109 # removeManager can cancel a pending connection. 110 111 # can raise errors.AlreadyConnectingError or 112 # errors.AlreadyConnectedError 113 try: 114 startD = self._startSet.createStart(managerId) 115 except Exception, e: 116 return defer.fail(e) 117 118 a = admin.AdminModel() 119 connectD = a.connectToManager(i, tenacious, 120 writeConnection=writeConnection) 121 assert a.managerId == managerId 122 123 def connect_callback(_): 124 self._startSet.avatarStarted(managerId)
125 126 def connect_errback(failure): 127 self._startSet.avatarStopped(managerId, lambda _: failure)
128 129 connectD.addCallbacks(connect_callback, connect_errback) 130 131 def start_callback(_): 132 self._managerConnected(a) 133 134 def start_errback(failure): 135 a.shutdown() 136 return failure 137 138 startD.addCallbacks(start_callback, start_errback) 139 140 return startD 141
142 - def removeManager(self, managerId):
143 self.info('disconnecting from %s', managerId) 144 145 # Four cases: 146 # (1) We have no idea about this managerId, the caller is 147 # confused -- do nothing 148 # (2) We started connecting to this managerId, but never 149 # succeeded -- cancel pending connections 150 # (3) We connected at least once, and are connected now -- we 151 # have entries in the _reconnectHandlerIds and in self.admins -- 152 # disconnect from the signals, disconnect from the remote 153 # manager, and don't try to reconnect 154 # (4) We connected at least once, but are disconnected now -- we 155 # have an entry in _reconnectHandlerIds but not self.admins -- 156 # disconnect from the signals, and stop trying to reconnect 157 158 # stop listening to admin's signals, if the manager had actually 159 # connected at some point 160 if managerId in self._reconnectHandlerIds: 161 admin, handlerIds = self._reconnectHandlerIds.pop(managerId) 162 map(admin.disconnect, handlerIds) # (3) and (4) 163 if managerId not in self.admins: 164 admin.shutdown() # (4) 165 166 if managerId in self.admins: # (3) 167 admin = self.admins[managerId] 168 admin.shutdown() 169 self._managerDisconnected(admin) 170 171 # Firing this has the side effect of errbacking on any pending 172 # start, calling start_errback above if appropriate. (2) 173 self._startSet.avatarStopped( 174 managerId, lambda _: errors.ConnectionCancelledError()) 175 176 # always succeed, see (1) 177 return defer.succeed(managerId)
178
179 - def for_each_component(self, object, proc):
180 '''Call a procedure on each component that is a child of OBJECT''' 181 # ah, for multimethods... 182 if isinstance(object, planet.AdminPlanetState): 183 self.for_each_component(object.get('atmosphere'), proc) 184 for f in object.get('flows'): 185 self.for_each_component(f, proc) 186 elif (isinstance(object, planet.AdminAtmosphereState) or 187 isinstance(object, planet.AdminFlowState)): 188 for c in object.get('components'): 189 self.for_each_component(c, proc) 190 elif isinstance(object, planet.AdminComponentState): 191 proc(object)
192
193 - def do_component_op(self, object, op):
194 '''Call a method on the remote component object associated with 195 a component state''' 196 admin = get_admin_for_object(object) 197 198 def do_op(object): 199 admin.callRemote('component'+op, object)
200 self.for_each_component(object, do_op) 201