22 #include <gtest/gtest.h> 26 # include <pthread_np.h> 28 #include <core/threading/barrier.h> 29 #include <core/threading/mutex.h> 30 #include <core/threading/mutex_locker.h> 31 #include <core/threading/wait_condition.h> 32 #include <core/utils/refptr.h> 33 #include <libs/syncpoint/exceptions.h> 34 #include <libs/syncpoint/syncpoint.h> 35 #include <libs/syncpoint/syncpoint_manager.h> 36 #include <logging/cache.h> 37 #include <logging/multi.h> 105 logger_->add_logger(cache_logger_);
108 pthread_attr_init(&attrs);
116 pthread_attr_destroy(&attrs);
150 ASSERT_TRUE(*sp1 != NULL);
158 ASSERT_NE(*sp1, *sp2);
160 ASSERT_EQ(**sp1, **sp2);
165 ASSERT_LT(**sp1, **sp3);
166 ASSERT_FALSE(**sp3 < **sp1);
167 ASSERT_FALSE(**sp1 < **sp2);
168 ASSERT_FALSE(**sp2 < **sp1);
175 pair<set<RefPtr<SyncPoint>>::iterator,
bool> ret;
178 ret = sp_set.insert(sp1);
179 ASSERT_TRUE(ret.second);
180 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
183 ret = sp_set.insert(sp3);
184 ASSERT_TRUE(ret.second);
185 ASSERT_EQ(sp3->get_identifier(), (*(ret.first))->get_identifier());
188 ret = sp_set.insert(sp1);
189 ASSERT_FALSE(ret.second);
190 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
193 ret = sp_set.insert(sp2);
194 ASSERT_FALSE(ret.second);
195 ASSERT_EQ(sp2->get_identifier(), (*(ret.first))->get_identifier());
200 ASSERT_EQ(0u, manager->get_syncpoints().size());
201 manager->get_syncpoint(
"test",
"/test/1");
202 ASSERT_EQ(3u, manager->get_syncpoints().size());
205 manager->get_syncpoint(
"test2",
"/test/2");
206 ASSERT_EQ(4u, manager->get_syncpoints().size());
211 manager->get_syncpoint(
"test3",
"/test/1");
212 ASSERT_EQ(4u, manager->get_syncpoints().size());
224 ASSERT_NO_THROW(manager->get_syncpoint(
"component 1",
"/test"));
225 ASSERT_NO_THROW(manager->get_syncpoint(
"component 2",
"/test"));
226 ASSERT_NO_THROW(manager->get_syncpoint(
"component 3",
"/test"));
235 string comp =
"component";
236 string id =
"/test/sp1";
240 for (set<
RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
242 EXPECT_EQ(1, (*sp_it)->get_watchers().count(comp))
243 <<
"for component '" << comp <<
"' and SyncPoint '" << (*sp_it)->get_identifier() <<
"'";
245 manager->release_syncpoint(comp, sp);
246 for (set<
RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
248 EXPECT_EQ(0, (*sp_it)->get_watchers().count(comp))
249 <<
"for component '" << comp <<
"' and SyncPoint '" << (*sp_it)->get_identifier() <<
"'";
251 ASSERT_NO_THROW(manager->get_syncpoint(comp,
id));
262 EXPECT_NO_THROW(sp1 =
new SyncPoint(
"/", NULL));
269 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"",
"/test/sp1"),
273 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"waiter",
""),
275 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"waiter",
"invalid"),
281 string comp =
"component1";
282 string id =
"/test/sp1";
285 set<RefPtr<SyncPoint>>::iterator sp_test_it =
287 set<RefPtr<SyncPoint>>::iterator sp_root_it =
289 ASSERT_NE(syncpoints.end(), sp_test_it);
290 ASSERT_NE(syncpoints.end(), sp_root_it);
293 EXPECT_EQ(1, syncpoints.count(sp_test));
294 EXPECT_EQ(1, syncpoints.count(sp_root));
296 EXPECT_EQ(1, sp_test->get_watchers().count(comp));
297 EXPECT_EQ(0, sp_test->get_watchers().count(
id));
300 EXPECT_EQ(0, sp_root->
get_watchers().count(sp_test->get_identifier()));
302 manager->release_syncpoint(comp, sp);
303 EXPECT_EQ(0, sp_test->get_watchers().count(
id));
308 string comp =
"component1";
309 string sp1_id =
"/test/sp1";
310 string sp2_id =
"/test/sp2";
318 << comp <<
" is not registered for " << sp1->
get_identifier() <<
", but should be!";
320 << comp <<
" is not registered for " << sp2->
get_identifier() <<
", but should be!";
322 << comp <<
" is not registered for " << predecessor->
get_identifier() <<
", but should be!";
324 manager->release_syncpoint(comp, sp1);
327 << comp <<
" is not registered for " << predecessor->
get_identifier() <<
", but should be!";
/** Execution state of a test thread. It is stored in the thread parameter structs (`status`), set to RUNNING/FINISHED by the thread functions (e.g. call_emit) and compared against by the test bodies. PENDING presumably means "created but not yet running" -- confirm against the parameter struct's initializer. */
330 enum ThreadStatus { PENDING, RUNNING, FINISHED };
346 string component =
"";
348 uint timeout_sec = 0;
350 uint timeout_nsec = 0;
370 const int wait_time_us = 1000;
371 for (uint i = 0; i < (sec * pow(10, 9) + nanosec) / (wait_time_us * pow(10, 3)); i++) {
375 usleep(wait_time_us);
385 if (params->
status == FINISHED) {
394 start_waiter_thread(
void *data)
409 params->
status = FINISHED;
425 pthread_create(&thread1, &attrs, start_waiter_thread, &params);
426 wait_for_running(&params);
428 pthread_cancel(thread1);
429 pthread_join(thread1, NULL);
438 uint num_threads = 50;
439 pthread_t threads[num_threads];
441 string sp_identifier =
"/test/sp1";
442 for (uint i = 0; i < num_threads; i++) {
444 params[i]->
component =
"component " + to_string(i);
449 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
451 ASSERT_LE(manager->get_syncpoints().size(), 3u);
454 for (uint i = 0; i < num_threads; i++) {
455 pthread_join(threads[i], NULL);
466 uint num_threads = 50;
467 uint num_wait_calls = 10;
468 pthread_t threads[num_threads];
470 string sp_identifier =
"/test/sp1";
471 for (uint i = 0; i < num_threads; i++) {
473 params[i]->
component =
"component " + to_string(i);
478 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
480 ASSERT_LE(manager->get_syncpoints().size(), 3u);
483 for (uint i = 0; i < num_threads; i++) {
484 EXPECT_TRUE(wait_for_running(params[i]));
486 for (uint i = 0; i < num_threads; i++) {
487 pthread_cancel(threads[i]);
488 ASSERT_EQ(0, pthread_join(threads[i], NULL));
498 uint num_threads = 10;
499 uint num_wait_calls = 5;
500 pthread_t threads[num_threads];
502 string sp_identifier =
"/test/sp1";
503 for (uint i = 0; i < num_threads; i++) {
505 params[i]->
component =
"component " + to_string(i);
510 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
514 for (uint i = 0; i < num_threads; i++) {
515 EXPECT_TRUE(wait_for_running(params[i]));
518 string component =
"emitter";
521 for (uint i = 0; i < num_wait_calls; i++) {
526 for (uint i = 0; i < num_threads; i++) {
527 ASSERT_TRUE(wait_for_finished(params[i]));
528 pthread_join(threads[i], NULL);
538 uint num_threads = 50;
539 pthread_t threads[num_threads];
541 for (uint i = 0; i < num_threads; i++) {
543 params[i]->
component =
"component " + to_string(i);
548 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
551 for (uint i = 0; i < num_threads; i++) {
552 EXPECT_TRUE(wait_for_running(params[i]));
555 for (uint i = 0; i < num_threads; i++) {
556 EXPECT_EQ(RUNNING, params[i]->status);
557 pthread_cancel(threads[i]);
558 ASSERT_EQ(0, pthread_join(threads[i], NULL));
571 vector<string> identifiers = {
"/test/topic",
"/test",
"/",
"/other/topic"};
572 uint num_threads = identifiers.size();
573 pthread_t threads[num_threads];
575 for (uint i = 0; i < num_threads; i++) {
577 params[i]->
component =
"component " + to_string(i);
582 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
585 for (uint i = 0; i < num_threads; i++) {
586 EXPECT_TRUE(wait_for_running(params[i]));
593 for (uint i = 0; i < num_threads - 1; i++) {
594 ASSERT_TRUE(wait_for_finished(params[i]));
595 pthread_join(threads[i], NULL);
600 pthread_t last_thread = threads[num_threads - 1];
601 EXPECT_FALSE(wait_for_finished(params[num_threads - 1], 0, pow(10, 6)));
602 pthread_cancel(last_thread);
603 ASSERT_EQ(0, pthread_join(last_thread, NULL));
609 string component =
"emitter";
618 string component =
"emitter";
626 start_barrier_emitter_thread(
void *data)
629 string component =
"emitter " + to_string(params->
thread_nr);
649 : identifier_(identifier), manager_(manager)
658 barrier_->unregister_emitter(identifier_);
659 manager_->release_syncpoint(identifier_, barrier_);
666 barrier_->emit(identifier_);
678 string barrier_id =
"/test/barrier";
680 const uint num_waiter_threads = 1;
681 const uint num_wait_calls = 1;
682 pthread_t waiter_threads[num_waiter_threads];
684 for (uint i = 0; i < num_waiter_threads; i++) {
686 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
687 params[i]->
component =
"component " + to_string(i);
692 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
694 for (uint i = 0; i < num_waiter_threads; i++) {
695 ASSERT_TRUE(wait_for_finished(params[i]));
696 pthread_join(waiter_threads[i], NULL);
712 string barrier_id =
"/test/barrier";
713 Emitter em1(
"emitter 1", barrier_id, manager);
714 Emitter em2(
"emitter 2", barrier_id, manager);
718 const uint num_waiter_threads = 50;
719 const uint num_wait_calls = 1;
720 pthread_t waiter_threads[num_waiter_threads];
722 for (uint i = 0; i < num_waiter_threads; i++) {
724 params[i]->
component =
"component " + to_string(i);
725 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
730 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
733 for (uint i = 0; i < num_waiter_threads; i++) {
734 EXPECT_TRUE(wait_for_running(params[i]));
739 for (uint i = 0; i < num_waiter_threads; i++) {
740 EXPECT_EQ(RUNNING, params[i]->status);
746 for (uint i = 0; i < num_waiter_threads; i++) {
747 ASSERT_TRUE(wait_for_finished(params[i]));
748 pthread_join(waiter_threads[i], NULL);
758 string barrier1_id =
"/test/barrier1";
759 string barrier2_id =
"/test/barrier2";
760 Emitter em1(
"em1", barrier1_id, manager);
761 Emitter em2(
"em2", barrier2_id, manager);
767 const uint num_waiter_threads = 50;
768 const uint num_wait_calls = 1;
769 pthread_t waiter_threads1[num_waiter_threads];
771 for (uint i = 0; i < num_waiter_threads; i++) {
773 params1[i]->
component =
"component " + to_string(i);
774 params1[i]->
type = SyncPoint::WAIT_FOR_ALL;
779 pthread_create(&waiter_threads1[i], &attrs, start_waiter_thread, params1[i]);
782 pthread_t waiter_threads2[num_waiter_threads];
784 for (uint i = 0; i < num_waiter_threads; i++) {
786 params2[i]->
component =
"component " + to_string(i);
787 params2[i]->
type = SyncPoint::WAIT_FOR_ALL;
789 params2[i]->
thread_nr = num_waiter_threads + i;
792 pthread_create(&waiter_threads2[i], &attrs, start_waiter_thread, params2[i]);
795 for (uint i = 0; i < num_waiter_threads; i++) {
796 EXPECT_TRUE(wait_for_running(params1[i]));
799 for (uint i = 0; i < num_waiter_threads; i++) {
800 EXPECT_TRUE(wait_for_running(params2[i]));
805 for (uint i = 0; i < num_waiter_threads; i++) {
806 ASSERT_TRUE(wait_for_finished(params1[i]));
807 pthread_join(waiter_threads1[i], NULL);
811 for (uint i = 0; i < num_waiter_threads; i++) {
812 EXPECT_EQ(RUNNING, params2[i]->status);
817 for (uint i = 0; i < num_waiter_threads; i++) {
818 ASSERT_TRUE(wait_for_finished(params2[i]));
819 pthread_join(waiter_threads2[i], NULL);
832 Emitter em1(
"emitter 1",
"/test/topic/b1", manager);
833 Emitter em2(
"emitter 2",
"/test/topic/b2", manager);
834 Emitter em3(
"emitter 3",
"/other/topic", manager);
836 vector<string> identifiers = {
"/test/topic",
"/test",
"/",
"/other/topic"};
837 uint num_threads = identifiers.size();
838 pthread_t threads[num_threads];
841 for (uint i = 0; i < num_threads; i++) {
843 params[i]->
component =
"component " + to_string(i);
844 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
850 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
856 for (uint i = 0; i < num_threads; i++) {
857 EXPECT_TRUE(wait_for_running(params[i]));
861 for (uint i = 0; i < num_threads; i++) {
862 ASSERT_EQ(RUNNING, params[i]->status);
866 for (uint i = 0; i < num_threads - 2; i++) {
867 ASSERT_TRUE(wait_for_finished(params[i]));
868 pthread_join(threads[i], NULL);
872 for (uint i = num_threads - 2; i < num_threads; i++) {
873 EXPECT_EQ(RUNNING, params[i]->status);
874 pthread_cancel(threads[i]);
875 ASSERT_EQ(0, pthread_join(threads[i], NULL));
889 string id_sp1 =
"/test/sp1";
890 string id_sp2 =
"/test/sp2";
891 string id_sp_pred =
"/test";
892 string id_emitter =
"component_emitter";
893 string id_waiter1 =
"component_waiter1";
894 string id_waiter2 =
"component_waiter2";
895 string id_waiter3 =
"component_waiter_on_predecessor";
899 manager->get_syncpoint(id_waiter1, id_sp1);
900 manager->get_syncpoint(id_waiter2, id_sp2);
910 params1->manager = manager;
911 params1->component = id_waiter1;
912 params1->type = SyncPoint::WAIT_FOR_ALL;
913 params1->num_wait_calls = 1;
914 params1->sp_identifier = id_sp1;
919 params2->
type = SyncPoint::WAIT_FOR_ALL;
926 params3->
type = SyncPoint::WAIT_FOR_ALL;
931 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
933 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
935 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
936 EXPECT_TRUE(wait_for_running(params1));
937 EXPECT_TRUE(wait_for_running(params2));
938 EXPECT_TRUE(wait_for_running(params3));
940 sp1->
emit(id_emitter);
942 ASSERT_TRUE(wait_for_finished(params1));
943 ASSERT_FALSE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
946 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
947 sp2->
emit(id_emitter);
948 ASSERT_TRUE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
949 ASSERT_TRUE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
951 pthread_join(pthread1, NULL);
952 pthread_join(pthread2, NULL);
953 pthread_join(pthread3, NULL);
960 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
961 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
962 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
964 ASSERT_TRUE(wait_for_running(params1));
965 ASSERT_TRUE(wait_for_running(params3));
967 ASSERT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
968 ASSERT_TRUE(wait_for_finished(params2));
969 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
971 sp1->
emit(id_emitter);
972 ASSERT_TRUE(wait_for_finished(params1));
973 ASSERT_TRUE(wait_for_finished(params3));
974 pthread_join(pthread1, NULL);
975 pthread_join(pthread2, NULL);
976 pthread_join(pthread3, NULL);
987 Emitter em1(
"em1",
"/barrier", manager);
989 Emitter em2(
"em2",
"/barrier", manager);
990 EXPECT_NO_THROW(em1.emit());
991 EXPECT_NO_THROW(em1.emit());
1010 params1->
type = SyncPoint::WAIT_FOR_ALL;
1015 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
1017 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1019 sp1->
emit(
"emitter");
1021 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1023 sp1->
emit(
"emitter");
1024 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1026 sp2->
emit(
"emitter");
1027 ASSERT_TRUE(wait_for_finished(params1));
1028 pthread_join(pthread1, NULL);
1037 manager->get_syncpoint(
"waiter",
"/test/sp1");
1042 params.
type = SyncPoint::WAIT_FOR_ALL;
1048 pthread_create(&thread, NULL, start_waiter_thread, &params);
1049 ASSERT_TRUE(wait_for_finished(&params));
1051 ASSERT_GT(cache_logger_->get_messages().size(), 0);
1055 struct emitter_thread_data
1059 std::string sp_name;
1060 atomic<ThreadStatus> status;
1061 Mutex mutex_running;
1063 Mutex mutex_finished;
1070 call_emit(
void *data)
1072 emitter_thread_data *tdata = (emitter_thread_data *)data;
1073 tdata->status = RUNNING;
1074 tdata->mutex_running.lock();
1075 tdata->cond_running.wake_all();
1076 tdata->mutex_running.unlock();
1077 RefPtr<SyncPoint> sp = tdata->manager->get_syncpoint(tdata->name, tdata->sp_name);
1079 sp->
emit(tdata->name);
1080 tdata->status = FINISHED;
1081 tdata->mutex_finished.lock();
1082 tdata->cond_finished.wake_all();
1083 tdata->mutex_finished.unlock();
1094 emitter_thread_data *emitter_params =
new emitter_thread_data();
1095 emitter_params->manager = manager;
1096 emitter_params->name =
"emitter";
1097 emitter_params->sp_name =
"/test";
1098 pthread_create(&thread, NULL, call_emit, (
void *)emitter_params);
1100 emitter_params->mutex_running.lock();
1101 if (emitter_params->status != RUNNING) {
1102 ASSERT_TRUE(emitter_params->cond_running.reltimed_wait(1, 0));
1104 emitter_params->mutex_running.unlock();
1105 emitter_params->mutex_finished.lock();
1106 EXPECT_FALSE(emitter_params->cond_finished.reltimed_wait(0, 100000));
1107 emitter_params->mutex_finished.unlock();
1109 pthread_t waiter_thread;
1111 waiter_params.
manager = manager;
1115 pthread_create(&waiter_thread, NULL, start_waiter_thread, &waiter_params);
1117 emitter_params->mutex_finished.lock();
1118 ASSERT_TRUE(emitter_params->status == FINISHED
1119 || emitter_params->cond_finished.reltimed_wait(1, 0));
1120 emitter_params->mutex_finished.unlock();
1121 pthread_join(thread, NULL);
1122 pthread_join(waiter_thread, NULL);
1123 delete emitter_params;
1128 call_wait_for_all(
void *data)
1144 uint num_emitters = 100;
1145 pthread_t emitter_thread[num_emitters];
1146 emitter_thread_data *params[num_emitters];
1147 for (uint i = 0; i < num_emitters; i++) {
1148 params[i] =
new emitter_thread_data();
1149 params[i]->manager = manager;
1150 string emitter_name =
"emitter" + to_string(i);
1151 params[i]->name = emitter_name;
1152 params[i]->sp_name =
"/test";
1153 pthread_create(&emitter_thread[i], NULL, call_emit, (
void *)params[i]);
1156 for (uint i = 0; i < num_emitters; i++) {
1157 params[i]->mutex_running.lock();
1158 if (params[i]->status != RUNNING) {
1159 ASSERT_TRUE(params[i]->cond_running.reltimed_wait(1, 0));
1161 params[i]->mutex_running.unlock();
1164 pthread_t waiter_thread;
1172 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1174 for (uint i = 0; i < num_emitters; i++) {
1175 params[i]->mutex_finished.lock();
1176 ASSERT_TRUE(params[i]->status == FINISHED || params[i]->cond_finished.reltimed_wait(1, 0));
1177 params[i]->mutex_finished.unlock();
1178 pthread_join(emitter_thread[i], NULL);
1183 pthread_join(waiter_thread, NULL);
1197 string sp_identifier =
"/test";
1199 manager->get_syncpoint(
"emitter2", sp_identifier);
1202 uint num_threads = 2;
1203 pthread_t threads[num_threads];
1205 for (uint i = 0; i < num_threads; i++) {
1206 params[i].
component =
"component " + to_string(i);
1208 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1213 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1214 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1215 sp->
emit(
"emitter1");
1216 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1217 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1218 for (uint i = 0; i < num_threads; i++) {
1219 ASSERT_FALSE(wait_for_finished(&params[i], 0, 10 * pow(10, 6)));
1221 sp->
emit(
"emitter2");
1222 for (uint i = 0; i < num_threads; i++) {
1223 ASSERT_TRUE(wait_for_finished(&params[i]));
1224 pthread_join(threads[i], NULL);
1236 uint num_threads = 2;
1237 pthread_t threads[num_threads];
1238 string sp_identifier =
"/test";
1240 for (uint i = 0; i < num_threads; i++) {
1241 params[i].
component =
"component " + to_string(i);
1242 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1250 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1251 EXPECT_TRUE(wait_for_running(&params[0]));
1254 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1255 for (uint i = 0; i < num_threads; i++) {
1256 EXPECT_TRUE(wait_for_running(&params[i]));
1258 wait_for_finished(&params[0], params[0].timeout_sec, params[0].timeout_nsec);
1259 wait_for_finished(&params[1], 0, pow(10, 6));
1260 for (uint i = 0; i < num_threads; i++) {
1261 pthread_join(threads[i], NULL);
1275 string sp_identifier =
"/test";
1276 uint num_threads = 2;
1278 pthread_t wait_for_one_thread;
1280 wait_for_one_params.
component =
"wait_for_one";
1281 wait_for_one_params.
type = SyncPoint::WAIT_FOR_ONE;
1282 wait_for_one_params.
manager = manager;
1287 wait_for_one_params.
status = PENDING;
1290 pthread_create(&wait_for_one_thread, &attrs, start_waiter_thread, &wait_for_one_params);
1291 pthread_t threads[num_threads];
1293 for (uint i = 0; i < num_threads; i++) {
1294 params[i].
component =
"component " + to_string(i);
1295 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1303 pthread_create(&threads[i], &attrs, start_waiter_thread, &params[i]);
1306 EXPECT_TRUE(wait_for_running(&wait_for_one_params));
1307 for (uint i = 0; i < num_threads; i++) {
1308 EXPECT_TRUE(wait_for_running(&params[i]));
1310 EXPECT_TRUE(wait_for_finished(&wait_for_one_params));
1311 for (uint i = 0; i < num_threads; i++) {
1312 EXPECT_EQ(RUNNING, params[i].status);
1314 for (uint i = 0; i < num_threads; i++) {
1315 EXPECT_TRUE(wait_for_finished(&params[i], params[i].timeout_sec, params[i].timeout_nsec));
1316 pthread_join(threads[i], NULL);
1318 pthread_join(wait_for_one_thread, NULL);
1324 pthread_t waiter_thread;
1332 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1334 pthread_join(waiter_thread, NULL);
1357 pthread_t waiter_thread;
1366 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1369 pthread_cancel(waiter_thread);
1370 pthread_join(waiter_thread, NULL);
1372 manager->release_syncpoint(
"component 1", sp);
1373 sp = manager->get_syncpoint(
"component 1",
"/test");
Invalid identifier used (i.e.
Wait until a given condition holds.
pthread_attr_t attrs
Thread attributes.
The component called release but is still registered as emitter.
virtual ~Emitter()
Destructor.
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Test class for SyncPoint. This class tests basic functionality of SyncPoints.
Fawkes library namespace.
void unlock()
Unlock the mutex.
virtual void wait()
Wait for other threads.
RefPtr< SyncPoint > sp1
Syncpoints for testing purposes.
SyncPointManagerTest()
Initialize the test class.
Emitter(string identifier, string syncbarrier, RefPtr< SyncPointManager > manager)
Constructor.
void wake_all()
Wake up all waiting threads.
virtual ~SyncPointManagerTest()
Deinitialize the test class.
virtual void emit(const std::string &component)
send a signal to all waiting threads
WaitCondition cond_finished
WaitCondition to indicate that the thread has finished.
MultiLogger * logger_
Logger used to initialize SyncPoints.
std::set< std::string > get_watchers() const
RefPtr< SyncPoint > get_syncpoint(const std::string &component, const std::string &identifier)
Get a SyncPoint.
Test class for SyncPointManager. This class tests basic functionality of the SyncPointManager.
WaitCondition cond_running
WaitCondition to indicate that the thread is running.
std::multiset< std::string > get_emitters() const
virtual void SetUp()
Initialize the test class.
Log through multiple loggers.
atomic< ThreadStatus > status
current status of the thread
CacheLogger * cache_logger_
Cache Logger used for testing.
virtual void TearDown()
Clean up.
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
This class gives access to SyncPoints.
The parameters passed to the threads.
Mutex mutex_finished
Mutex to protect cond_finished.
void emit()
emit the SyncBarrier
struct used for multithreading tests
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
uint num_wait_calls
Number of wait calls the thread should make.
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
SyncBarrierTest()
Constructor.
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
SyncPoint::WakeupType type
Wait type.
RefPtr< SyncPointManager > manager
A Pointer to a SyncPointManager.
Invalid component name used (i.e.
virtual void register_emitter(const std::string &component)
register as emitter
uint timeout_sec
timeout in sec
Helper class which registers and emits a given SyncBarrier.
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
uint thread_nr
Thread number.
MultiLogger * logger_
Logger for testing.
A component called wait() but is already waiting.
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Compare sets of syncpoints.
std::string get_identifier() const
Mutex mutex_running
Mutex to protect cond_running.
Barrier * start_barrier
Barrier for startup synchronization.
void lock()
Lock this mutex.
RefPtr< SyncPointManager > manager
SyncPointManager passed to the thread.
Mutex mutual exclusion lock.
uint timeout_nsec
timeout in nsec
string component
Name of the component.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
string sp_identifier
Name of the SyncPoint.