1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, \
40 messages
41 from flumotion.common import gstreamer
42 from flumotion.common.i18n import N_, gettexter
43 from flumotion.common.planet import moods
44 from flumotion.common.pygobject import gsignal
45
46 __version__ = "$Rev: 7970 $"
47 T_ = gettexter()
48
49
51 """
52 I am a component-side medium for a FeedComponent to interface with
53 the manager-side ComponentAvatar.
54 """
55 implements(interfaces.IComponentMedium)
56 logCategory = 'feedcompmed'
57 remoteLogName = 'feedserver'
58
60 """
61 @param component: L{flumotion.component.feedcomponent.FeedComponent}
62 """
63 basecomponent.BaseComponentMedium.__init__(self, component)
64
65 self._feederFeedServer = {}
66
67 self._feederPendingConnections = {}
68 self._eaterFeedServer = {}
69
70 self._eaterPendingConnections = {}
71 self.logName = component.name
72
73
74
77
79 """
80 Sets the GStreamer debugging levels based on the passed debug string.
81
82 @since: 0.4.2
83 """
84 self.debug('Setting GStreamer debug level to %s' % debug)
85 if not debug:
86 return
87
88 for part in debug.split(','):
89 glob = None
90 value = None
91 pair = part.split(':')
92 if len(pair) == 1:
93
94 value = int(pair[0])
95 elif len(pair) == 2:
96 glob, value = pair
97 value = int(value)
98 else:
99 self.warning("Cannot parse GStreamer debug setting '%s'." %
100 part)
101 continue
102
103 if glob:
104 try:
105
106 gst.debug_set_threshold_for_name(glob, value)
107 except TypeError:
108 self.warning("Cannot set glob %s to value %s" % (
109 glob, value))
110 else:
111 gst.debug_set_default_threshold(value)
112
114 """
115 Tell the component the host and port for the FeedServer through which
116 it can connect a local eater to a remote feeder to eat the given
117 fullFeedId.
118
119 Called by the manager-side ComponentAvatar.
120 """
121 if self._feederFeedServer.get(eaterAlias):
122 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port):
123 self.debug("Feed:%r is the same as the current one. "\
124 "Request ignored.", (fullFeedId, host, port))
125 return
126 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
127 return self.connectEater(eaterAlias)
128
141
143 """
144 Connect one of the medium's component's eaters to a remote feed.
145 Called by the component, both on initial connection and for
146 reconnecting.
147
148 @returns: deferred that will fire with a value of None
149 """
150
151
152 def gotFeed((feedId, fd)):
153 self._feederPendingConnections.pop(eaterAlias, None)
154 self.comp.eatFromFD(eaterAlias, feedId, fd)
155
156 if eaterAlias not in self._feederFeedServer:
157 self.debug("eatFrom() hasn't been called yet for eater %s",
158 eaterAlias)
159
160
161 return defer.succeed(None)
162
163 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]
164
165 cancel = self._feederPendingConnections.pop(eaterAlias, None)
166 if cancel:
167 self.debug('cancelling previous connection attempt on %s',
168 eaterAlias)
169 cancel()
170
171 client = feed.FeedMedium(logName=self.comp.name)
172
173 d = client.requestFeed(host, port,
174 self._getAuthenticatorForFeed(eaterAlias),
175 fullFeedId)
176 self._feederPendingConnections[eaterAlias] = client.stopConnecting
177 d.addCallback(gotFeed)
178 return d
179
181 """
182 Tell the component to feed the given feed to the receiving component
183 accessible through the FeedServer on the given host and port.
184
185 Called by the manager-side ComponentAvatar.
186 """
187 self._eaterFeedServer[fullFeedId] = (host, port)
188 self.connectFeeder(feederName, fullFeedId)
189
def connectFeeder(self, feederName, fullFeedId):
    """
    Connect one of the medium's component's feeders to the FeedServer
    whose host and port were stored for the given fullFeedId, and pass
    the resulting fd on to the component.

    A connection attempt that is still pending for the same feeder is
    cancelled before the new one is started.

    @param feederName: name of the feeder to connect
    @param fullFeedId: key under which the host and port of the target
                       FeedServer were stored

    @returns: a deferred; already fired with None when no FeedServer is
              known yet for fullFeedId, otherwise the deferred returned
              by sendFeed with the hand-over callback added
    """
    # NOTE(review): the def line is missing from this listing; the
    # signature is reconstructed from the visible call
    # self.connectFeeder(feederName, fullFeedId) -- confirm.

    def gotFeed(result):
        # sendFeed fires with a (fullFeedId, fd) pair; unpacked here
        # instead of in the parameter list
        remoteFeedId, fd = result
        self._eaterPendingConnections.pop(feederName, None)
        self.comp.feedToFD(feederName, fd, os.close, remoteFeedId)

    if fullFeedId not in self._eaterFeedServer:
        self.debug("feedTo() hasn't been called yet for feeder %s",
                   feederName)
        # nothing to connect to yet
        return defer.succeed(None)

    host, port = self._eaterFeedServer[fullFeedId]

    # Pending attempts are stored and cleared under feederName, so the
    # lookup has to use feederName too; looking them up by fullFeedId
    # never found the entry and the old attempt was never cancelled.
    # NOTE(review): this assumes at most one pending attempt per feeder,
    # which is what the table already stores -- confirm.
    cancel = self._eaterPendingConnections.pop(feederName, None)
    if cancel:
        self.debug('cancelling previous connection attempt on %s',
                   feederName)
        cancel()

    client = feed.FeedMedium(logName=self.comp.name)

    d = client.sendFeed(host, port,
                        self._getAuthenticatorForFeed(feederName),
                        fullFeedId)
    # remember how to abort this attempt until gotFeed clears it
    self._eaterPendingConnections[feederName] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
