1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import time
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.spread import pb
32 from twisted.python import reflect
33 from zope.interface import implements
34
35 from flumotion.configure import configure
36 from flumotion.common import interfaces, errors, log, planet, medium
37 from flumotion.common import componentui, common, messages
38 from flumotion.common import interfaces, reflectcall, debug
39 from flumotion.common.i18n import N_, gettexter
40 from flumotion.common.planet import moods
41 from flumotion.common.poller import Poller
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44 from flumotion.twisted.flavors import IStateCacheableListener
45
46
47 __version__ = "$Rev$"
48 T_ = gettexter()
49
50
52 """
53 I am a client factory for a component logging in to the manager.
54 """
55 logCategory = 'component'
56 perspectiveInterface = interfaces.IComponentMedium
57
76
80
81
82
84
def remoteDisconnected(remoteReference):
    """
    Report that the connection to the manager went away.

    A disconnect during reactor shutdown is only logged; any other
    disconnect is reported as a warning.

    @param remoteReference: the reference that got disconnected; unused.
    """
    if not reactor.killed:
        self.warning('Lost connection to manager, will attempt to reconnect')
        return
    self.log('Connection to manager lost due to shutdown')
91
def loginCallback(reference):
    """
    Handle a successful login to the manager.

    Hands the remote reference to the medium and registers
    remoteDisconnected so that a later disconnect gets reported.

    @param reference: the remote reference the login deferred fired with.
    """
    self.info("Logged in to manager")
    # Pass the value as a format argument, as the sibling handlers do:
    # the logger does the formatting, and a tuple-like value cannot be
    # misread as several arguments by the % operator.
    self.debug("remote reference %r", reference)

    self.medium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)
98
def loginFailedDisconnect(failure):
    """
    First errback of the login chain: disconnect on any login failure.

    @param failure: the failure the login deferred errbacked with.
    @returns: the same failure, so that the errbacks added after this
              one still get to look at it.
    """
    self.debug('Login failed, reason: %s, disconnecting', failure)
    self.disconnect()
    return failure
105
def accessDeniedErrback(failure):
    """
    Warn when the manager refused our credentials.

    @param failure: the login failure; anything other than
                    L{errors.NotAuthenticatedError} is re-raised by
                    trap() for the next errback.
    """
    failure.trap(errors.NotAuthenticatedError)
    self.warning('Access denied.')
109
def connectionRefusedErrback(failure):
    """
    Warn when the connection to the manager was refused.

    @param failure: the login failure; anything other than
                    L{error.ConnectionRefusedError} is re-raised by
                    trap() for the next errback.
    """
    failure.trap(error.ConnectionRefusedError)
    self.warning('Connection to manager refused.')
113
def alreadyLoggedInErrback(failure):
    """
    Warn when a component with our avatar id is already logged in.

    @param failure: the login failure; anything other than
                    L{errors.AlreadyConnectedError} is re-raised by
                    trap() for the next errback.
    """
    failure.trap(errors.AlreadyConnectedError)
    # only looked up once the failure is known to be the expected type
    avatarId = self.medium.authenticator.avatarId
    self.warning('Component with id %s is already logged in.', avatarId)
118
def loginFailedErrback(failure):
    """
    Last errback of the login chain: warn about any failure that none
    of the more specific errbacks recognised.

    @param failure: the login failure that is still unhandled.
    """
    # Format argument instead of eager % formatting, consistent with
    # the other handlers in this chain.
    self.warning('Login failed, reason: %s', failure)
121
122 d.addCallback(loginCallback)
123 d.addErrback(loginFailedDisconnect)
124 d.addErrback(accessDeniedErrback)
125 d.addErrback(connectionRefusedErrback)
126 d.addErrback(alreadyLoggedInErrback)
127 d.addErrback(loginFailedErrback)
128
129
130
134
135
137 """
138 Creates a deferred chain by chaining calls to the given
139 procedures, each of them made with the given args and kwargs.
140 Only the result of the last procedure is returned; results for the
141 other procedures are discarded.
142
143 Failures triggered during any of the procedure short-circuit execution
144 of the other procedures and should be handled by the errbacks attached
145 to the deferred returned here.
146
147 @rtype: L{twisted.internet.defer.Deferred}
148 """
149
def call_proc(_, p):
    """
    Call one procedure of the chain with the shared arguments.

    @param _: result of the previous link in the chain; discarded.
    @param p: the callable to invoke with the enclosing args and kwargs.
    @returns: whatever p returns.
    """
    log.debug('', 'calling %r', p)
    outcome = p(*args, **kwargs)
    return outcome
