Fawkes API  Fawkes Development Version
openprs_thread.cpp
1 
2 /***************************************************************************
3  * openprs_thread.cpp - OpenPRS environment providing Thread
4  *
5  * Created: Thu Aug 14 15:52:35 2014
6  * Copyright 2014-2015 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "openprs_thread.h"
23 
24 #include "utils/openprs_mp_proxy.h"
25 #include "utils/openprs_server_proxy.h"
26 
27 #include <baseapp/run.h>
28 #include <logging/logger.h>
29 #include <netcomm/fawkes/network_manager.h>
30 #include <utils/sub_process/proc.h>
31 
32 #include <boost/bind.hpp>
33 #include <boost/format.hpp>
34 #include <boost/lambda/bind.hpp>
35 #include <boost/lambda/lambda.hpp>
36 #include <cerrno>
37 #include <csignal>
38 #include <cstdio>
39 #include <cstdlib>
40 #include <unistd.h>
41 
42 using namespace fawkes;
43 
44 /** @class OpenPRSThread "openprs_thread.h"
45  * OpenPRS environment thread.
46  *
47  * @author Tim Niemueller
48  */
49 
50 /** Constructor. */
52 : Thread("OpenPRSThread", Thread::OPMODE_WAITFORWAKEUP),
53  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE),
54  AspectProviderAspect(inifin_list()),
55  server_socket_(io_service_),
56  deadline_(io_service_)
57 {
58 }
59 
60 /** Destructor. */
62 {
63 }
64 
65 void
67 {
68  proc_srv_ = NULL;
69  proc_mp_ = NULL;
70  openprs_server_proxy_ = NULL;
71  openprs_mp_proxy_ = NULL;
72 
73  char hostname[HOST_NAME_MAX];
74  if (gethostname(hostname, HOST_NAME_MAX) == -1) {
75  strcpy(hostname, "localhost");
76  }
77 
78  cfg_mp_run_ = config->get_bool("/openprs/message-passer/run");
79  cfg_mp_bin_ = config->get_string("/openprs/message-passer/binary");
80  try {
81  cfg_mp_host_ = config->get_string("/openprs/message-passer/hostname");
82  } catch (Exception &e) {
83  cfg_mp_host_ = hostname;
84  }
85  cfg_mp_port_ = config->get_uint("/openprs/message-passer/tcp-port");
86  cfg_mp_port_s_ = boost::str(boost::format("%u") % cfg_mp_port_);
87  cfg_mp_use_proxy_ = config->get_bool("/openprs/message-passer/use-proxy");
88  cfg_mp_proxy_port_ = config->get_uint("/openprs/message-passer/proxy-tcp-port");
89 
90  cfg_server_run_ = config->get_bool("/openprs/server/run");
91  cfg_server_bin_ = config->get_string("/openprs/server/binary");
92  try {
93  cfg_server_host_ = config->get_string("/openprs/server/hostname");
94  } catch (Exception &e) {
95  cfg_server_host_ = hostname;
96  }
97  cfg_server_port_ = config->get_uint("/openprs/server/tcp-port");
98  cfg_server_port_s_ = boost::str(boost::format("%u") % cfg_server_port_);
99  cfg_server_proxy_port_ = config->get_uint("/openprs/server/proxy-tcp-port");
100 
101  cfg_server_timeout_ = config->get_float("/openprs/server/timeout");
102  cfg_kernel_timeout_ = config->get_float("/openprs/kernels/start-timeout");
103 
104  openprs_aspect_inifin_.set_kernel_timeout(cfg_kernel_timeout_);
105 
106  if (cfg_mp_run_) {
107  logger->log_warn(name(), "Running OPRS-mp");
108  const char *filename = cfg_mp_bin_.c_str();
109  const char *argv[] = {filename, "-j", cfg_mp_port_s_.c_str(), NULL};
110  proc_mp_ = new SubProcess("OPRS-mp", filename, argv, NULL, logger);
111  } else {
112  proc_mp_ = NULL;
113  }
114 
115  if (cfg_server_run_) {
116  logger->log_warn(name(), "Running OPRS-server");
117  const char *filename = cfg_server_bin_.c_str();
118  const char *argv[] = {filename,
119  "-j",
120  cfg_mp_port_s_.c_str(),
121  "-i",
122  cfg_server_port_s_.c_str(),
123  "-l",
124  "lower",
125  NULL};
126  proc_srv_ = new SubProcess("OPRS-server", filename, argv, NULL, logger);
127  } else {
128  proc_srv_ = NULL;
129  }
130 
131 #if BOOST_VERSION >= 104800
132  logger->log_info(name(), "Verifying OPRS-server availability");
133 
134  boost::asio::ip::tcp::resolver resolver(io_service_);
135  boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
136  boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
137 
138  // this is just the overly complicated way to get a timeout on
139  // a synchronous connect, cf.
140  // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/timeouts/blocking_tcp_client.cpp
141  deadline_.expires_at(boost::posix_time::pos_infin);
142  check_deadline(deadline_, server_socket_);
143 
144  deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
145 
146  boost::system::error_code ec = boost::asio::error::would_block;
147  server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
148 
149  // Block until the asynchronous operation has completed.
150  do {
151  io_service_.run_one();
152 # if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
153  // Boost 1.54 has a bug that causes async_connect to report success
154  // if it cannot connect at all to the other side, cf.
155  // https://svn.boost.org/trac/boost/ticket/8795
156  // Work around by explicitly checking for connected status
157  if (!ec) {
158  server_socket_.remote_endpoint(ec);
159  if (ec == boost::system::errc::not_connected) {
160  // continue waiting for timeout
161  ec = boost::asio::error::would_block;
162  server_socket_.async_connect(iter->endpoint(), boost::lambda::var(ec) = boost::lambda::_1);
163  }
164  }
165 # endif
166  } while (ec == boost::asio::error::would_block);
167 
168  // Determine whether a connection was successfully established.
169  if (ec || !server_socket_.is_open()) {
170  finalize();
171  if (ec.value() == boost::system::errc::operation_canceled) {
172  throw Exception("OpenPRS waiting for server to come up timed out");
173  } else {
174  throw Exception("OpenPRS waiting for server failed: %s", ec.message().c_str());
175  }
176  }
177 #else
178  logger->log_warn(name(), "Cannot verify server aliveness, Boost too old");
179 #endif
180 
181  boost::asio::socket_base::keep_alive keep_alive_option(true);
182  server_socket_.set_option(keep_alive_option);
183 
184  // receive greeting
185  std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
186  //logger->log_info(name(), "Received server greeting: %s", greeting.c_str());
187  // send our greeting
188  OpenPRSServerProxy::write_string_to_socket(server_socket_, "fawkes");
189  OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
190  OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
191 
192  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
193 
194  logger->log_info(name(), "Starting OpenPRS server proxy");
195 
196  openprs_server_proxy_ =
197  new OpenPRSServerProxy(cfg_server_proxy_port_, cfg_server_host_, cfg_server_port_, logger);
198 
199  if (cfg_mp_use_proxy_) {
200  logger->log_info(name(), "Starting OpenPRS message passer proxy");
201  openprs_mp_proxy_ =
202  new OpenPRSMessagePasserProxy(cfg_mp_proxy_port_, cfg_mp_host_, cfg_mp_port_, logger);
203  } else {
204  openprs_mp_proxy_ = NULL;
205  }
206 
207  logger->log_warn(name(), "Initializing kernel manager");
208  openprs_kernel_mgr_ =
209  new OpenPRSKernelManager(hostname,
210  cfg_server_proxy_port_,
211  cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
212  cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
213  logger,
214  clock,
215  config);
216  openprs_aspect_inifin_.prepare("localhost",
217  fawkes::runtime::network_manager->fawkes_port(),
218  openprs_kernel_mgr_,
219  openprs_server_proxy_,
220  openprs_mp_proxy_);
221  openprs_manager_aspect_inifin_.set_manager(openprs_kernel_mgr_);
222 }
223 
224 void
226 {
227  server_socket_.close();
228  io_service_.stop();
229  if (io_service_thread_.joinable()) {
230  io_service_thread_.join();
231  }
232 
233  if (proc_srv_) {
234  logger->log_info(name(), "Killing OpenPRS server");
235  proc_srv_->kill(SIGINT);
236  }
237  if (proc_mp_) {
238  logger->log_info(name(), "Killing OpenPRS message passer");
239  proc_mp_->kill(SIGINT);
240  }
241 
242  delete proc_srv_;
243  delete proc_mp_;
244 
245  delete openprs_server_proxy_;
246  delete openprs_mp_proxy_;
247  openprs_kernel_mgr_.clear();
248 }
249 
250 void
252 {
253  if (proc_srv_)
254  proc_srv_->check_proc();
255  if (proc_mp_)
256  proc_mp_->check_proc();
257 }
258 
259 const std::list<AspectIniFin *>
260 OpenPRSThread::inifin_list()
261 {
262  std::list<AspectIniFin *> rv;
263  rv.push_back(&openprs_aspect_inifin_);
264  rv.push_back(&openprs_manager_aspect_inifin_);
265  return rv;
266 }
267 
268 bool
269 OpenPRSThread::server_alive()
270 {
271  if (server_socket_.is_open()) {
272  boost::system::error_code ec;
273  server_socket_.remote_endpoint(ec);
274  return !ec;
275  } else {
276  return false;
277  }
278 }
279 
280 void
281 OpenPRSThread::check_deadline(boost::asio::deadline_timer & deadline,
282  boost::asio::ip::tcp::socket &socket)
283 {
284  if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
285  socket.close();
286  deadline.expires_at(boost::posix_time::pos_infin);
287  }
288 
289 #if BOOST_VERSION >= 104800
290  deadline.async_wait(boost::lambda::bind(
291  &OpenPRSThread::check_deadline, this, boost::ref(deadline), boost::ref(socket)));
292 #else
293  deadline.async_wait(
294  boost::bind(&OpenPRSThread::check_deadline, this, boost::ref(deadline), boost::ref(socket)));
295 #endif
296 }
OpenPRSThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Proxy for the OpenPRS server communication.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:188
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:375
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manager.
Thread class encapsulation of pthreads.
Definition: thread.h:45
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void init()
Initialize the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Thread aspect to use blocked timing.
Base class for exceptions in Fawkes.
Definition: exception.h:35
virtual void finalize()
Finalize the thread.
const char * name() const
Get name of thread.
Definition: thread.h:100
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Thread aspect provide a new aspect.
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
Definition: lockptr.h:497
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:36
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
virtual void loop()
Code to execute in the thread.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Proxy for the OpenPRS server communication.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual ~OpenPRSThread()
Destructor.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.