xrootd
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClPostMaster.hh"
33 #include "XrdCl/XrdClJobManager.hh"
34 
35 #include <atomic>
36 #include <condition_variable>
37 #include <mutex>
38 
39 namespace XrdCl
40 {
41 
42  //----------------------------------------------------------------------------
43  // Interface for different execution policies:
44  // - all : all operations need to succeed in order for the parallel
45  // operation to be successful
46  // - any : just one of the operations needs to succeed in order for
47  // the parallel operation to be successful
48  // - some : n (user defined) operations need to succeed in order for
49  // the parallel operation to be successful
50  // - at least : at least n (user defined) operations need to succeed in
51  // order for the parallel operation to be successful (the
52  // user handler will be called only when all operations are
53  // resolved)
54  //
55  // @param status : status returned by one of the aggregated operations
56  //
57  // @return : true if the status should be passed to the user handler,
58  // false otherwise.
59  //----------------------------------------------------------------------------
61  {
62  virtual ~PolicyExecutor()
63  {
64  }
65 
66  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67  };
68 
69  //----------------------------------------------------------------------------
75  //----------------------------------------------------------------------------
76  template<bool HasHndl>
77  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
78  {
79  template<bool> friend class ParallelOperation;
80 
81  public:
82 
83  //------------------------------------------------------------------------
85  //------------------------------------------------------------------------
86  template<bool from>
88  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
89  pipelines( std::move( obj.pipelines ) ),
90  policy( std::move( obj.policy ) )
91  {
92  }
93 
94  //------------------------------------------------------------------------
100  //------------------------------------------------------------------------
101  template<class Container>
102  ParallelOperation( Container &&container )
103  {
104  static_assert( !HasHndl, "Constructor is available only operation without handler");
105 
106  pipelines.reserve( container.size() );
107  auto begin = std::make_move_iterator( container.begin() );
108  auto end = std::make_move_iterator( container.end() );
109  std::copy( begin, end, std::back_inserter( pipelines ) );
110  container.clear(); // there's junk inside so we clear it
111  }
112 
114  {
115  }
116 
117  //------------------------------------------------------------------------
119  //------------------------------------------------------------------------
120  std::string ToString()
121  {
122  std::ostringstream oss;
123  oss << "Parallel(";
124  for( size_t i = 0; i < pipelines.size(); i++ )
125  {
126  oss << pipelines[i]->ToString();
127  if( i + 1 != pipelines.size() )
128  {
129  oss << " && ";
130  }
131  }
132  oss << ")";
133  return oss.str();
134  }
135 
136  //------------------------------------------------------------------------
141  //------------------------------------------------------------------------
143  {
144  policy.reset( new AllPolicy() );
145  return std::move( *this );
146  }
147 
148  //------------------------------------------------------------------------
153  //------------------------------------------------------------------------
155  {
156  policy.reset( new AnyPolicy( pipelines.size() ) );
157  return std::move( *this );
158  }
159 
160  //------------------------------------------------------------------------
161  // Set policy to `Some`
165  //------------------------------------------------------------------------
166  ParallelOperation<HasHndl> Some( size_t threshold )
167  {
168  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
169  return std::move( *this );
170  }
171 
172  //------------------------------------------------------------------------
178  //------------------------------------------------------------------------
180  {
181  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
182  return std::move( *this );
183  }
184 
185  private:
186 
187  //------------------------------------------------------------------------
192  //------------------------------------------------------------------------
193  struct AllPolicy : public PolicyExecutor
194  {
195  bool Examine( const XrdCl::XRootDStatus &status )
196  {
197  if( status.IsOK() ) return false;
198  // we require all request to succeed
199  return true;
200  }
201  };
202 
203  //------------------------------------------------------------------------
208  //------------------------------------------------------------------------
209  struct AnyPolicy : public PolicyExecutor
210  {
211  AnyPolicy( size_t size) : cnt( size )
212  {
213  }
214 
215  bool Examine( const XrdCl::XRootDStatus &status )
216  {
217  // decrement the counter
218  size_t nb = cnt.fetch_sub( 1 );
219  // we require just one operation to be successful
220  if( status.IsOK() ) return true;
221  // lets see if this is the last one?
222  if( nb == 1 ) return true;
223  // we still have a chance there will be one that is successful
224  return false;
225  }
226 
227  private:
228  std::atomic<size_t> cnt;
229  };
230 
231  //------------------------------------------------------------------------
236  //------------------------------------------------------------------------
238  {
239  SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
241  {
242  }
243 
244  bool Examine( const XrdCl::XRootDStatus &status )
245  {
246  if( status.IsOK() )
247  {
248  size_t s = succeeded.fetch_add( 1 );
249  if( s + 1 == threshold ) return true; // we reached the threshold
250  // we are not yet there
251  return false;
252  }
253  size_t f = failed.fetch_add( 1 );
254  // did we dropped bellow the threshold
255  if( f == size - threshold ) return true;
256  // we still have a chance there will be enough of successful operations
257  return false;
258  }
259 
260  private:
261  std::atomic<size_t> failed;
262  std::atomic<size_t> succeeded;
263  const size_t threshold;
264  const size_t size;
265  };
266 
267  //------------------------------------------------------------------------
273  //------------------------------------------------------------------------
275  {
276  AtLeastPolicy( size_t size, size_t threshold ) : failed_cnt( 0 ),
277  failed_threshold( size - threshold )
278  {
279  }
280 
281  bool Examine( const XrdCl::XRootDStatus &status )
282  {
283  // although we might have the minimum to succeed we wait for the rest
284  if( status.IsOK() ) return false;
285  size_t nb = failed_cnt.fetch_add( 1 );
286  if( nb == failed_threshold ) return true; // we dropped bellow the threshold
287  // we still have a chance there will be enough of successful operations
288  return false;
289  }
290 
291  private:
292  std::atomic<size_t> failed_cnt;
293  const size_t failed_threshold;
294  };
295 
296  //------------------------------------------------------------------------
298  //------------------------------------------------------------------------
299  struct barrier_t
300  {
301  barrier_t() : on( true ) { }
302 
303  void wait()
304  {
305  std::unique_lock<std::mutex> lck( mtx );
306  if( on ) cv.wait( lck );
307  }
308 
309  void lift()
310  {
311  on = false;
312  cv.notify_all();
313  }
314 
315  private:
316  std::condition_variable cv;
317  std::mutex mtx;
318  bool on;
319  };
320 
321  //------------------------------------------------------------------------
326  //------------------------------------------------------------------------
327  struct Ctx
328  {
329  //----------------------------------------------------------------------
333  //----------------------------------------------------------------------
335  policy( policy )
336  {
337  }
338 
339  //----------------------------------------------------------------------
341  //----------------------------------------------------------------------
343  {
344  Handle( XRootDStatus() );
345  }
346 
347  //----------------------------------------------------------------------
352  //----------------------------------------------------------------------
353  inline void Examine( const XRootDStatus &st )
354  {
355  if( policy->Examine( st ) )
356  Handle( st );
357  }
358 
359  //----------------------------------------------------------------------
364  //---------------------------------------------------------------------
365  inline void Handle( const XRootDStatus &st )
366  {
367  PipelineHandler* hdlr = handler.exchange( nullptr );
368  if( hdlr )
369  {
370  barrier.wait();
371  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
372  }
373  }
374 
375  //----------------------------------------------------------------------
377  //----------------------------------------------------------------------
378  std::atomic<PipelineHandler*> handler;
379 
380  //----------------------------------------------------------------------
382  //----------------------------------------------------------------------
383  std::unique_ptr<PolicyExecutor> policy;
384 
385  //----------------------------------------------------------------------
388  //----------------------------------------------------------------------
390  };
391 
392  //------------------------------------------------------------------------
394  //------------------------------------------------------------------------
395  struct PipelineEnd : public Job
396  {
397  //----------------------------------------------------------------------
398  // Constructor
399  //----------------------------------------------------------------------
400  PipelineEnd( std::shared_ptr<Ctx> &ctx,
401  const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
402  {
403  }
404 
405  //----------------------------------------------------------------------
406  // Run Ctx::Examine in the thread-pool
407  //----------------------------------------------------------------------
408  void Run( void* )
409  {
410  ctx->Examine( st );
411  delete this;
412  }
413 
414  private:
415  std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
416  XrdCl::XRootDStatus st; //< final status of the ParallelOperation
417  };
418 
419  //------------------------------------------------------------------------
421  //------------------------------------------------------------------------
422  inline static
423  void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
424  {
426  PipelineEnd *end = new PipelineEnd( ctx, st );
427  mgr->QueueJob( end, nullptr );
428  }
429 
430  //------------------------------------------------------------------------
436  //------------------------------------------------------------------------
437  XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
438  {
439  // make sure we have a valid policy for the parallel operation
440  if( !policy ) policy.reset( new AllPolicy() );
441 
442  std::shared_ptr<Ctx> ctx =
443  std::make_shared<Ctx>( handler, policy.release() );
444 
445  uint16_t timeout = pipelineTimeout < this->timeout ?
446  pipelineTimeout : this->timeout;
447 
448  for( size_t i = 0; i < pipelines.size(); ++i )
449  {
450  if( !pipelines[i] ) continue;
451  pipelines[i].Run( timeout,
452  [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
453  }
454 
455  ctx->barrier.lift();
456  return XRootDStatus();
457  }
458 
459  std::vector<Pipeline> pipelines;
460  std::unique_ptr<PolicyExecutor> policy;
461  };
462 
463  //----------------------------------------------------------------------------
465  //----------------------------------------------------------------------------
466  template<class Container>
467  inline ParallelOperation<false> Parallel( Container &&container )
468  {
469  return ParallelOperation<false>( container );
470  }
471 
472  //----------------------------------------------------------------------------
474  //----------------------------------------------------------------------------
475  inline void PipesToVec( std::vector<Pipeline>& )
476  {
477  // base case
478  }
479 
480  //----------------------------------------------------------------------------
481  // Declare PipesToVec (we need to do declare those functions ahead of
482  // definitions, as they may call each other.
483  //----------------------------------------------------------------------------
484  template<typename ... Others>
485  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
486  Others&... others );
487 
488  template<typename ... Others>
489  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
490  Others&... others );
491 
492  template<typename ... Others>
493  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
494  Others&... others );
495 
496  //----------------------------------------------------------------------------
497  // Define PipesToVec
498  //----------------------------------------------------------------------------
499  template<typename ... Others>
500  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
501  Others&... others )
502  {
503  v.emplace_back( operation );
504  PipesToVec( v, others... );
505  }
506 
507  template<typename ... Others>
508  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
509  Others&... others )
510  {
511  v.emplace_back( operation );
512  PipesToVec( v, others... );
513  }
514 
515  template<typename ... Others>
516  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
517  Others&... others )
518  {
519  v.emplace_back( std::move( pipeline ) );
520  PipesToVec( v, others... );
521  }
522 
523  //----------------------------------------------------------------------------
528  //----------------------------------------------------------------------------
529  template<typename ... Operations>
530  inline ParallelOperation<false> Parallel( Operations&& ... operations )
531  {
532  constexpr size_t size = sizeof...( operations );
533  std::vector<Pipeline> v;
534  v.reserve( size );
535  PipesToVec( v, operations... );
536  return Parallel( v );
537  }
538 }
539 
540 #endif // __XRD_CL_OPERATIONS_HH__
static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition: XrdClParallelOperation.hh:423
A synchronized queue.
Definition: XrdClJobManager.hh:50
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:475
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
barrier_t barrier
Definition: XrdClParallelOperation.hh:389
std::atomic< size_t > succeeded
Definition: XrdClParallelOperation.hh:262
SomePolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:239
XrdCl::XRootDStatus st
Definition: XrdClParallelOperation.hh:416
Definition: XrdClParallelOperation.hh:193
A wait barrier helper class.
Definition: XrdClParallelOperation.hh:299
AtLeastPolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:276
std::string ToString()
Definition: XrdClParallelOperation.hh:120
ParallelOperation< HasHndl > All()
Definition: XrdClParallelOperation.hh:142
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition: XrdClParallelOperation.hh:437
uint16_t timeout
Operation timeout.
Definition: XrdClOperations.hh:746
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:120
ParallelOperation< HasHndl > Any()
Definition: XrdClParallelOperation.hh:154
const size_t threshold
Definition: XrdClParallelOperation.hh:263
std::mutex mtx
Definition: XrdClParallelOperation.hh:317
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:215
std::condition_variable cv
Definition: XrdClParallelOperation.hh:316
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:281
The thread-pool job for schedule Ctx::Examine.
Definition: XrdClParallelOperation.hh:395
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
void Run(void *)
The job logic.
Definition: XrdClParallelOperation.hh:408
Definition: XrdClParallelOperation.hh:274
Definition: XrdClOperationHandlers.hh:623
Definition: XrdClParallelOperation.hh:237
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:228
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:365
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition: XrdClParallelOperation.hh:383
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition: XrdClParallelOperation.hh:87
const size_t size
Definition: XrdClParallelOperation.hh:264
barrier_t()
Definition: XrdClParallelOperation.hh:301
AnyPolicy(size_t size)
Definition: XrdClParallelOperation.hh:211
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:195
~ParallelOperation()
Definition: XrdClParallelOperation.hh:113
Request status.
Definition: XrdClXRootDResponses.hh:214
Definition: XrdClAnyObject.hh:25
Definition: XrdClOperations.hh:44
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition: XrdClParallelOperation.hh:400
ParallelOperation< HasHndl > Some(size_t threshold)
Definition: XrdClParallelOperation.hh:166
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition: XrdClParallelOperation.hh:334
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:244
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:378
Definition: XrdClOperations.hh:58
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:459
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:102
bool on
Definition: XrdClParallelOperation.hh:318
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:467
virtual ~PolicyExecutor()
Definition: XrdClParallelOperation.hh:62
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void wait()
Definition: XrdClParallelOperation.hh:303
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:342
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition: XrdClParallelOperation.hh:179
std::shared_ptr< Ctx > ctx
Definition: XrdClParallelOperation.hh:415
JobManager * GetJobManager()
Get the job manager object user by the post master.
Definition: XrdClParallelOperation.hh:209
Definition: XrdClParallelOperation.hh:60
std::atomic< size_t > failed_cnt
Definition: XrdClParallelOperation.hh:292
void Examine(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:353
static PostMaster * GetPostMaster()
Get default post master.
std::atomic< size_t > failed
Definition: XrdClParallelOperation.hh:261
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
Definition: XrdClOperations.hh:319
Definition: XrdClParallelOperation.hh:77
std::unique_ptr< PolicyExecutor > policy
Definition: XrdClParallelOperation.hh:460
Definition: XrdClParallelOperation.hh:327
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:304
void lift()
Definition: XrdClParallelOperation.hh:309
const size_t failed_threshold
Definition: XrdClParallelOperation.hh:293
Definition: XrdClOperations.hh:521