153 if not procs:
154 return defer.succeed(None)
155 p, procs = procs[0], procs[1:]
156 d = defer.maybeDeferred(call_proc, None, p)
157 for p in procs:
158 d.addCallback(call_proc, p)
159 return d
160
161
162
163
165 """
166 I am a medium interfacing with a manager-side avatar.
167 I implement a Referenceable for the manager's avatar to call on me.
168 I have a remote reference to the manager's avatar to call upon.
169 I am created by the L{ComponentClientFactory}.
170
171 @ivar authenticator: the authenticator used to log in to manager
172 @type authenticator: L{flumotion.twisted.pb.Authenticator}
173 """
174
175 implements(interfaces.IComponentMedium)
176 logCategory = 'basecompmed'
177
179 """
180 @param component: L{flumotion.component.component.BaseComponent}
181 """
182 self.comp = component
183 self.authenticator = None
184 self.broker = None
185
189
190
191
192 - def setup(self, config):
194
196 """
197 Return the manager IP as seen by us.
198 """
199 assert self.remote or self.broker
200 broker = self.broker or self.remote.broker
201 peer = broker.transport.getPeer()
202 try:
203 host = peer.host
204 except AttributeError:
205 host = peer[1]
206
207 res = socket.gethostbyname(host)
208 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
209 return res
210
212 """
213 Return the IP of this component based on connection to the manager.
214
215 Note: this is insufficient in general, and should be replaced by
216 network mapping stuff later.
217 """
218 assert self.remote
219 host = self.remote.broker.transport.getHost()
220 self.debug("getIP(): using %r as our IP", host.host)
221 return host.host
222
224 """
225 Set the authenticator the client factory has used to log in to the
226 manager. Can be reused by the component's medium to make
227 feed connections which also get authenticated by the manager's
228 bouncer.
229
230 @type authenticator: L{flumotion.twisted.pb.Authenticator}
231 """
232 self.authenticator = authenticator
233
234
235
236
238 """
239 Return the state of the component, which will be serialized to a
240 L{flumotion.common.planet.ManagerJobState} object.
241
242 @rtype: L{flumotion.common.planet.WorkerJobState}
243 @returns: state of component
244 """
245
246
247 self.comp.state.set('manager-ip', self.getManagerIP())
248 return self.comp.state
249
251 """
252 Return the configuration of the component.
253
254 @rtype: dict
255 @returns: component's current configuration
256 """
257 return self.comp.config
258
260 self.info('Stopping component')
261 return self.comp.stop()
262
267
269 """Get a WorkerComponentUIState containing details needed to
270 present an admin-side UI state
271 """
272 return self.comp.uiState
273
275 """Get mood of the component
276 """
277 return self.comp.getMood()
278
280 """
281 Base implementation of getMasterClockInfo, can be overridden by
282 subclasses. By default, just returns None.
283 """
284 return None
285
288
290 """
291 Sets the Flumotion debugging levels based on the passed debug string.
292
293 @since: 0.6.0
294 """
295 self.debug('Setting Flumotion debug level to %s' % debug)
296 self.comp.uiState.set('flu-debug', debug)
297 log.setDebug(debug)
298
299
301 """
302 I am the base class for all Flumotion components.
303
304 @ivar name: the name of the component
305 @type name: string
306 @ivar medium: the component's medium
307 @type medium: L{BaseComponentMedium}
308 @ivar uiState: state of the component to be shown in a UI.
309 Contains at least the following keys.
310 - cpu-percent: percentage of CPU use in last interval
311 - start-time: time when component was started, in epoch
312 seconds
313 - current-time: current time in epoch seconds, as seen on
314 component's machine, which might be out of
315 sync
316 - virtual-size: virtual memory size in bytes
317 Subclasses can add additional keys for their respective UI.
318 @type uiState: L{componentui.WorkerComponentUIState}
319
320 @cvar componentMediumClass: the medium class to use for this component
321 @type componentMediumClass: child class of L{BaseComponentMedium}
322 """
323
324 logCategory = 'basecomp'
325 componentMediumClass = BaseComponentMedium
326
327 implements(IStateCacheableListener)
328
def __init__(self, config, haveError=None):
    """
    Store the configuration, run the init() chain and set up the
    component.

    Subclasses should not override __init__ at all.

    Instead, they should implement init(), which will be called
    by this implementation automatically.

    See L{flumotion.common.common.InitMixin} for more details.

    @param config:    the component's configuration; kept as self.config
    @param haveError: optional callable; kept as self._haveError
    """
    self.debug("initializing %r with config %r", type(self), config)
    self.config = config
    self._haveError = haveError

    # The attributes above are stored first so that they already exist
    # when the init() implementations run (see the docstring).
    common.InitMixin.__init__(self)

    self.setup()
