SourceXtractorPlusPlus
0.15
Please provide a description of the project.
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultiThreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <chrono>
25
#include <
ElementsKernel/Logging.h
>
26
#include <csignal>
27
28
#include "
SEImplementation/Plugin/SourceIDs/SourceID.h
"
29
#include "
SEImplementation/Measurement/MultithreadedMeasurement.h
"
30
31
using namespace
SourceXtractor
;
32
33
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
34
35
36
MultithreadedMeasurement::~MultithreadedMeasurement
() {
37
if
(
m_output_thread
->
joinable
()) {
38
m_output_thread
->
join
();
39
}
40
}
41
42
void
MultithreadedMeasurement::startThreads
() {
43
m_output_thread
= Euclid::make_unique<std::thread>(
outputThreadStatic
,
this
);
44
}
45
46
void
MultithreadedMeasurement::waitForThreads
() {
47
m_input_done
=
true
;
48
m_thread_pool
->
block
();
49
m_output_thread
->
join
();
50
logger
.debug() <<
"All worker threads done!"
;
51
}
52
53
void
54
MultithreadedMeasurement::handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group) {
55
// Force computation of SourceID here, where the order is still deterministic
56
for
(
auto
& source : *source_group) {
57
source.getProperty<
SourceID
>();
58
}
59
60
// Put the new SourceGroup into the input queue
61
auto
order_number =
m_group_counter
;
62
m_thread_pool
->
submit
([
this
, order_number, source_group]() {
63
// Trigger measurements
64
for
(
auto
& source : *source_group) {
65
m_source_to_row
(source);
66
}
67
// Pass to the output thread
68
{
69
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
70
m_output_queue
.emplace_back(order_number, source_group);
71
}
72
m_new_output
.
notify_one
();
73
});
74
++
m_group_counter
;
75
}
76
77
void
MultithreadedMeasurement::outputThreadStatic
(
MultithreadedMeasurement
*measurement) {
78
logger
.debug() <<
"Starting output thread"
;
79
try
{
80
measurement->
outputThreadLoop
();
81
}
82
catch
(
const
Elements::Exception
&
e
) {
83
logger
.fatal() <<
"Output thread got an exception!"
;
84
logger
.fatal() <<
e
.what();
85
if
(!measurement->
m_abort_raised
.exchange(
true
)) {
86
logger
.fatal() <<
"Aborting the execution"
;
87
::raise(SIGTERM);
88
}
89
}
90
logger
.debug() <<
"Stopping output thread"
;
91
}
92
93
void
MultithreadedMeasurement::outputThreadLoop
() {
94
while
(
m_thread_pool
->
activeThreads
() > 0) {
95
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
96
97
// Wait for something in the output queue
98
if
(
m_output_queue
.empty()) {
99
m_new_output
.
wait_for
(output_lock,
std::chrono::milliseconds
(100));
100
}
101
102
// Process the output queue
103
while
(!
m_output_queue
.empty()) {
104
notifyObservers
(
m_output_queue
.front().second);
105
m_output_queue
.pop_front();
106
}
107
108
if
(
m_input_done
&&
m_thread_pool
->
running
() +
m_thread_pool
->
queued
() == 0 &&
109
m_output_queue
.empty()) {
110
break
;
111
}
112
}
113
}
SourceXtractor::Observable< std::shared_ptr< SourceGroupInterface > >::notifyObservers
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition:
Observable.h:71
std::shared_ptr< SourceGroupInterface >
Elements::Logging
Euclid::ThreadPool::running
size_t running() const
std::chrono::milliseconds
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:54
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition:
MultithreadedMeasurement.cpp:77
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition:
MultithreadedMeasurement.h:59
SourceID.h
SourceXtractor::SourceID
Definition:
SourceID.h:33
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:93
std::thread::joinable
T joinable(T... args)
SourceXtractor
Definition:
Aperture.h:30
Euclid::ThreadPool::queued
size_t queued() const
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:63
std::unique_lock
STL class.
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:58
Elements::Exception
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:65
std::condition_variable::wait_for
T wait_for(T... args)
SourceXtractor::logger
static auto logger
Definition:
WCS.cpp:44
e
constexpr double e
std::condition_variable::notify_one
T notify_one(T... args)
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
virtual ~MultithreadedMeasurement()
Definition:
MultithreadedMeasurement.cpp:36
Euclid::ThreadPool::block
void block()
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::waitForThreads
void waitForThreads() override
Definition:
MultithreadedMeasurement.cpp:46
MultithreadedMeasurement.h
Logging.h
Euclid::ThreadPool::submit
void submit(Task task)
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:60
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:42
Euclid::ThreadPool::activeThreads
size_t activeThreads() const
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:37
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:62
std::thread::join
T join(T... args)
Generated by
1.8.20