Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
observer_proxy.cpp
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/tbb_config.h"
22 
23 #if __TBB_SCHEDULER_OBSERVER
24 
25 #include "observer_proxy.h"
26 #include "tbb_main.h"
27 #include "governor.h"
28 #include "scheduler.h"
29 #include "arena.h"
30 
31 namespace tbb {
32 namespace internal {
33 
34 padded<observer_list> the_global_observer_list;
35 
36 #if TBB_USE_ASSERT
37 static atomic<int> observer_proxy_count;
38 
39 struct check_observer_proxy_count {
40  ~check_observer_proxy_count() {
41  if( observer_proxy_count!=0 ) {
42  runtime_warning( "Leaked %ld observer_proxy objects\n", long(observer_proxy_count) );
43  }
44  }
45 };
46 
47 static check_observer_proxy_count the_check_observer_proxy_count;
48 #endif /* TBB_USE_ASSERT */
49 
50 #if __TBB_ARENA_OBSERVER
51 interface6::task_scheduler_observer* observer_proxy::get_v6_observer() {
52  if(my_version != 6) return NULL;
53  return static_cast<interface6::task_scheduler_observer*>(my_observer);
54 }
55 #endif
56 
57 #if __TBB_ARENA_OBSERVER
58 bool observer_proxy::is_global() {
59  return !get_v6_observer() || get_v6_observer()->my_context_tag == interface6::task_scheduler_observer::global_tag;
60 }
61 #endif /* __TBB_ARENA_OBSERVER */
62 
63 observer_proxy::observer_proxy( task_scheduler_observer_v3& tso )
64  : my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
65 {
66 #if TBB_USE_ASSERT
67  ++observer_proxy_count;
68 #endif /* TBB_USE_ASSERT */
69  // 1 for observer
70  my_ref_count = 1;
71  my_version =
72 #if __TBB_ARENA_OBSERVER
73  load<relaxed>(my_observer->my_busy_count)
74  == interface6::task_scheduler_observer::v6_trait ? 6 :
75 #endif
76  0;
77  __TBB_ASSERT( my_version >= 6 || !load<relaxed>(my_observer->my_busy_count), NULL );
78 }
79 
80 #if TBB_USE_ASSERT
81 observer_proxy::~observer_proxy () {
82  __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
83  poison_value(my_ref_count);
84  poison_pointer(my_prev);
85  poison_pointer(my_next);
86  --observer_proxy_count;
87 }
88 #endif /* TBB_USE_ASSERT */
89 
90 template<memory_semantics M, class T, class V>
91 T atomic_fetch_and_store ( T* addr, const V& val ) {
92  return (T)atomic_traits<sizeof(T), M>::fetch_and_store( addr, (T)val );
93 }
94 
95 void observer_list::clear () {
96  __TBB_ASSERT( this != &the_global_observer_list, "Method clear() cannot be used on the list of global observers" );
97  // Though the method would work fine for an empty list, we require the caller
98  // to check that the list is non-empty before invoking it, to avoid extra overhead.
99  __TBB_ASSERT( !empty(), NULL );
100  {
101  scoped_lock lock(mutex(), /*is_writer=*/true);
102  observer_proxy *next = my_head;
103  while ( observer_proxy *p = next ) {
104  __TBB_ASSERT( p->my_version >= 6, NULL );
105  next = p->my_next;
106  // Both proxy p and observer p->my_observer (if non-null) are guaranteed
107  // to be alive while the list is locked.
108  task_scheduler_observer_v3 *obs = p->my_observer;
109  // Make sure that possible concurrent observer destruction does not
110  // conflict with the proxy list cleanup.
111  if ( !obs || !(p = (observer_proxy*)__TBB_FetchAndStoreW(&obs->my_proxy, 0)) )
112  continue;
113  // Accessing 'obs' after detaching obs->my_proxy would race with observer destruction.
114  __TBB_ASSERT( !next || p == next->my_prev, NULL );
115  __TBB_ASSERT( is_alive(p->my_ref_count), "Observer's proxy died prematurely" );
116  __TBB_ASSERT( p->my_ref_count == 1, "Reference for observer is missing" );
117 #if TBB_USE_ASSERT
118  p->my_observer = NULL;
119  p->my_ref_count = 0;
120 #endif /* TBB_USE_ASSERT */
121  remove(p);
122  delete p;
123  }
124  }
125  while( my_head )
126  __TBB_Yield();
127 }
128 
129 void observer_list::insert ( observer_proxy* p ) {
130  scoped_lock lock(mutex(), /*is_writer=*/true);
131  if ( my_head ) {
132  p->my_prev = my_tail;
133  my_tail->my_next = p;
134  }
135  else
136  my_head = p;
137  my_tail = p;
138 }
139 
140 void observer_list::remove ( observer_proxy* p ) {
141  __TBB_ASSERT( my_head, "Attempt to remove an item from an empty list" );
142  __TBB_ASSERT( !my_tail->my_next, "Last item's my_next must be NULL" );
143  if( p == my_tail ) {
144  __TBB_ASSERT( !p->my_next, NULL );
145  my_tail = p->my_prev;
146  }
147  else {
148  __TBB_ASSERT( p->my_next, NULL );
149  p->my_next->my_prev = p->my_prev;
150  }
151  if ( p == my_head ) {
152  __TBB_ASSERT( !p->my_prev, NULL );
153  my_head = p->my_next;
154  }
155  else {
156  __TBB_ASSERT( p->my_prev, NULL );
157  p->my_prev->my_next = p->my_next;
158  }
159  __TBB_ASSERT( (my_head && my_tail) || (!my_head && !my_tail), NULL );
160 }
161 
162 void observer_list::remove_ref( observer_proxy* p ) {
163  int r = p->my_ref_count;
164  __TBB_ASSERT( is_alive(r), NULL );
165  while(r>1) {
166  __TBB_ASSERT( r!=0, NULL );
167  int r_old = p->my_ref_count.compare_and_swap(r-1,r);
168  if( r_old==r ) {
169  // Successfully decremented count.
170  return;
171  }
172  r = r_old;
173  }
174  __TBB_ASSERT( r==1, NULL );
175  // Reference count might go to zero
176  {
177  // Use lock to avoid resurrection by a thread concurrently walking the list
178  observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
179  r = --p->my_ref_count;
180  if( !r )
181  remove(p);
182  }
183  __TBB_ASSERT( r || !p->my_ref_count, NULL );
184  if( !r )
185  delete p;
186 }
187 
188 void observer_list::do_notify_entry_observers( observer_proxy*& last, bool worker ) {
189  // Pointer p marches through the list from last (exclusive) to the end.
190  observer_proxy *p = last, *prev = p;
191  for(;;) {
192  task_scheduler_observer_v3* tso=NULL;
193  // Hold lock on list only long enough to advance to the next proxy in the list.
194  {
195  scoped_lock lock(mutex(), /*is_writer=*/false);
196  do {
197  if( p ) {
198  // We were already processing the list.
199  if( observer_proxy* q = p->my_next ) {
200  if( p == prev )
201  remove_ref_fast(prev); // sets prev to NULL if successful
202  p = q;
203  }
204  else {
205  // Reached the end of the list.
206  if( p == prev ) {
207  // Keep the reference as we store the 'last' pointer in scheduler
208  __TBB_ASSERT(p->my_ref_count >= 1 + (p->my_observer?1:0), NULL);
209  } else {
210  // The last few proxies were empty
211  __TBB_ASSERT(p->my_ref_count, NULL);
212  ++p->my_ref_count;
213  if( prev ) {
214  lock.release();
215  remove_ref(prev);
216  }
217  }
218  last = p;
219  return;
220  }
221  } else {
222  // Starting pass through the list
223  p = my_head;
224  if( !p )
225  return;
226  }
227  tso = p->my_observer;
228  } while( !tso );
229  ++p->my_ref_count;
230  ++tso->my_busy_count;
231  }
232  __TBB_ASSERT( !prev || p!=prev, NULL );
233  // Release the proxy pinned before p
234  if( prev )
235  remove_ref(prev);
236  // Do not hold any locks on the list while calling user's code.
237  // Do not intercept any exceptions that may escape the callback so that
238  // they are either handled by the TBB scheduler or passed to the debugger.
239  tso->on_scheduler_entry(worker);
240  __TBB_ASSERT(p->my_ref_count, NULL);
241  intptr_t bc = --tso->my_busy_count;
242  __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
243  prev = p;
244  }
245 }
246 
247 void observer_list::do_notify_exit_observers( observer_proxy* last, bool worker ) {
249  // Pointer p marches through the list from the beginning to last (inclusive).
249  observer_proxy *p = NULL, *prev = NULL;
250  for(;;) {
251  task_scheduler_observer_v3* tso=NULL;
252  // Hold lock on list only long enough to advance to the next proxy in the list.
253  {
254  scoped_lock lock(mutex(), /*is_writer=*/false);
255  do {
256  if( p ) {
257  // We were already processing the list.
258  if( p != last ) {
259  __TBB_ASSERT( p->my_next, "List items before 'last' must have valid my_next pointer" );
260  if( p == prev )
261  remove_ref_fast(prev); // sets prev to NULL if successful
262  p = p->my_next;
263  } else {
264  // remove the reference from the last item
265  remove_ref_fast(p);
266  if( p ) {
267  lock.release();
268  remove_ref(p);
269  }
270  return;
271  }
272  } else {
273  // Starting pass through the list
274  p = my_head;
275  __TBB_ASSERT( p, "Nonzero 'last' must guarantee that the global list is non-empty" );
276  }
277  tso = p->my_observer;
278  } while( !tso );
279  // The item is already refcounted
280  if ( p != last ) // the last is already referenced since entry notification
281  ++p->my_ref_count;
282  ++tso->my_busy_count;
283  }
284  __TBB_ASSERT( !prev || p!=prev, NULL );
285  if( prev )
286  remove_ref(prev);
287  // Do not hold any locks on the list while calling user's code.
288  // Do not intercept any exceptions that may escape the callback so that
289  // they are either handled by the TBB scheduler or passed to the debugger.
290  tso->on_scheduler_exit(worker);
291  __TBB_ASSERT(p->my_ref_count || p == last, NULL);
292  intptr_t bc = --tso->my_busy_count;
293  __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
294  prev = p;
295  }
296 }
297 
298 void task_scheduler_observer_v3::observe( bool enable ) {
299  if( enable ) {
300  if( !my_proxy ) {
301  my_proxy = new observer_proxy( *this );
302  my_busy_count = 0; // proxy stores versioning information, clear it
303 #if __TBB_ARENA_OBSERVER
304  if ( !my_proxy->is_global() ) {
305  // Local observer activation
306  generic_scheduler* s = governor::local_scheduler_if_initialized();
307  __TBB_ASSERT( my_proxy->get_v6_observer(), NULL );
308  intptr_t tag = my_proxy->get_v6_observer()->my_context_tag;
309  if( tag != interface6::task_scheduler_observer::implicit_tag ) { // explicit arena
310  task_arena *a = reinterpret_cast<task_arena*>(tag);
311  a->initialize();
312  my_proxy->my_list = &a->my_arena->my_observers;
313  } else {
314  if( !(s && s->my_arena) )
315  s = governor::init_scheduler( task_scheduler_init::automatic, 0, /*auto_init=*/true );
316  __TBB_ASSERT( __TBB_InitOnce::initialization_done(), NULL );
317  __TBB_ASSERT( s && s->my_arena, NULL );
318  my_proxy->my_list = &s->my_arena->my_observers;
319  }
320  my_proxy->my_list->insert(my_proxy);
321  // Notify newly activated observer and other pending ones if it belongs to current arena
322  if(s && &s->my_arena->my_observers == my_proxy->my_list )
323  my_proxy->my_list->notify_entry_observers( s->my_last_local_observer, s->is_worker() );
324  } else
325 #endif /* __TBB_ARENA_OBSERVER */
326  {
327  // Obsolete. Global observer activation
328  if( !__TBB_InitOnce::initialization_done() )
329  DoOneTimeInitializations();
330  my_proxy->my_list = &the_global_observer_list;
331  my_proxy->my_list->insert(my_proxy);
332  if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) {
333  // Notify newly created observer of its own thread.
334  // Any other pending observers are notified too.
335  the_global_observer_list.notify_entry_observers( s->my_last_global_observer, s->is_worker() );
336  }
337  }
338  }
339  } else {
340  // Make sure that possible concurrent proxy list cleanup does not conflict
341  // with the observer destruction here.
342  if ( observer_proxy* proxy = (observer_proxy*)__TBB_FetchAndStoreW(&my_proxy, 0) ) {
343  // List destruction should not touch this proxy after we've won the above interlocked exchange.
344  __TBB_ASSERT( proxy->my_observer == this, NULL );
345  __TBB_ASSERT( is_alive(proxy->my_ref_count), "Observer's proxy died prematurely" );
346  __TBB_ASSERT( proxy->my_ref_count >= 1, "reference for observer missing" );
347  observer_list &list = *proxy->my_list;
348  {
349  // Ensure that none of the list walkers relies on observer pointer validity
350  observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
351  proxy->my_observer = NULL;
352  // Proxy may still be held by other threads (to track the last notified observer)
353  if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock
354  list.remove(proxy);
355  __TBB_ASSERT( !proxy->my_ref_count, NULL );
356  delete proxy;
357  }
358  }
359  while( my_busy_count ) // other threads are still accessing the callback
360  __TBB_Yield();
361  }
362  }
363 }
364 
365 } // namespace internal
366 } // namespace tbb
367 
368 #endif /* __TBB_SCHEDULER_OBSERVER */
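
For orientation, the machinery above is what backs the public tbb::task_scheduler_observer API: observe(true) allocates an observer_proxy and inserts it into the appropriate observer_list, the notify-entry/exit walks call on_scheduler_entry()/on_scheduler_exit() as threads join and leave the scheduler, and observe(false) detaches the proxy and spins on my_busy_count until no thread is still inside a callback. The following is a minimal usage sketch, not part of this file; the class and counter names (thread_counter, live) are illustrative only.

#include <atomic>
#include <cstdio>
#include "tbb/task_scheduler_observer.h"
#include "tbb/parallel_for.h"

// Hypothetical observer that counts threads currently participating in the scheduler.
class thread_counter : public tbb::task_scheduler_observer {
    std::atomic<int> live;
public:
    thread_counter() : live(0) { observe(true); }  // creates the proxy and links it into the observer list
    ~thread_counter() { observe(false); }          // unlinks the proxy and waits for in-flight callbacks
    void on_scheduler_entry( bool is_worker ) override {
        std::printf( "thread entered (worker=%d), live=%d\n", int(is_worker), ++live );
    }
    void on_scheduler_exit( bool is_worker ) override {
        std::printf( "thread exited (worker=%d), live=%d\n", int(is_worker), --live );
    }
};

int main() {
    thread_counter counter;                                   // observation starts here
    tbb::parallel_for( 0, 1000, []( int ) { /* some work */ } );
    return 0;                                                  // ~thread_counter() stops observation
}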