1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager implementation and related classes
24
25 API Stability: semi-stable
26
27 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
28 to compare against to verify that the manager
29 requested an action
30 @type LOCAL_IDENTITY: L{LocalIdentity}
31 """
32
33 import os
34
35 from twisted.internet import reactor, defer
36 from twisted.python import components, failure
37 from twisted.spread import pb
38 from twisted.cred import portal
39 from zope.interface import implements
40
41 from flumotion.common import errors, interfaces, log, registry
42 from flumotion.common import planet, common, dag, messages, reflectcall, server
43 from flumotion.common.i18n import N_, gettexter
44 from flumotion.common.identity import RemoteIdentity, LocalIdentity
45 from flumotion.common.netutils import addressGetHost
46 from flumotion.common.planet import moods
47 from flumotion.configure import configure
48 from flumotion.manager import admin, component, worker, base, config
49 from flumotion.twisted import checkers
50 from flumotion.twisted import portal as fportal
51 from flumotion.project import project
52
53 __all__ = ['ManagerServerFactory', 'Vishnu']
54 __version__ = "$Rev$"
55 T_ = gettexter()
56 LOCAL_IDENTITY = LocalIdentity('manager')
57
58
59
60
61
63 """
64 I implement L{twisted.cred.portal.IRealm}.
65 I make sure that when a L{pb.Avatar} is requested through me, the
66 Avatar being returned knows about the mind (client) requesting
67 the Avatar.
68 """
69
70 implements(portal.IRealm)
71
72 logCategory = 'dispatcher'
73
75 """
76 @param computeIdentity: see L{Vishnu.computeIdentity}
77 @type computeIdentity: callable
78 """
79 self._interfaceHeavens = {}
80 self._computeIdentity = computeIdentity
81 self._bouncer = None
82 self._avatarKeycards = {}
83
85 """
86 @param bouncer: the bouncer to authenticate with
87 @type bouncer: L{flumotion.component.bouncers.bouncer}
88 """
89 self._bouncer = bouncer
90
92 """
93 Register a Heaven as managing components with the given interface.
94
95 @type interface: L{twisted.python.components.Interface}
96 @param interface: a component interface to register the heaven with.
97 """
98 assert isinstance(heaven, base.ManagerHeaven)
99
100 self._interfaceHeavens[interface] = heaven
101
102
103
129
130 return (pb.IPerspective, avatar, cleanup)
131
132 def got_error(failure):
133
134
135
136
137
138 reactor.callLater(0, mind.broker.transport.loseConnection)
139 return failure
140
141 if pb.IPerspective not in ifaces:
142 raise errors.NoPerspectiveError(avatarId)
143 if len(ifaces) != 2:
144
145 raise errors.NoPerspectiveError(avatarId)
146 iface = [x for x in ifaces if x != pb.IPerspective][0]
147 if iface not in self._interfaceHeavens:
148 self.warning('unknown interface %r', iface)
149 raise errors.NoPerspectiveError(avatarId)
150
151 heaven = self._interfaceHeavens[iface]
152 klass = heaven.avatarClass
153 host = addressGetHost(mind.broker.transport.getPeer())
154 d = self._computeIdentity(keycard, host)
155 d.addCallback(lambda identity: \
156 klass.makeAvatar(heaven, avatarId, identity, mind))
157 d.addCallbacks(got_avatar, got_error)
158 return d
159
160
162 """
163 I am an object that ties together different objects related to a
164 component. I am used as values in a lookup hash in the vishnu.
165 """
166
168 self.state = None
169 self.id = None
170 self.avatar = None
171 self.jobState = None
172
173
175 """
176 I am the toplevel manager object that knows about all
177 heavens and factories.
178
179 @cvar dispatcher: dispatcher to create avatars
180 @type dispatcher: L{Dispatcher}
181 @cvar workerHeaven: the worker heaven
182 @type workerHeaven: L{worker.WorkerHeaven}
183 @cvar componentHeaven: the component heaven
184 @type componentHeaven: L{component.ComponentHeaven}
185 @cvar adminHeaven: the admin heaven
186 @type adminHeaven: L{admin.AdminHeaven}
187 @cvar configDir: the configuration directory for
188 this Vishnu's manager
189 @type configDir: str
190 """
191
192 implements(server.IServable)
193
194 logCategory = "vishnu"
195
196 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
212 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
213
214 if configDir is not None:
215 self.configDir = configDir
216 else:
217 self.configDir = os.path.join(configure.configdir,
218 "managers", name)
219
220 self.bouncer = None
221
222 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
223
224 self._componentMappers = {}
225
226 self.state = planet.ManagerPlanetState()
227 self.state.set('name', name)
228 self.state.set('version', configure.version)
229
230 self.plugs = {}
231
232
233
234 self.portal = fportal.BouncerPortal(self.dispatcher, None)
235
236 self.factory = pb.PBServerFactory(self.portal,
237 unsafeTracebacks=unsafeTracebacks)
238 self.connectionInfo = {}
239 self.setConnectionInfo(None, None, None)
240
242 """Cancel any pending operations in preparation for shutdown.
243
244 This method is mostly useful for unit tests; currently, it is
245 not called during normal operation. Note that the caller is
246 responsible for stopping listening on the port, as the the
247 manager does not have a handle on the twisted port object.
248
249 @returns: A deferred that will fire when the manager has shut
250 down.
251 """
252 if self.bouncer:
253 return self.bouncer.stop()
254 else:
255 return defer.succeed(None)
256
260
262 """Returns the manager's configuration as a string suitable for
263 importing via loadConfiguration().
264 """
265 return config.exportPlanetXml(self.state)
266
284
285 - def addMessage(self, level, mid, format, *args, **kwargs):
286 """
287 Convenience message to construct a message and add it to the
288 planet state. `format' should be marked as translatable in the
289 source with N_, and *args will be stored as format arguments.
290 Keyword arguments are passed on to the message constructor. See
291 L{flumotion.common.messages.Message} for the meanings of the
292 rest of the arguments.
293
294 For example::
295
296 self.addMessage(messages.WARNING, 'foo-warning',
297 N_('The answer is %d'), 42, debug='not really')
298 """
299 self.addMessageObject(messages.Message(level,
300 T_(format, *args),
301 mid=mid, **kwargs))
302
304 """
305 Add a message to the planet state.
306
307 @type message: L{flumotion.common.messages.Message}
308 """
309 self.state.setitem('messages', message.id, message)
310
312 """
313 Clear any messages with the given message ID from the planet
314 state.
315
316 @type mid: message ID, normally a str
317 """
318 if mid in self.state.get('messages'):
319 self.state.delitem('messages', mid)
320
322 """
323 @param identity: L{flumotion.common.identity.Identity}
324 """
325 socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
326 if socket in self.plugs:
327 for plug in self.plugs[socket]:
328 plug.action(identity, message, args, kw)
329
331 """
332 Compute a suitable identity for a remote host. First looks to
333 see if there is a
334 L{flumotion.component.plugs.identity.IdentityProviderPlug} plug
335 installed on the manager, falling back to user@host.
336
337 The identity is only used in the adminaction interface. An
338 example of its use is when you have an adminaction plug that
339 checks an admin's privileges before actually doing an action;
340 the identity object you use here might store the privileges that
341 the admin has.
342
343 @param keycard: the keycard that the remote host used to log in.
344 @type keycard: L{flumotion.common.keycards.Keycard}
345 @param remoteHost: the ip of the remote host
346 @type remoteHost: str
347
348 @rtype: a deferred that will fire a
349 L{flumotion.common.identity.RemoteIdentity}
350 """
351
352 socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
353 if socket in self.plugs:
354 for plug in self.plugs[socket]:
355 identity = plug.computeIdentity(keycard, remoteHost)
356 if identity:
357 return identity
358 username = getattr(keycard, 'username', None)
359 return defer.succeed(RemoteIdentity(username, remoteHost))
360
362 """
363 Add a component state for the given component config entry.
364
365 @rtype: L{flumotion.common.planet.ManagerComponentState}
366 """
367
368 self.debug('adding component %s to %s'
369 % (conf.name, parent.get('name')))
370
371 if identity != LOCAL_IDENTITY:
372 self.adminAction(identity, '_addComponent', (conf, parent), {})
373
374 state = planet.ManagerComponentState()
375 state.set('name', conf.name)
376 state.set('type', conf.getType())
377 state.set('workerRequested', conf.worker)
378 state.setMood(moods.sleeping.value)
379 state.set('config', conf.getConfigDict())
380
381 state.set('parent', parent)
382 parent.append('components', state)
383
384 avatarId = conf.getConfigDict()['avatarId']
385
386 self.clearMessage('loadComponent-%s' % avatarId)
387
388 configDict = conf.getConfigDict()
389 projectName = configDict['project']
390 versionTuple = configDict['version']
391
392 projectVersion = None
393 try:
394 projectVersion = project.get(projectName, 'version')
395 except errors.NoProjectError:
396 m = messages.Warning(T_(N_(
397 "This component is configured for Flumotion project '%s', "
398 "but that project is not installed.\n"),
399 projectName))
400 state.append('messages', m)
401
402 if projectVersion:
403 self.debug('project %s, version %r, project version %r' % (
404 projectName, versionTuple, projectVersion))
405 if not common.checkVersionsCompat(
406 versionTuple,
407 common.versionStringToTuple(projectVersion)):
408 m = messages.Warning(T_(N_(
409 "This component is configured for "
410 "Flumotion '%s' version %s, "
411 "but you are running version %s.\n"
412 "Please update the configuration of the component.\n"),
413 projectName, common.versionTupleToString(versionTuple),
414 projectVersion))
415 state.append('messages', m)
416
417
418 m = ComponentMapper()
419 m.state = state
420 m.id = avatarId
421 self._componentMappers[state] = m
422 self._componentMappers[avatarId] = m
423
424 return state
425
427 """
428 Add a new config object into the planet state.
429
430 @returns: a list of all components added
431 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
432 """
433
434 self.debug('syncing up planet state with config')
435 added = []
436
437 def checkNotRunning(comp, parentState):
438 name = comp.getName()
439
440 comps = dict([(x.get('name'), x)
441 for x in parentState.get('components')])
442 runningComps = dict([(x.get('name'), x)
443 for x in parentState.get('components')
444 if x.get('mood') != moods.sleeping.value])
445 if name not in comps:
446
447 return True
448 elif name not in runningComps:
449
450
451 oldComp = comps[name]
452 self.deleteComponent(oldComp)
453 return True
454
455
456
457
458 parent = comps[name].get('parent').get('name')
459 newConf = c.getConfigDict()
460 oldConf = comps[name].get('config')
461
462 if newConf == oldConf:
463 self.debug('%s already has component %s running with '
464 'same configuration', parent, name)
465 self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
466 return False
467
468 self.info('%s already has component %s, but configuration '
469 'not the same -- notifying admin', parent, name)
470
471 diff = config.dictDiff(oldConf, newConf)
472 diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')
473
474 self.addMessage(messages.WARNING,
475 'loadComponent-%s' % oldConf['avatarId'],
476 N_('Could not load component %r into %r: '
477 'a component is already running with '
478 'this name, but has a different '
479 'configuration.'), name, parent,
480 debug=diffMsg)
481 return False
482
483 state = self.state
484 atmosphere = state.get('atmosphere')
485 for c in conf.atmosphere.components.values():
486 if checkNotRunning(c, atmosphere):
487 added.append(self._addComponent(c, atmosphere, identity))
488
489 flows = dict([(x.get('name'), x) for x in state.get('flows')])
490 for f in conf.flows:
491 if f.name in flows:
492 flow = flows[f.name]
493 else:
494 self.info('creating flow %r', f.name)
495 flow = planet.ManagerFlowState(name=f.name, parent=state)
496 state.append('flows', flow)
497
498 for c in f.components.values():
499 if checkNotRunning(c, flow):
500 added.append(self._addComponent(c, flow, identity))
501
502 return added
503
505
506
507 componentsToStart = {}
508 for c in components:
509 workerId = c.get('workerRequested')
510 if not workerId in componentsToStart:
511 componentsToStart[workerId] = []
512 componentsToStart[workerId].append(c)
513 self.debug('_startComponents: componentsToStart %r' %
514 (componentsToStart, ))
515
516 for workerId, componentStates in componentsToStart.items():
517 self._workerCreateComponents(workerId, componentStates)
518
525
527 """
528 Load the configuration from the given XML, merging it on top of
529 the currently running configuration.
530
531 @param file: file to parse, either as an open file object,
532 or as the name of a file to open
533 @type file: str or file
534 @param identity: The identity making this request.. This is used by the
535 adminaction logging mechanism in order to say who is
536 performing the action.
537 @type identity: L{flumotion.common.identity.Identity}
538 """
539 self.debug('loading configuration')
540 mid = 'loadComponent-parse-error'
541 if isinstance(file, str):
542 mid += '-%s' % file
543 try:
544 self.clearMessage(mid)
545 conf = config.PlanetConfigParser(file)
546 conf.parse()
547 return self._loadComponentConfiguration(conf, identity)
548 except errors.ConfigError, e:
549 self.addMessage(messages.WARNING, mid,
550 N_('Invalid component configuration.'),
551 debug=e.args[0])
552 return defer.fail(e)
553 except errors.UnknownComponentError, e:
554 if isinstance(file, str):
555 debug = 'Configuration loaded from file %r' % file
556 else:
557 debug = 'Configuration loaded remotely'
558 self.addMessage(messages.WARNING, mid,
559 N_('Unknown component in configuration: %s.'),
560 e.args[0], debug=debug)
561 return defer.fail(e)
562 except Exception, e:
563 self.addMessage(messages.WARNING, mid,
564 N_('Unknown error while loading configuration.'),
565 debug=log.getExceptionMessage(e))
566 return defer.fail(e)
567
584
590
612
613 def setupErrback(failure):
614 self.warning('Error starting manager bouncer')
615 d.addCallbacks(setupCallback, setupErrback)
616 return d
617
634
635 __pychecker__ = 'maxargs=11'
636
637 - def loadComponent(self, identity, componentType, componentId,
638 componentLabel, properties, workerName,
639 plugs, eaters, isClockMaster, virtualFeeds):
640 """
641 Load a component into the manager configuration.
642
643 See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
644 for a definition of the argument types.
645 """
646 self.debug('loading %s component %s on %s',
647 componentType, componentId, workerName)
648 parentName, compName = common.parseComponentId(componentId)
649
650 if isClockMaster:
651 raise NotImplementedError("Clock master components are not "
652 "yet supported")
653 if worker is None:
654 raise errors.ConfigError("Component %r needs to specify the"
655 " worker on which it should run"
656 % componentId)
657
658 state = self.state
659 compState = None
660
661 compConf = config.ConfigEntryComponent(compName, parentName,
662 componentType,
663 componentLabel,
664 properties,
665 plugs, workerName,
666 eaters, isClockMaster,
667 None, None, virtualFeeds)
668
669 if compConf.defs.getNeedsSynchronization():
670 raise NotImplementedError("Components that need "
671 "synchronization are not yet "
672 "supported")
673
674 if parentName == 'atmosphere':
675 parentState = state.get('atmosphere')
676 else:
677 flows = dict([(x.get('name'), x) for x in state.get('flows')])
678 if parentName in flows:
679 parentState = flows[parentName]
680 else:
681 self.info('creating flow %r', parentName)
682 parentState = planet.ManagerFlowState(name=parentName,
683 parent=state)
684 state.append('flows', parentState)
685
686 components = [x.get('name') for x in parentState.get('components')]
687 if compName in components:
688 self.debug('%r already has component %r', parentName, compName)
689 raise errors.ComponentAlreadyExistsError(compName)
690
691 compState = self._addComponent(compConf, parentState, identity)
692
693 self._startComponents([compState], identity)
694
695 return compState
696
698 """
699 Create a heaven of the given klass that will send avatars to clients
700 implementing the given medium interface.
701
702 @param interface: the medium interface to create a heaven for
703 @type interface: L{flumotion.common.interfaces.IMedium}
704 @param klass: the type of heaven to create
705 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
706 """
707 assert issubclass(interface, interfaces.IMedium)
708 heaven = klass(self)
709 self.dispatcher.registerHeaven(heaven, interface)
710 return heaven
711
713 """
714 @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
715 """
716 if self.bouncer:
717 self.warning("manager already had a bouncer, setting anyway")
718
719 self.bouncer = bouncer
720 self.portal.bouncer = bouncer
721 self.dispatcher.setBouncer(bouncer)
722
725
754
769
770 def stopLost():
771
772 def gotComponents(comps):
773 return avatarId in comps
774
775 def gotJobRunning(running):
776 if running:
777 self.warning('asked to stop lost component %r, but '
778 'it is still running', avatarId)
779
780
781 msg = "Cannot stop lost component which is still running."
782 raise errors.ComponentMoodError(msg)
783 else:
784 self.debug('component %r seems to be really lost, '
785 'setting to sleeping')
786 componentState.setMood(moods.sleeping.value)
787 componentState.set('moodPending', None)
788 return None
789
790 self.debug('asked to stop a lost component without avatar')
791 workerName = componentState.get('workerRequested')
792 if workerName and self.workerHeaven.hasAvatar(workerName):
793 self.debug('checking if component has job process running')
794 d = self.workerHeaven.getAvatar(workerName).getComponents()
795 d.addCallback(gotComponents)
796 d.addCallback(gotJobRunning)
797 return d
798 else:
799 self.debug('component lacks a worker, setting to sleeping')
800 d = defer.maybeDeferred(gotJobRunning, False)
801 return d
802
803 def stopUnknown():
804 msg = ('asked to stop a component without avatar in mood %s'
805 % moods.get(mood))
806 self.warning(msg)
807 return defer.fail(errors.ComponentMoodError(msg))
808
809 mood = componentState.get('mood')
810 stoppers = {moods.sad.value: stopSad,
811 moods.lost.value: stopLost}
812 return stoppers.get(mood, stopUnknown)()
813
815
816
817 d = componentAvatar.stop()
818
819 return d
820
851
853 """
854 Set the given message on the given component's state.
855 Can be called e.g. by a worker to report on a crashed component.
856 Sets the mood to sad if it is an error message.
857 """
858 if not avatarId in self._componentMappers:
859 self.warning('asked to set a message on non-mapped component %s' %
860 avatarId)
861 return
862
863 m = self._componentMappers[avatarId]
864 m.state.append('messages', message)
865 if message.level == messages.ERROR:
866 self.debug('Error message makes component sad')
867 m.state.setMood(moods.sad.value)
868
869
870
872
873 workerId = workerAvatar.avatarId
874 self.debug('vishnu.workerAttached(): id %s' % workerId)
875
876
877
878
879 components = [c for c in self._getComponentsToCreate()
880 if c.get('workerRequested') in (workerId, None)]
881
882
883
884
885 d = workerAvatar.getComponents()
886
887 def workerAvatarComponentListReceived(workerComponents):
888
889 lostComponents = list([c for c in self.getComponentStates()
890 if c.get('workerRequested') == workerId and \
891 c.get('mood') == moods.lost.value])
892 for comp in workerComponents:
893
894
895 if comp in self._componentMappers:
896 compState = self._componentMappers[comp].state
897 if compState in components:
898 components.remove(compState)
899 if compState in lostComponents:
900 lostComponents.remove(compState)
901
902 for compState in lostComponents:
903 self.info(
904 "Restarting previously lost component %s on worker %s",
905 self._componentMappers[compState].id, workerId)
906
907
908
909 compState.set('moodPending', None)
910 compState.setMood(moods.sleeping.value)
911
912 allComponents = components + lostComponents
913
914 if not allComponents:
915 self.debug(
916 "vishnu.workerAttached(): no components for this worker")
917 return
918
919 self._workerCreateComponents(workerId, allComponents)
920 d.addCallback(workerAvatarComponentListReceived)
921
922 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
923 workerId)
924
926 """
927 Create the list of components on the given worker, sequentially, but
928 in no specific order.
929
930 @param workerId: avatarId of the worker
931 @type workerId: string
932 @param components: components to start
933 @type components: list of
934 L{flumotion.common.planet.ManagerComponentState}
935 """
936 self.debug("_workerCreateComponents: workerId %r, components %r" % (
937 workerId, components))
938
939 if not workerId in self.workerHeaven.avatars:
940 self.debug('worker %s not logged in yet, delaying '
941 'component start' % workerId)
942 return defer.succeed(None)
943
944 workerAvatar = self.workerHeaven.avatars[workerId]
945
946 d = defer.Deferred()
947
948 for c in components:
949 componentType = c.get('type')
950 conf = c.get('config')
951 self.debug('scheduling create of %s on %s'
952 % (conf['avatarId'], workerId))
953 d.addCallback(self._workerCreateComponentDelayed,
954 workerAvatar, c, componentType, conf)
955
956 d.addCallback(lambda result: self.debug(
957 '_workerCreateComponents(): completed setting up create chain'))
958
959
960 self.debug('_workerCreateComponents(): triggering create chain')
961 d.callback(None)
962
963 return d
964
981
982
983
984
986 self.debug('got avatarId %s for state %s' % (result, componentState))
987 m = self._componentMappers[componentState]
988 assert result == m.id, "received id %s is not the expected id %s" % (
989 result, m.id)
990
1014
1016
1017 workerId = workerAvatar.avatarId
1018 self.debug('vishnu.workerDetached(): id %s' % workerId)
1019
1036
1038
1039 m = (self.getComponentMapper(componentAvatar.avatarId)
1040 or ComponentMapper())
1041
1042 m.state = componentAvatar.componentState
1043 m.jobState = componentAvatar.jobState
1044 m.id = componentAvatar.avatarId
1045 m.avatar = componentAvatar
1046
1047 self._componentMappers[m.state] = m
1048 self._componentMappers[m.jobState] = m
1049 self._componentMappers[m.id] = m
1050 self._componentMappers[m.avatar] = m
1051
1053
1054
1055 self.debug('unregisterComponent(%r): cleaning up state' %
1056 componentAvatar)
1057
1058 m = self._componentMappers[componentAvatar]
1059
1060
1061 try:
1062 del self._componentMappers[m.jobState]
1063 except KeyError:
1064 self.warning('Could not remove jobState for %r' % componentAvatar)
1065 m.jobState = None
1066
1067 m.state.set('pid', None)
1068 m.state.set('workerName', None)
1069 m.state.set('moodPending', None)
1070
1071
1072 del self._componentMappers[m.avatar]
1073 m.avatar = None
1074
1076 cList = self.state.getComponents()
1077 self.debug('getComponentStates(): %d components' % len(cList))
1078 for c in cList:
1079 self.log(repr(c))
1080 mood = c.get('mood')
1081 if mood == None:
1082 self.warning('%s has mood None' % c.get('name'))
1083
1084 return cList
1085
1087 """
1088 Empty the planet of the given component.
1089
1090 @returns: a deferred that will fire when all listeners have been
1091 notified of the removal of the component.
1092 """
1093 self.debug('deleting component %r from state', componentState)
1094 c = componentState
1095 if c not in self._componentMappers:
1096 raise errors.UnknownComponentError(c)
1097
1098 flow = componentState.get('parent')
1099 if (c.get('moodPending') != None
1100 or c.get('mood') is not moods.sleeping.value):
1101 raise errors.BusyComponentError(c)
1102
1103 del self._componentMappers[self._componentMappers[c].id]
1104 del self._componentMappers[c]
1105 return flow.remove('components', c)
1106
1108 for flow in self.state.get('flows'):
1109 if flow.get('name') == flowName:
1110 return flow
1111
1113 """
1114 Empty the planet of a flow.
1115
1116 @returns: a deferred that will fire when the flow is removed.
1117 """
1118
1119 flow = self._getFlowByName(flowName)
1120 if flow is None:
1121 raise ValueError("No flow called %s found" % (flowName, ))
1122
1123 components = flow.get('components')
1124 for c in components:
1125
1126 if (c.get('moodPending') != None or
1127 c.get('mood') is not moods.sleeping.value):
1128 raise errors.BusyComponentError(c)
1129 for c in components:
1130 del self._componentMappers[self._componentMappers[c].id]
1131 del self._componentMappers[c]
1132 d = flow.empty()
1133 d.addCallback(lambda _: self.state.remove('flows', flow))
1134 return d
1135
1137 """
1138 Empty the planet of all components, and flows. Also clears all
1139 messages.
1140
1141 @returns: a deferred that will fire when the planet is empty.
1142 """
1143 for mid in self.state.get('messages').keys():
1144 self.clearMessage(mid)
1145
1146
1147 components = self.getComponentStates()
1148
1149
1150 components = [c for c in components
1151 if c.get('moodPending') != None]
1152 if components:
1153 state = components[0]
1154 raise errors.BusyComponentError(
1155 state,
1156 "moodPending is %s" % moods.get(state.get('moodPending')))
1157
1158
1159 components = [c for c in self.getComponentStates()
1160 if c.get('mood') is not moods.sleeping.value]
1161
1162
1163 d = defer.Deferred()
1164
1165 self.debug('need to stop %d components: %r' % (
1166 len(components), components))
1167
1168 for c in components:
1169 avatar = self._componentMappers[c].avatar
1170
1171
1172 if avatar:
1173 d.addCallback(lambda result, a: a.stop(), avatar)
1174 else:
1175 assert (c.get('mood') is moods.sad.value or
1176 c.get('mood') is moods.lost.value)
1177
1178 d.addCallback(self._emptyPlanetCallback)
1179
1180
1181 reactor.callLater(0, d.callback, None)
1182
1183 return d
1184
1186
1187
1188 components = self.getComponentStates()
1189 self.debug('_emptyPlanetCallback: need to delete %d components' %
1190 len(components))
1191
1192 for c in components:
1193 if c.get('mood') is not moods.sleeping.value:
1194 self.warning('Component %s is not sleeping', c.get('name'))
1195
1196 m = self._componentMappers[c]
1197 del self._componentMappers[m.id]
1198 del self._componentMappers[c]
1199
1200
1201 l = self._componentMappers.keys()
1202 if len(l) > 0:
1203 self.warning('mappers still has keys %r' % (repr(l)))
1204
1205 dList = []
1206
1207 dList.append(self.state.get('atmosphere').empty())
1208
1209 for f in self.state.get('flows'):
1210 self.debug('appending deferred for emptying flow %r' % f)
1211 dList.append(f.empty())
1212 self.debug('appending deferred for removing flow %r' % f)
1213 dList.append(self.state.remove('flows', f))
1214 self.debug('appended deferreds')
1215
1216 dl = defer.DeferredList(dList)
1217 return dl
1218
1220 """
1221 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1222 """
1223
1224 components = self.state.getComponents()
1225
1226
1227
1228
1229
1230 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1231 components = filter(isSleeping, components)
1232 return components
1233
1235
1236 if not workerName in self.workerHeaven.avatars:
1237 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1238 % workerName)
1239
1240 return self.workerHeaven.avatars[workerName]
1241
1243 if workerName in self.workerHeaven.avatars:
1244 return self._getWorker(workerName).feedServerPort
1245 return None
1246
1248 """
1249 Requests a number of ports on the worker named workerName. The
1250 ports will be reserved for the use of the caller until
1251 releasePortsOnWorker is called.
1252
1253 @returns: a list of ports as integers
1254 """
1255 return self._getWorker(workerName).reservePorts(numPorts)
1256
1258 """
1259 Tells the manager that the given ports are no longer being used,
1260 and may be returned to the allocation pool.
1261 """
1262 try:
1263 return self._getWorker(workerName).releasePorts(ports)
1264 except errors.ComponentNoWorkerError, e:
1265 self.warning('could not release ports: %r' % e.args)
1266
1268 """
1269 Look up an object mapper given the object.
1270
1271 @rtype: L{ComponentMapper} or None
1272 """
1273 if object in self._componentMappers.keys():
1274 return self._componentMappers[object]
1275
1276 return None
1277
1279 """
1280 Look up an object mapper given the object.
1281
1282 @rtype: L{ComponentMapper} or None
1283 """
1284 if object in self._componentMappers.keys():
1285 return self._componentMappers[object].state
1286
1287 return None
1288
1290 """
1291 Invokes method on all components of a certain type
1292 """
1293
1294 def invokeOnOneComponent(component, methodName, *args, **kwargs):
1295 m = self.getComponentMapper(component)
1296 if not m:
1297 self.warning('Component %s not mapped. Maybe deleted.',
1298 component.get('name'))
1299 raise errors.UnknownComponentError(component)
1300
1301 avatar = m.avatar
1302 if not avatar:
1303 self.warning('No avatar for %s, cannot call remote',
1304 component.get('name'))
1305 raise errors.SleepingComponentError(component)
1306
1307 try:
1308 return avatar.mindCallRemote(methodName, *args, **kwargs)
1309 except Exception, e:
1310 log_message = log.getExceptionMessage(e)
1311 msg = "exception on remote call %s: %s" % (methodName,
1312 log_message)
1313 self.warning(msg)
1314 raise errors.RemoteMethodError(methodName,
1315 log_message)
1316
1317
1318 dl_array = []
1319 for c in self.getComponentStates():
1320 if c.get('type') == componentType and \
1321 (c.get('mood') is moods.happy.value or
1322 c.get('mood') is moods.hungry.value):
1323 self.info("component %r to have %s run", c, methodName)
1324 d = invokeOnOneComponent(c, methodName, *args, **kwargs)
1325 dl_array.append(d)
1326 dl = defer.DeferredList(dl_array)
1327 return dl
1328