226
228 """
229 Tells the component to start providing a master clock on the given
230 UDP port.
231 Can only be called if setup() has been called on the component.
232
233 The IP address returned is the local IP the clock is listening on.
234
235 @returns: (ip, port, base_time)
236 @rtype: tuple of (str, int, long)
237 """
238 self.debug('remote_provideMasterClock(port=%r)' % port)
239 return self.comp.provide_master_clock(port)
240
242 """
243 Return the clock master info created by a previous call
244 to provideMasterClock.
245
246 @returns: (ip, port, base_time)
247 @rtype: tuple of (str, int, long)
248 """
249 return self.comp.get_master_clock()
250
253
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Invoke the given methodName on the given effectName in this component.
    The effect should implement effect_(methodName) to receive the call.

    @param effectName: key of the effect in the component's effects
    @param methodName: name of the method to call, without the
                       "effect_" prefix
    @param args:       positional arguments passed on to the method
    @param kwargs:     keyword arguments passed on to the method

    @returns: whatever the effect method returns

    @raises errors.UnknownEffectError: if the component has no effect
                                       with the given name
    @raises errors.NoMethodError:      if the effect has no
                                       effect_(methodName) attribute
    @raises errors.RemoteRunError:     if calling the method raises a
                                       TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))
    if effectName not in self.comp.effects:
        raise errors.UnknownEffectError(effectName)
    effect = self.comp.effects[effectName]

    attrName = "effect_%s" % methodName
    if not hasattr(effect, attrName):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
            effectName))
    method = getattr(effect, attrName)

    try:
        result = method(*args, **kwargs)
    except TypeError:
        # reported as an argument mismatch; a TypeError raised inside
        # the method itself ends up here as well
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)

    # wrap the result in a 1-tuple: a bare tuple result would be taken
    # as the list of format arguments and make the % operator raise
    self.debug("effect: result: %r" % (result, ))
    return result
