22 #include "openprs_thread.h" 24 #include "utils/openprs_mp_proxy.h" 25 #include "utils/openprs_server_proxy.h" 27 #include <baseapp/run.h> 28 #include <logging/logger.h> 29 #include <netcomm/fawkes/network_manager.h> 30 #include <utils/sub_process/proc.h> 32 #include <boost/bind.hpp> 33 #include <boost/format.hpp> 34 #include <boost/lambda/bind.hpp> 35 #include <boost/lambda/lambda.hpp> 55 server_socket_(io_service_),
56 deadline_(io_service_)
70 openprs_server_proxy_ = NULL;
71 openprs_mp_proxy_ = NULL;
73 char hostname[HOST_NAME_MAX];
74 if (gethostname(hostname, HOST_NAME_MAX) == -1) {
75 strcpy(hostname,
"localhost");
83 cfg_mp_host_ = hostname;
85 cfg_mp_port_ =
config->
get_uint(
"/openprs/message-passer/tcp-port");
86 cfg_mp_port_s_ = boost::str(boost::format(
"%u") % cfg_mp_port_);
87 cfg_mp_use_proxy_ =
config->
get_bool(
"/openprs/message-passer/use-proxy");
88 cfg_mp_proxy_port_ =
config->
get_uint(
"/openprs/message-passer/proxy-tcp-port");
95 cfg_server_host_ = hostname;
98 cfg_server_port_s_ = boost::str(boost::format(
"%u") % cfg_server_port_);
99 cfg_server_proxy_port_ =
config->
get_uint(
"/openprs/server/proxy-tcp-port");
102 cfg_kernel_timeout_ =
config->
get_float(
"/openprs/kernels/start-timeout");
108 const char *filename = cfg_mp_bin_.c_str();
109 const char *argv[] = {filename,
"-j", cfg_mp_port_s_.c_str(), NULL};
115 if (cfg_server_run_) {
117 const char *filename = cfg_server_bin_.c_str();
118 const char *argv[] = {filename,
120 cfg_mp_port_s_.c_str(),
122 cfg_server_port_s_.c_str(),
131 #if BOOST_VERSION >= 104800 134 boost::asio::ip::tcp::resolver resolver(io_service_);
135 boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
136 boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
141 deadline_.expires_at(boost::posix_time::pos_infin);
142 check_deadline(deadline_, server_socket_);
144 deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
146 boost::system::error_code ec = boost::asio::error::would_block;
147 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
151 io_service_.run_one();
152 # if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500 158 server_socket_.remote_endpoint(ec);
159 if (ec == boost::system::errc::not_connected) {
161 ec = boost::asio::error::would_block;
162 server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
166 }
while (ec == boost::asio::error::would_block);
169 if (ec || !server_socket_.is_open()) {
171 if (ec.value() == boost::system::errc::operation_canceled) {
172 throw Exception(
"OpenPRS waiting for server to come up timed out");
174 throw Exception(
"OpenPRS waiting for server failed: %s", ec.message().c_str());
181 boost::asio::socket_base::keep_alive keep_alive_option(
true);
182 server_socket_.set_option(keep_alive_option);
185 std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
188 OpenPRSServerProxy::write_string_to_socket(server_socket_,
"fawkes");
189 OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
190 OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
192 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
196 openprs_server_proxy_ =
199 if (cfg_mp_use_proxy_) {
204 openprs_mp_proxy_ = NULL;
208 openprs_kernel_mgr_ =
210 cfg_server_proxy_port_,
211 cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
212 cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
216 openprs_aspect_inifin_.
prepare(
"localhost",
217 fawkes::runtime::network_manager->fawkes_port(),
219 openprs_server_proxy_,
221 openprs_manager_aspect_inifin_.
set_manager(openprs_kernel_mgr_);
227 server_socket_.close();
229 if (io_service_thread_.joinable()) {
230 io_service_thread_.join();
235 proc_srv_->
kill(SIGINT);
239 proc_mp_->
kill(SIGINT);
245 delete openprs_server_proxy_;
246 delete openprs_mp_proxy_;
247 openprs_kernel_mgr_.
clear();
259 const std::list<AspectIniFin *>
260 OpenPRSThread::inifin_list()
262 std::list<AspectIniFin *> rv;
263 rv.push_back(&openprs_aspect_inifin_);
264 rv.push_back(&openprs_manager_aspect_inifin_);
269 OpenPRSThread::server_alive()
271 if (server_socket_.is_open()) {
272 boost::system::error_code ec;
273 server_socket_.remote_endpoint(ec);
281 OpenPRSThread::check_deadline(boost::asio::deadline_timer & deadline,
282 boost::asio::ip::tcp::socket &socket)
284 if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
286 deadline.expires_at(boost::posix_time::pos_infin);
289 #if BOOST_VERSION >= 104800 290 deadline.async_wait(boost::lambda::bind(
291 &OpenPRSThread::check_deadline,
this, boost::ref(deadline), boost::ref(socket)));
294 boost::bind(&OpenPRSThread::check_deadline,
this, boost::ref(deadline), boost::ref(socket)));
OpenPRSThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void kill(int signum)
Send a signal to the process.
void check_proc()
Check if the process is still alive.
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manager.
Thread class encapsulation of pthreads.
Logger * logger
This is the Logger member used to access the logger.
virtual void init()
Initialize the thread.
Clock * clock
By means of this member access to the clock is given.
Thread aspect to use blocked timing.
Base class for exceptions in Fawkes.
virtual void finalize()
Finalize the thread.
const char * name() const
Get name of thread.
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Thread aspect to provide a new aspect.
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
Sub-process execution with stdin/stdout/stderr redirection.
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
virtual void loop()
Code to execute in the thread.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Proxy for the OpenPRS server communication.
Configuration * config
This is the Configuration member used to access the configuration.
virtual ~OpenPRSThread()
Destructor.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.