1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 implementation of a PB Client to interface with feedserver.py
24 """
25
26 import socket
27 import os
28
29 from twisted.internet import reactor, main, defer, tcp
30 from twisted.python import failure
31 from zope.interface import implements
32
33 from flumotion.common import log, common, interfaces
34 from flumotion.twisted import pb as fpb
35
36 __version__ = "$Rev$"
37
38
39
40
41
58
59
62
63
73
74
86
87
88
89
91 """
92 I am a client for a Feed Server.
93
94 I am used as the remote interface between a component and another
95 component.
96
97 @ivar component: the component this is a feed client for
98 @type component: L{flumotion.component.feedcomponent.FeedComponent}
99 @ivar remote: a reference to a
100 L{flumotion.worker.feedserver.FeedAvatar}
101 @type remote: L{twisted.spread.pb.RemoteReference}
102 """
103 logCategory = 'feedmedium'
104 remoteLogName = 'feedserver'
105 implements(interfaces.IFeedMedium)
106
107 remote = None
108
115
def startConnecting(self, host, port, authenticator, timeout=30,
                    bindAddress=None):
    """Optional helper method to connect to a remote feed server.

    This method starts a client factory connecting via a
    L{PassableClientConnector}. It offers the possibility of
    cancelling an in-progress connection via the stopConnecting()
    method.

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param timeout:       forwarded unchanged to the connector;
                          presumably the connection timeout in
                          seconds -- TODO confirm
    @param bindAddress:   forwarded unchanged to the connector;
                          presumably the local address to bind to --
                          TODO confirm

    @returns: a deferred that will fire with the remote reference,
              once we have authenticated.
    @raise AssertionError: if a factory from a previous call is still
                           set on this object.
    """
    # Explicit check instead of a bare ``assert`` so the guard is still
    # enforced when Python runs with -O.  AssertionError is raised on
    # purpose: it is the exception type callers already got before.
    if self._factory is not None:
        raise AssertionError('startConnecting() called while a '
                             'connection factory is already set')

    self._factory = FeedClientFactory(self)
    # NOTE(review): reactor.connectWith() is deprecated in newer Twisted
    # releases -- confirm the supported Twisted version before upgrading.
    reactor.connectWith(PassableClientConnector, host, port,
                        self._factory, timeout, bindAddress)
    return self._factory.login(authenticator)
141
def requestFeed(self, host, port, authenticator, fullFeedId):
    """Request a feed from a remote feed server.

    Connects and authenticates through startConnecting(), then asks
    the remote side to 'sendFeed' the requested feed.  A connection
    attempt that is still pending can be cancelled with
    stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:feed)
                          offered by the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd). In an error case it will errback and close
              the remote connection.
    """

    def onLoggedIn(remoteRef):
        # Remember the avatar reference, then ask it for the feed.
        self.setRemoteReference(remoteRef)
        return remoteRef.callRemote('sendFeed', fullFeedId)

    def awaitFeed(_result):
        # The result of the 'sendFeed' call itself is not used; the
        # chain continues with whatever self._feedToDeferred fires with.
        return self._feedToDeferred

    def onFailure(reason):
        # Log, tear the connection down and keep the failure travelling
        # down the errback chain so the caller sees it too.
        self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        return reason

    pending = self.startConnecting(host, port, authenticator)
    pending.addCallback(onLoggedIn)
    pending.addCallback(awaitFeed)
    pending.addErrback(onFailure)
    return pending
190
def sendFeed(self, host, port, authenticator, fullFeedId):
    """Send a feed to a remote feed server.

    This helper method calls startConnecting() to make the
    connection and authenticate, and will return the feed file
    descriptor or an error. A pending connection attempt can be
    cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                          to feed to on the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd). In an error case it will errback and close
              the remote connection.
    """

    def connected(remote):
        assert isinstance(remote.broker.transport, _SocketMaybeCloser)
        self.setRemoteReference(remote)
        return remote.callRemote('receiveFeed', fullFeedId)

    def feedSent(res):
        # The remote side accepted the feed: take the socket away from
        # PB.  The order of the steps below matters and is unchanged.
        t = self.remote.broker.transport
        self.debug('stop reading from transport')
        t.stopReading()

        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # presumably tells the transport not to close the socket when
        # the connection is dropped below -- TODO confirm against
        # _SocketMaybeCloser
        t.keepSocketAlive = True
        # Hand out a duplicate so the caller owns its own descriptor.
        fd = os.dup(t.fileno())

        # Drop our reference to the remote avatar.
        self.setRemoteReference(None)

        d = defer.Deferred()

        def loseConnection():
            t.connectionLost(failure.Failure(main.CONNECTION_DONE))
            d.callback((fullFeedId, fd))

        # Run from a later reactor iteration, not from inside this
        # callback.
        reactor.callLater(0, loseConnection)
        return d

    def error(reason):
        # This method sends a feed, so say so; the previous message was
        # copied from requestFeed() and claimed a failed retrieval.
        self.warning('failed to send %s to %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        return reason

    d = self.startConnecting(host, port, authenticator)
    d.addCallback(connected)
    d.addCallback(feedSent)
    d.addErrback(error)
    return d
258
260 """Stop a pending or established connection made via
261 startConnecting().
262
263 Stops any established or pending connection to a remote feed
264 server started via the startConnecting() method. Safe to call
265 even if connection has not been started.
266 """
267 if self._factory:
268 self._factory.disconnect()
269 self._factory = None
270
271
272 self.setRemoteReference(None)
273
274
275
277 self.remote = remoteReference
278
280 return self.remote is not None
281
284
286 t = self.remote.broker.transport
287
288 self.debug('stop reading from transport')
289 t.stopReading()
290 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
291
318