xrootd
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
25 #include "XrdCl/XrdClPostMasterInterfaces.hh"
26 #include "XrdCl/XrdClChannelHandlerList.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdNet/XrdNetAddr.hh"
33 #include <list>
34 #include <vector>
35 #include <functional>
36 
37 namespace XrdCl
38 {
39  class Message;
40  class Channel;
41  class TransportHandler;
42  class TaskManager;
43  struct SubStreamData;
44 
45  //----------------------------------------------------------------------------
47  //----------------------------------------------------------------------------
48  class Stream
49  {
50  public:
51  //------------------------------------------------------------------------
    //! Status of the stream
53  //------------------------------------------------------------------------
54  enum StreamStatus
55  {
56  Disconnected = 0, //!< Not connected
57  Connected = 1, //!< Connected
58  Connecting = 2, //!< In the process of being connected
59  Error = 3 //!< Broken
60  };
61 
62  //------------------------------------------------------------------------
64  //------------------------------------------------------------------------
65  Stream( const URL *url, const URL &prefer = URL() );
66 
67  //------------------------------------------------------------------------
    //! Destructor
69  //------------------------------------------------------------------------
70  ~Stream();
71 
72  //------------------------------------------------------------------------
    //! Initializer
74  //------------------------------------------------------------------------
75  XRootDStatus Initialize();
76 
77  //------------------------------------------------------------------------
    //! Queue the message for sending
79  //------------------------------------------------------------------------
80  XRootDStatus Send( Message *msg,
81  OutgoingMsgHandler *handler,
82  bool stateful,
83  time_t expires );
84 
85  //------------------------------------------------------------------------
    //! Set the transport
    //!
    //! Only the pointer is stored; nothing else is done with it here.
87  //------------------------------------------------------------------------
88  void SetTransport( TransportHandler *transport )
89  {
90  pTransport = transport;
91  }
92 
93  //------------------------------------------------------------------------
    //! Set the poller
    //!
    //! Only the pointer is stored; nothing else is done with it here.
95  //------------------------------------------------------------------------
96  void SetPoller( Poller *poller )
97  {
98  pPoller = poller;
99  }
100 
101  //------------------------------------------------------------------------
     //! Set the incoming queue
     //!
     //! Besides remembering the queue, this replaces the queuing job: the
     //! current QueueIncMsgJob is deleted and a new one bound to
     //! incomingQueue is allocated.
103  //------------------------------------------------------------------------
104  void SetIncomingQueue( InQueue *incomingQueue )
105  {
106  pIncomingQueue = incomingQueue;
     // NOTE(review): the old job is deleted before the new one is allocated,
     // so if `new` throws, pQueueIncMsgJob is left pointing at a deleted
     // object - consider allocating first, then deleting.
107  delete pQueueIncMsgJob;
108  pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue );
109  }
110 
111  //------------------------------------------------------------------------
     //! Set the channel data
     //!
     //! Only the pointer is stored; nothing else is done with it here.
113  //------------------------------------------------------------------------
114  void SetChannelData( AnyObject *channelData )
115  {
116  pChannelData = channelData;
117  }
118 
119  //------------------------------------------------------------------------
     //! Set task manager
     //!
     //! Only the pointer is stored; nothing else is done with it here.
121  //------------------------------------------------------------------------
122  void SetTaskManager( TaskManager *taskManager )
123  {
124  pTaskManager = taskManager;
125  }
126 
127  //------------------------------------------------------------------------
     //! Set job manager
     //!
     //! Only the pointer is stored; nothing else is done with it here.
129  //------------------------------------------------------------------------
130  void SetJobManager( JobManager *jobManager )
131  {
132  pJobManager = jobManager;
133  }
134 
135  //------------------------------------------------------------------------
     //! Enable the link for the given path
     //!
     //! NOTE(review): the original three-line description (lines 136-138) is
     //! missing from this listing and is not repeated in the symbol index -
     //! restore the exact wording from the upstream header.
139  //------------------------------------------------------------------------
140  XRootDStatus EnableLink( PathID &path );
141 
142  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  void Disconnect( bool force = false );
146 
147  //------------------------------------------------------------------------
150  //------------------------------------------------------------------------
151  void Tick( time_t now );
152 
153  //------------------------------------------------------------------------
     //! Get the URL
     //!
     //! @return the URL pointer held by this stream (pUrl), unchanged
155  //------------------------------------------------------------------------
156  const URL *GetURL() const
157  {
158  return pUrl;
159  }
160 
161  //------------------------------------------------------------------------
163  //------------------------------------------------------------------------
164  void ForceConnect();
165 
166  //------------------------------------------------------------------------
     //! Return stream name
     //!
     //! @return a reference to the member string pStreamName, not a copy
168  //------------------------------------------------------------------------
169  const std::string &GetName() const
170  {
171  return pStreamName;
172  }
173 
174  //------------------------------------------------------------------------
176  //------------------------------------------------------------------------
177  void DisableIfEmpty( uint16_t subStream );
178 
179  //------------------------------------------------------------------------
181  //------------------------------------------------------------------------
182  void OnIncoming( uint16_t subStream,
183  Message *msg,
184  uint32_t bytesReceived );
185 
186  //------------------------------------------------------------------------
187  // Call when one of the sockets is ready to accept a new message
188  //------------------------------------------------------------------------
189  std::pair<Message *, OutgoingMsgHandler *>
190  OnReadyToWrite( uint16_t subStream );
191 
192  //------------------------------------------------------------------------
193  // Call when a message is written to the socket
194  //------------------------------------------------------------------------
195  void OnMessageSent( uint16_t subStream,
196  Message *msg,
197  uint32_t bytesSent );
198 
199  //------------------------------------------------------------------------
201  //------------------------------------------------------------------------
202  void OnConnect( uint16_t subStream );
203 
204  //------------------------------------------------------------------------
206  //------------------------------------------------------------------------
207  void OnConnectError( uint16_t subStream, XRootDStatus status );
208 
209  //------------------------------------------------------------------------
211  //------------------------------------------------------------------------
212  void OnError( uint16_t subStream, XRootDStatus status );
213 
214  //------------------------------------------------------------------------
216  //------------------------------------------------------------------------
217  void ForceError( XRootDStatus status );
218 
219  //------------------------------------------------------------------------
221  //------------------------------------------------------------------------
222  void OnReadTimeout( uint16_t subStream, bool &isBroken );
223 
224  //------------------------------------------------------------------------
226  //------------------------------------------------------------------------
227  void OnWriteTimeout( uint16_t subStream );
228 
229  //------------------------------------------------------------------------
     //! Register channel event handler
231  //------------------------------------------------------------------------
232  void RegisterEventHandler( ChannelEventHandler *handler );
233 
234  //------------------------------------------------------------------------
     //! Remove a channel event handler
236  //------------------------------------------------------------------------
237  void RemoveEventHandler( ChannelEventHandler *handler );
238 
239  //------------------------------------------------------------------------
248  //------------------------------------------------------------------------
249  std::pair<IncomingMsgHandler *, bool>
250  InstallIncHandler( Message *msg, uint16_t stream );
251 
252  //------------------------------------------------------------------------
256  //------------------------------------------------------------------------
257  uint16_t InspectStatusRsp( Message *msg, uint16_t stream, IncomingMsgHandler *&incHandler );
258 
259  //------------------------------------------------------------------------
     //! Set the on-connect handler for data streams
     //!
     //! The shared pointer is copied into pOnDataConnJob. It is taken by
     //! const reference because it is only read here, so const objects and
     //! temporaries are accepted as well as the lvalues callers pass today.
     //!
     //! @param onConnJob job to remember as the on-connect handler
261  //------------------------------------------------------------------------
262  void SetOnDataConnectHandler( const std::shared_ptr<Job> &onConnJob )
263  {
264  pOnDataConnJob = onConnJob;
265  }
266 
267  //------------------------------------------------------------------------
270  //------------------------------------------------------------------------
271  bool CanCollapse( const URL &url );
272 
273  private:
274 
275  //------------------------------------------------------------------------
277  //------------------------------------------------------------------------
278  static bool IsPartial( Message *msg );
279 
280  //------------------------------------------------------------------------
     //! Check if addresses contains given address
     //!
     //! Walks the vector front to back and stops at the first element whose
     //! Same() call on addr evaluates to true.
     //!
     //! @param addr      the address to look for
     //! @param addresses the candidates to compare against
     //! @return true if some element reports Same( &addr ), false otherwise
     //!         (including for an empty vector)
     //!
     //! NOTE(review): addresses is only read here but is taken by non-const
     //! reference; whether it can become const depends on XrdNetAddr::Same
     //! being callable on a const object - confirm before changing.
282  //------------------------------------------------------------------------
283  inline static bool HasNetAddr( const XrdNetAddr &addr,
284  std::vector<XrdNetAddr> &addresses )
285  {
286  auto itr = addresses.begin();
287  for( ; itr != addresses.end() ; ++itr )
288  {
289  if( itr->Same( &addr ) ) return true;
290  }
291 
292  return false;
293  }
294 
295  //------------------------------------------------------------------------
296  // Job queuing the incoming messages
297  //------------------------------------------------------------------------
298  class QueueIncMsgJob: public Job
299  {
300  public:
     // Remembers the queue that Run() will add messages to.
301  explicit QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {}
302  virtual ~QueueIncMsgJob() {}
     // arg is treated as a Message* and handed to InQueue::AddMessage.
     // The bool returned by AddMessage is not checked here.
303  virtual void Run( void *arg )
304  {
305  Message *msg = static_cast<Message *>( arg );
306  pQueue->AddMessage( msg );
307  }
308  private:
309  InQueue *pQueue;
310  };
311 
312  //------------------------------------------------------------------------
313  // Job handling the incoming messages
314  //------------------------------------------------------------------------
315  class HandleIncMsgJob: public Job
316  {
317  public:
     // Remembers the handler that Run() will pass the message to.
318  explicit HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {}
319  virtual ~HandleIncMsgJob() {}
     // arg is treated as a Message* and passed to the handler's Process().
     // The job then deletes itself, so it has to be allocated with `new`
     // and must not be touched once Run() has been called.
320  virtual void Run( void *arg )
321  {
322  Message *msg = static_cast<Message *>( arg );
323  pHandler->Process( msg );
324  delete this;
325  }
326  private:
327  IncomingMsgHandler *pHandler;
328  };
329 
330  //------------------------------------------------------------------------
332  //------------------------------------------------------------------------
333  void OnFatalError( uint16_t subStream,
334  XRootDStatus status,
335  XrdSysMutexHelper &lock );
336 
337  //------------------------------------------------------------------------
     //! Inform the monitoring about disconnection
339  //------------------------------------------------------------------------
340  void MonitorDisconnection( XRootDStatus status );
341 
342  //------------------------------------------------------------------------
     //! Send close after an open request timed out
344  //------------------------------------------------------------------------
345  XRootDStatus RequestClose( Message *resp );
346 
347  typedef std::vector<SubStreamData*> SubStreamList;
348 
349  //------------------------------------------------------------------------
350  // Data members
351  //------------------------------------------------------------------------
352  const URL *pUrl;
353  const URL pPrefer;
354  std::string pStreamName;
355  TransportHandler *pTransport;
356  Poller *pPoller;
357  TaskManager *pTaskManager;
358  JobManager *pJobManager;
359  XrdSysRecMutex pMutex;
360  InQueue *pIncomingQueue;
361  AnyObject *pChannelData;
362  uint32_t pLastStreamError;
363  XRootDStatus pLastFatalError;
364  uint16_t pStreamErrorWindow;
365  uint16_t pConnectionCount;
366  uint16_t pConnectionRetry;
367  time_t pConnectionInitTime;
368  uint16_t pConnectionWindow;
369  SubStreamList pSubStreams;
370  std::vector<XrdNetAddr> pAddresses;
371  Utils::AddressType pAddressType;
372  ChannelHandlerList pChannelEvHandlers;
373  uint64_t pSessionId;
374 
375  //------------------------------------------------------------------------
376  // Jobs
377  //------------------------------------------------------------------------
     // Replaced by SetIncomingQueue() each time a new queue is set.
378  QueueIncMsgJob *pQueueIncMsgJob;
379 
380  //------------------------------------------------------------------------
381  // Monitoring info
382  //------------------------------------------------------------------------
383  timeval pConnectionStarted;
384  timeval pConnectionDone;
385  uint64_t pBytesSent;
386  uint64_t pBytesReceived;
387 
388  //------------------------------------------------------------------------
389  // Data stream on-connect handler
390  //------------------------------------------------------------------------
391  std::shared_ptr<Job> pOnDataConnJob;
392  };
393 }
394 
395 #endif // __XRD_CL_STREAM_HH__
XrdCl::Stream::OnMessageSent
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
XrdCl::Stream::pQueueIncMsgJob
QueueIncMsgJob * pQueueIncMsgJob
Definition: XrdClStream.hh:378
XrdCl::Stream::pJobManager
JobManager * pJobManager
Definition: XrdClStream.hh:358
XrdCl::Stream::pPoller
Poller * pPoller
Definition: XrdClStream.hh:356
XrdCl::OutgoingMsgHandler
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:185
XrdCl::Stream::InstallIncHandler
std::pair< IncomingMsgHandler *, bool > InstallIncHandler(Message *msg, uint16_t stream)
XrdCl::Stream::pChannelEvHandlers
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:372
XrdCl::Stream::EnableLink
XRootDStatus EnableLink(PathID &path)
XrdCl::Stream::SetPoller
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:96
XrdClPostMasterInterfaces.hh
XrdCl::Stream::QueueIncMsgJob
Definition: XrdClStream.hh:299
XrdCl::Stream::HandleIncMsgJob::~HandleIncMsgJob
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:319
XrdCl::Stream::OnConnectError
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
XrdSysPthread.hh
XrdCl::Stream::SubStreamList
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:347
XrdCl::PathID
Definition: XrdClPostMasterInterfaces.hh:297
XrdCl::Stream::IsPartial
static bool IsPartial(Message *msg)
Check if message is a partial response.
XrdClChannelHandlerList.hh
XrdCl::Stream::OnWriteTimeout
void OnWriteTimeout(uint16_t subStream)
On write timeout.
XrdCl::Stream::pAddressType
Utils::AddressType pAddressType
Definition: XrdClStream.hh:371
XrdCl::Stream::pSubStreams
SubStreamList pSubStreams
Definition: XrdClStream.hh:369
XrdCl::Stream::ForceConnect
void ForceConnect()
Force connection.
XrdCl::Poller
Interface for socket pollers.
Definition: XrdClPoller.hh:87
XrdCl::Stream::Connecting
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:58
XrdCl::JobManager
A synchronized queue.
Definition: XrdClJobManager.hh:51
XrdCl::Stream::SetIncomingQueue
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:104
XrdCl::Stream::Tick
void Tick(time_t now)
XrdNetAddr.hh
XrdCl::Message
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
XrdCl::InQueue::AddMessage
bool AddMessage(Message *msg)
Add a fully reconstructed message to the queue.
XrdCl::Stream::CanCollapse
bool CanCollapse(const URL &url)
XrdCl::Stream::Stream
Stream(const URL *url, const URL &prefer=URL())
Constructor.
XrdCl::Stream::pPrefer
const URL pPrefer
Definition: XrdClStream.hh:353
XrdCl::Stream::GetName
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:169
XrdCl::XRootDStatus
Request status.
Definition: XrdClXRootDResponses.hh:219
XrdCl::ChannelHandlerList
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:34
XrdCl::Stream::pConnectionWindow
uint16_t pConnectionWindow
Definition: XrdClStream.hh:368
XrdCl::Stream::OnReadyToWrite
std::pair< Message *, OutgoingMsgHandler * > OnReadyToWrite(uint16_t subStream)
XrdCl::Stream::RequestClose
XRootDStatus RequestClose(Message *resp)
Send close after an open request timed out.
XrdCl::Stream::HandleIncMsgJob::pHandler
IncomingMsgHandler * pHandler
Definition: XrdClStream.hh:327
XrdCl::Stream::OnIncoming
void OnIncoming(uint16_t subStream, Message *msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
XrdCl::Stream::SetOnDataConnectHandler
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:262
XrdCl::Stream::Disconnect
void Disconnect(bool force=false)
Disconnect the stream.
XrdCl::Stream::pConnectionInitTime
time_t pConnectionInitTime
Definition: XrdClStream.hh:367
XrdCl::TaskManager
Definition: XrdClTaskManager.hh:76
XrdCl::Stream::SetTaskManager
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:122
XrdCl::Stream::SetJobManager
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:130
XrdCl::InQueue
A synchronized queue for incoming data.
Definition: XrdClInQueue.hh:36
XrdCl::Stream::GetURL
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:156
XrdCl::Stream::pSessionId
uint64_t pSessionId
Definition: XrdClStream.hh:373
XrdCl::Stream::OnConnect
void OnConnect(uint16_t subStream)
Call back when a sub-stream has been connected.
XrdSysRecMutex
Definition: XrdSysPthread.hh:242
XrdCl::IncomingMsgHandler
Message handler.
Definition: XrdClPostMasterInterfaces.hh:72
XrdCl::Stream::QueueIncMsgJob::~QueueIncMsgJob
virtual ~QueueIncMsgJob()
Definition: XrdClStream.hh:302
XrdCl::Stream::SetChannelData
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:114
XrdCl::Stream::pConnectionDone
timeval pConnectionDone
Definition: XrdClStream.hh:384
XrdCl::Stream::pTransport
TransportHandler * pTransport
Definition: XrdClStream.hh:355
XrdCl::Stream::Initialize
XRootDStatus Initialize()
Initializer.
XrdClJobManager.hh
XrdCl::Stream::OnFatalError
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
XrdCl::Stream::pAddresses
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:370
XrdCl::Stream::~Stream
~Stream()
Destructor.
XrdCl::Stream::pLastFatalError
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:363
XrdCl::Stream::Error
@ Error
Broken.
Definition: XrdClStream.hh:59
XrdClPoller.hh
XrdCl::Stream::DisableIfEmpty
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
XrdCl::Stream::pStreamName
std::string pStreamName
Definition: XrdClStream.hh:354
XrdCl::Stream::InspectStatusRsp
uint16_t InspectStatusRsp(Message *msg, uint16_t stream, IncomingMsgHandler *&incHandler)
XrdCl::Stream::MonitorDisconnection
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
XrdCl
Definition: XrdClAnyObject.hh:26
XrdCl::Stream::pBytesSent
uint64_t pBytesSent
Definition: XrdClStream.hh:385
XrdCl::Stream::Disconnected
@ Disconnected
Not connected.
Definition: XrdClStream.hh:56
XrdCl::Stream::pOnDataConnJob
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:391
XrdCl::Stream::ForceError
void ForceError(XRootDStatus status)
Force error.
XrdClUtils.hh
XrdCl::Stream::pIncomingQueue
InQueue * pIncomingQueue
Definition: XrdClStream.hh:360
XrdCl::Stream::HasNetAddr
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:283
XrdCl::Stream::RemoveEventHandler
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
XrdCl::Stream::HandleIncMsgJob::Run
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:320
XrdClInQueue.hh
XrdCl::Stream::HandleIncMsgJob::HandleIncMsgJob
HandleIncMsgJob(IncomingMsgHandler *handler)
Definition: XrdClStream.hh:318
XrdClStatus.hh
XrdCl::Stream::QueueIncMsgJob::pQueue
InQueue * pQueue
Definition: XrdClStream.hh:309
XrdClURL.hh
XrdCl::Stream::StreamStatus
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:55
XrdCl::Stream::pStreamErrorWindow
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:364
XrdCl::Stream::QueueIncMsgJob::QueueIncMsgJob
QueueIncMsgJob(InQueue *queue)
Definition: XrdClStream.hh:301
XrdCl::URL
URL representation.
Definition: XrdClURL.hh:31
XrdCl::Stream::Send
XRootDStatus Send(Message *msg, OutgoingMsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
XrdSysMutexHelper
Definition: XrdSysPthread.hh:263
XrdCl::Job
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:34
XrdCl::Stream::pUrl
const URL * pUrl
Definition: XrdClStream.hh:352
XrdCl::Stream::RegisterEventHandler
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
XrdCl::Stream::pBytesReceived
uint64_t pBytesReceived
Definition: XrdClStream.hh:386
XrdCl::Stream::Connected
@ Connected
Connected.
Definition: XrdClStream.hh:57
XrdCl::Stream::OnError
void OnError(uint16_t subStream, XRootDStatus status)
On error.
XrdCl::ChannelEventHandler
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:237
XrdCl::Stream::pTaskManager
TaskManager * pTaskManager
Definition: XrdClStream.hh:357
XrdCl::Stream::QueueIncMsgJob::Run
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:303
XrdCl::Stream::pLastStreamError
uint32_t pLastStreamError
Definition: XrdClStream.hh:362
XrdCl::Stream::pConnectionRetry
uint16_t pConnectionRetry
Definition: XrdClStream.hh:366
XrdCl::Stream::pConnectionStarted
timeval pConnectionStarted
Definition: XrdClStream.hh:383
XrdCl::Stream::SetTransport
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:88
XrdCl::TransportHandler
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:317
XrdNetAddr
Definition: XrdNetAddr.hh:42
XrdCl::Stream::OnReadTimeout
void OnReadTimeout(uint16_t subStream, bool &isBroken)
On read timeout.
XrdCl::Stream::pMutex
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:359
XrdCl::AnyObject
Definition: XrdClAnyObject.hh:33
XrdCl::IncomingMsgHandler::Process
virtual void Process(Message *msg)
Definition: XrdClPostMasterInterfaces.hh:145
XrdCl::Stream
Stream.
Definition: XrdClStream.hh:49
XrdCl::Stream::pConnectionCount
uint16_t pConnectionCount
Definition: XrdClStream.hh:365
XrdCl::Stream::HandleIncMsgJob
Definition: XrdClStream.hh:316
XrdCl::Stream::pChannelData
AnyObject * pChannelData
Definition: XrdClStream.hh:361
XrdCl::Utils::AddressType
AddressType
Address type.
Definition: XrdClUtils.hh:96