346
347
348
350 """
351 A subclass should do as little as possible in its init method.
352 In particular, it should not try to access resources.
353
354 Failures during init are marshalled back to the manager through
355 the worker's remote_create method, since there is no component state
356 proxied to the manager yet at the time of init.
357 """
358 self.state = planet.WorkerJobState()
359
360 self.name = self.config['name']
361
362 self.state.set('pid', os.getpid())
363 self.setMood(moods.waking)
364
365 self.medium = None
366
367 self.uiState = componentui.WorkerComponentUIState()
368 self.uiState.addKey('cpu-percent')
369 self.uiState.addKey('start-time')
370 self.uiState.addKey('current-time')
371 self.uiState.addKey('virtual-size')
372 self.uiState.addKey('total-memory')
373 self.uiState.addKey('num-cpus')
374 self.uiState.addKey('flu-debug')
375
376 self.uiState.addHook(self)
377
378 self.plugs = {}
379
380 self._happyWaits = []
381
382
383 self._lastTime = time.time()
384 self._lastClock = time.clock()
385 self._cpuPoller = Poller(self._pollCPU, 5, start=False)
386 self._memoryPoller = Poller(self._pollMemory, 60, start=False)
387 self._cpuPollerDC = None
388 self._memoryPollerDC = None
389 self._shutdownHook = None
390
391
392
394 """
395 Triggered when a uiState observer was added.
396
397 Default implementation is to start the memory and cpu pollers.
398
399 Note:
400 Subclasses can override me but should chain me up to start these
401 pollers
402 """
403 self.debug("observer has started watching us, starting pollers")
404 if not self._cpuPoller.running and not self._cpuPollerDC:
405 self._cpuPollerDC = reactor.callLater(0,
406 self._cpuPoller.start,
407 immediately=True)
408 if not self._memoryPoller.running and not self._memoryPollerDC:
409 self._memoryPollerDC = reactor.callLater(0,
410 self._memoryPoller.start,
411 immediately=True)
412
414 """
415 Triggered when a uiState observer has left.
416
417 Default implementation is to stop the memory and cpu pollers
418 when the total number of observers denoted by the 'num'
419 argument becomes zero.
420
421 Note:
422 Subclasses can override me but should chain me up to stop these
423 pollers
424 """
425 if num == 0:
426 self.debug("no more observers left, shutting down pollers")
427
428 if self._cpuPollerDC:
429 self._cpuPollerDC.cancel()
430 self._cpuPollerDC = None
431 if self._memoryPollerDC:
432 self._memoryPollerDC.cancel()
433 self._memoryPollerDC = None
434
435 if self._cpuPoller:
436 self._cpuPoller.stop()
437 if self._memoryPoller:
438 self._memoryPoller.stop()
439
441 """
442 Subclasses can implement me to run any checks before the component
443 performs setup.
444
445 Messages can be added to the component state's 'messages' list key.
446 Any error messages added will trigger the component going to sad,
447 with L{flumotion.common.errors.ComponentSetupError} being raised
448 before getting to setup stage; do_setup() will not be called.
449
450 In the event of a fatal problem that can't be expressed through an
451 error message, this method should raise an exception or return a
452 failure.
453
454 It is not necessary to chain up in this function. The return
455 value may be a deferred.
456 """
457 return defer.maybeDeferred(self.check_properties,
458 self.config['properties'],
459 self.addMessage)
460
462 """
463 BaseComponent convenience vmethod for running checks.
464
465 A component implementation can override this method to run any
466 checks that it needs to. Typically, a check_properties
467 implementation will call the provided addMessage() callback to
468 note warnings or errors. For errors, addMessage() will set
469 component's mood to sad, which will abort the init process
470 before getting to do_setup().
471
472 @param properties: The component's properties
473 @type properties: dict of string => object
474 @param addMessage: Thunk to add a message to the component
475 state. Will raise an exception if the
476 message is of level ERROR.
477 @type addMessage: L{flumotion.common.messages.Message} -> None
478 """
479 pass
480
482 """
483 Subclasses can implement me to set up the component before it is
484 started. It should set up the component, possibly opening files
485 and resources.
486 Non-programming errors should not be raised, but returned as a
487 failing deferred.
488
489 The return value may be a deferred.
490 """
491 plug_starts = []
492 for socket, plugs in self.config['plugs'].items():
493 self.plugs[socket] = []
494 for plug in plugs:
495 entry = plug['entries']['default']
496 instance = reflectcall.reflectCall(entry['module-name'],
497 entry['function-name'],
498 plug)
499 self.plugs[socket].append(instance)
500 self.debug('Starting plug %r on socket %s',
501 instance, socket)
502 plug_starts.append(instance.start)
503
504
505
506 checks = common.get_all_methods(self, 'do_check', False)
507
def checkErrorCallback(result):
    """
    Final link of the check chain: abort when the checks left the
    component sad.

    @param result: result of the previous link in the chain; unused.
    @raises errors.ComponentSetupHandledError: if the component's mood
        is sad at this point.
    """
    if self.state.get('mood') != moods.sad.value:
        return
    self.warning('Running checks made the component sad.')
    raise errors.ComponentSetupHandledError()