276
277 from feedcomponent010 import FeedComponent
278
279 FeedComponent.componentMediumClass = FeedComponentMedium
280
281
283 """A component using gst-launch syntax
284
285 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
286 @cvar checkOffset: whether to check continuity of offsets for
287 eaters
288 """
289
290 DELIMITER = '@'
291
292
293 checkTimestamp = False
294 checkOffset = False
295
296
297 FDSRC_TMPL = 'fdsrc name=%(name)s'
298 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
299 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
300 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
301 'recover-policy=1'
302 EATER_TMPL = None
303
323
324
325
351
360
361
362
364 """
365 Method that must be implemented by subclasses to produce the
366 gstparse string for the component's pipeline. Subclasses should
367 not chain up; this method raises NotImplementedError.
368
369 @returns: a new pipeline string representation.
370 """
371 raise NotImplementedError('subclasses should implement '
372 'get_pipeline_string')
373
383
384
385
396
398 """
399 Expand the given pipeline string representation by substituting
400 blocks between '@' with a filled-in template.
401
402 @param pipeline: a pipeline string representation with variables
403 @param templatizers: A dict of prefix => procedure. Template
404 blocks in the pipeline will be replaced
405 with the result of calling the procedure
406 with what is left of the template after
407 taking off the prefix.
408 @returns: a new pipeline string representation.
409 """
410 assert pipeline != ''
411
412
413 if pipeline.count(self.DELIMITER) % 2 != 0:
414 raise TypeError("'%s' contains an odd number of '%s'"
415 % (pipeline, self.DELIMITER))
416
417 out = []
418 for i, block in enumerate(pipeline.split(self.DELIMITER)):
419
420
421 if i % 2 == 0:
422 out.append(block)
423 else:
424 block = block.strip()
425 try:
426 pos = block.index(':')
427 except ValueError:
428 raise TypeError("Template %r has no colon" % (block, ))
429 prefix = block[:pos+1]
430 if prefix not in templatizers:
431 raise TypeError("Template %r has invalid prefix %r"
432 % (block, prefix))
433 out.append(templatizers[prefix](block[pos+1:]))
434 return ''.join(out)
435
457
459 queue = self.get_queue_string(eaterAlias)
460 elementName = self.eaters[eaterAlias].elementName
461
462 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
463
465 elementName = self.feeders[feederName].elementName
466 return self.FEEDER_TMPL % {'name': elementName}
467
469 """
470 Return a parse-launch string to join the fdsrc eater element and
471 the depayer, for example '!' or '! queue !'. The string must not
472 contain format strings.
473 """
474 return '!'
475
476
478 """
479 I am a part of a feed component for a specific group
480 of functionality.
481
482 @ivar name: name of the effect
483 @type name: string
484 @ivar component: component owning the effect
485 @type component: L{FeedComponent}
486 """
487 logCategory = "effect"
488
490 """
491 @param name: the name of the effect
492 """
493 self.name = name
494 self.setComponent(None)
495
def setComponent(self, component):
    """
    Set the given component as the effect's owner.

    The effect's UI state is updated in the same step: it becomes the
    component's uiState, or None when no component is given.

    @param component: the component to set as an owner of this effect,
                      or None to leave the effect without an owner
    @type  component: L{FeedComponent}
    """
    # NOTE(review): the def line is missing from this listing; the
    # signature is reconstructed from the visible call
    # self.setComponent(None) -- confirm.
    self.component = component
    # a conditional expression rather than "x and y or None", so that a
    # uiState that happens to evaluate false is still passed on as-is
    self.setUIState(component.uiState if component else None)
505
507 """
508 Set the given UI state on the effect. This method is ideal for
509 adding keys to the UI state.
510
511 @param state: the UI state for the component to use.
512 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
513 """
514 self.uiState = state
515
517 """
518 Get the component owning this effect.
519
520 @rtype: L{FeedComponent}
521 """
522 return self.component
523
524
595
596 signalid = queue.connect("underrun", _underrun_cb)
597