517
518 checks.append(checkErrorCallback)
519
520 return _maybeDeferredChain(plug_starts + checks, self)
521
523 """
524 BaseComponent vmethod for stopping.
525 The component should do any cleanup it needs, but must not set the
526 component's mood to sleeping.
527
528 @rtype: L{twisted.internet.defer.Deferred}
529 """
530 plug_stops = []
531 for socket, plugs in self.plugs.items():
532 for plug in plugs:
533 self.debug('Stopping plug %r on socket %s', plug, socket)
534 plug_stops.append(plug.stop)
535
536 for message in self.state.get('messages'):
537
538 self.state.remove('messages', message)
539
540
541 if self._cpuPollerDC:
542 self._cpuPollerDC.cancel()
543 self._cpuPollerDC = None
544 if self._memoryPollerDC:
545 self._memoryPollerDC.cancel()
546 self._memoryPollerDC = None
547
548 if self._cpuPoller:
549 self._cpuPoller.stop()
550 self._cpuPoller = None
551 if self._memoryPoller:
552 self._memoryPoller.stop()
553 self._memoryPoller = None
554
555 if self._shutdownHook:
556 self.debug('_stoppedCallback: firing shutdown hook')
557 self._shutdownHook()
558
559 return _maybeDeferredChain(plug_stops, self)
560
561
562
564 """
565 Sets up the component. Called during __init__, so be sure not
566 to raise exceptions, instead adding messages to the component
567 state.
568 """
569
def run_setups():
    """
    Chain all do_setup methods found on this component.

    @returns: the deferred produced by L{_maybeDeferredChain}.
    """
    return _maybeDeferredChain(
        common.get_all_methods(self, 'do_setup', False), self)
573
def setup_complete(_):
    """
    Callback for a successful setup chain; notifies setup_completed().

    @param _: result of the setup chain; ignored.
    """
    self.debug('setup completed')
    self.setup_completed()
577
def got_error(failure):
    """
    Errback for the setup chain.

    A L{errors.ComponentSetupHandledError} is only logged at debug
    level. Any other failure is also warned about and added to the
    component as an error message.

    @param failure: the failure raised during setup.
    @returns: None in every case, so the failure is not passed on.
    """
    txt = log.getFailureMessage(failure)
    self.debug('got_error: %s', txt)
    if failure.check(errors.ComponentSetupHandledError):
        return None

    self.warning('Setup failed: %s', txt)
    self.addMessage(
        messages.Error(T_(N_("Could not setup component.")),
                       debug=txt,
                       mid="component-setup-%s" % self.name))
    return None
591
592 self.setMood(moods.waking)
593 self.uiState.set('start-time', time.time())
594
595 self.uiState.set('total-memory', self._getTotalMemory())
596 self.uiState.set('num-cpus', self._getNumberOfCPUs())
597 self.uiState.set('flu-debug', log.getDebug())
598
599 d = run_setups()
600 d.addCallbacks(setup_complete, got_error)
601
602
604 self.debug('turning happy')
605 self.setMood(moods.happy)
606
608 """
609 Set the shutdown hook for this component (replacing any previous hook).
610 When a component is stopped, then this hook will be fired.
611 """
612 self._shutdownHook = shutdownHook
613
615 """
616 Tell the component to stop.
617 The connection to the manager will be closed.
618 The job process will also finish.
619 """
620 self.debug('BaseComponent.stop')
621
622
623 self.setMood(moods.waking)
624
625
626 stops = common.get_all_methods(self, 'do_stop', True)
627 return _maybeDeferredChain(stops, self)
628
629
630
633
635 self.state.set('workerName', workerName)
636
639
647
649 """
650 Set the given mood on the component if it's different from the current
651 one.
652 """
653 current = self.state.get('mood')
654
655 if current == mood.value:
656 self.log('already in mood %r' % mood)
657 return
658 elif current == moods.sad.value:
659 self.info('tried to set mood to %r, but already sad :-(' % mood)
660 return
661
662 self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
663 self.state.set('mood', mood.value)
664
665 if mood == moods.happy:
666 while self._happyWaits:
667 self._happyWaits.pop(0).callback(None)
668 elif mood == moods.sad:
669 while self._happyWaits:
670 self._happyWaits.pop(0).errback(errors.ComponentStartError())
671
673 """
674 Gets the mood on the component.
675
676 @rtype: int
677 """
678 return self.state.get('mood')
679
690
692 """
693 Add a message to the component.
694 If any of the messages is an error, the component will turn sad.
695
696 @type message: L{flumotion.common.messages.Message}
697 """
698 self.state.append('messages', message)
699 if message.level == messages.ERROR:
700 self.debug('error message, turning sad')
701 self.setMood(moods.sad)
702 if self._haveError:
703 self._haveError(message)
704
706 """
707 Add a warning message for deprecated properties.
708
709 @param list: list of property names.
710 @type list: list of str
711 """
712 m = messages.Warning(T_(N_(
713 "Your configuration uses deprecated properties. "
714 "Please update your configuration and correct them.\n")),
715 mid="deprecated")
716 for prop in list:
717 m.add(T_(N_(
718 "Please remove '%s' property.\n"), prop))
719 self.addMessage(m)
720
722 """
723 Fix properties that have been renamed from a previous version,
724 and add a warning for them.
725
726 @param properties: properties; will be modified as a result.
727 @type properties: dict
728 @param list: list of (old, new) tuples of property names.
729 @type list: list of tuple of (str, str)
730 """
731 found = []
732 for old, new in list:
733 if old in properties:
734 found.append((old, new))
735
736 if found:
737 m = messages.Warning(T_(N_(
738 "Your configuration uses deprecated properties. "
739 "Please update your configuration and correct them.\n")),
740 mid="deprecated")
741 for old, new in found:
742 m.add(T_(N_(
743 "Please rename '%s' to '%s'.\n"),
744 old, new))
745 self.debug("Setting new property '%s' to %r", new,
746 properties[old])
747 properties[new] = properties[old]
748 del properties[old]
749 self.addMessage(m)
750
752 """
753 Call a remote method on all admin client views on this component.
754
755 This gets serialized through the manager and multiplexed to all
756 admin clients, and from there on to all views connected to each
757 admin client model.
758
759 Because there can be any number of admin clients that this call
760 will go out to, it does not make sense to have one return value.
761 This function will return None always.
762 """
763 if self.medium:
764 self.medium.callRemote("adminCallRemote", methodName,
765 *args, **kwargs)
766 else:
767 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
768 'no manager.'
769 % (methodName, args, kwargs))
770
772 try:
773 namespace = plug.get_namespace()
774 except AttributeError:
775 self.debug("Plug %r does not provide namespace, "
776 "its interface will not be exposed", plug)
777 return
778
779 self.debug("Exposing plug's %r interface in namespace %r",
780 plug, namespace)
781 for method in filter(callable,
782 [getattr(plug, m) for m in dir(plug)
783 if m.startswith('remote_')]):
784 if namespace:
785 name = "".join(("remote_", namespace, "_",
786 method.__name__[len("remote_"):]))
787 else:
788 name = method.__name__
789 self.debug("Exposing method %r as %r in %r", method, name, medium)
790 setattr(medium, name, method)
791
793 self._cpuPollerDC = None
794
795 nowTime = time.time()
796 nowClock = time.clock()
797 deltaTime = nowTime - self._lastTime
798 deltaClock = nowClock - self._lastClock
799 self._lastTime = nowTime
800 self._lastClock = nowClock
801
802 if deltaClock >= 0:
803 CPU = deltaClock/deltaTime
804 self.log('latest CPU use: %r', CPU)
805 self.uiState.set('cpu-percent', CPU)
806
807 self.uiState.set('current-time', nowTime)
808
810 self._memoryPollerDC = None
811
812
813 handle = open('/proc/%d/stat' % os.getpid())
814 line = handle.read()
815 handle.close()
816 fields = line.split()
817
818
819 vsize = int(fields[22])
820 self.log('vsize is %d', vsize)
821 self.uiState.set('virtual-size', vsize)
822
824 f = open("/proc/meminfo")
825 memtotal = f.readline()
826 f.close()
827 return int(memtotal[memtotal.index(":") + 1: -3]) * 1024
828
830 try:
831 return open('/proc/cpuinfo').read().count('processor\t:')
832 except IOError:
833 self.debug('Can not determine number of CPUs on this system')
834 return 1
835