Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * qa_bb_remote.cpp - BlackBoard remote access QA 00004 * 00005 * Created: Mon Mar 03 17:31:18 2008 00006 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 00025 /// @cond QA 00026 00027 #include <blackboard/local.h> 00028 #include <blackboard/remote.h> 00029 #include <blackboard/exceptions.h> 00030 #include <blackboard/bbconfig.h> 00031 #include <blackboard/interface_listener.h> 00032 00033 #include <interfaces/TestInterface.h> 00034 00035 #include <interface/interface_info.h> 00036 #include <core/exceptions/system.h> 00037 #include <netcomm/fawkes/client.h> 00038 #include <netcomm/fawkes/server_thread.h> 00039 #include <utils/time/time.h> 00040 00041 #include <signal.h> 00042 #include <cstdlib> 00043 #include <cstring> 00044 #include <cstdio> 00045 00046 #include <iostream> 00047 #include <vector> 00048 00049 using namespace std; 00050 using namespace fawkes; 00051 00052 00053 bool quit = false; 00054 00055 void 00056 signal_handler(int signum) 00057 { 00058 quit = true; 00059 } 00060 00061 00062 #define NUM_CHUNKS 5 00063 00064 void 00065 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer) 00066 { 00067 while (! quit) { 00068 int expval = ti_reader->test_int() + 1; 00069 TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval); 00070 unsigned int msgid = ti_reader->msgq_enqueue(m); 00071 printf("Sent with message ID %u\n", msgid); 00072 00073 if ( ti_writer->msgq_size() > 1 ) { 00074 cout << "Error, more than one message! flushing." << endl; 00075 ti_writer->msgq_flush(); 00076 } 00077 00078 usleep(100000); 00079 00080 if ( ti_writer->msgq_first() != NULL ) { 00081 if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) { 00082 TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg); 00083 printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id()); 00084 } 00085 if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) { 00086 TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>(); 00087 printf("Received message with ID %u (enqueue time: %s)\n", m2->id(), m2->time_enqueued()->str()); 00088 ti_writer->set_test_int( m2->test_int() ); 00089 try { 00090 ti_writer->write(); 00091 } catch (InterfaceWriteDeniedException &e) { 00092 cout << "BUG: caught write denied exception" << endl; 00093 e.print_trace(); 00094 } 00095 ti_writer->msgq_pop(); 00096 } else { 00097 cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl; 00098 } 00099 00100 usleep(100000); 00101 00102 //cout << "Reading value from reader interface.. " << flush; 00103 ti_reader->read(); 00104 int val = ti_reader->test_int(); 00105 if ( val == expval ) { 00106 //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl; 00107 } else { 00108 cout << " failure, value is " << ti_reader->test_int() << ", expected " 00109 << expval << endl; 00110 } 00111 } else { 00112 printf("No message in queue, if network test this means the message was dropped\n"); 00113 } 00114 00115 usleep(10); 00116 } 00117 } 00118 00119 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener 00120 { 00121 public: 00122 SyncInterfaceListener(fawkes::Interface *reader, 00123 fawkes::Interface *writer, 00124 fawkes::BlackBoard *reader_bb, 00125 fawkes::BlackBoard *writer_bb) 00126 : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id()) 00127 { 00128 __reader = reader; 00129 __writer = writer; 00130 __reader_bb = reader_bb; 00131 __writer_bb = writer_bb; 00132 00133 bbil_add_data_interface(__reader); 00134 bbil_add_message_interface(__writer); 00135 00136 __reader_bb->register_listener(this, BlackBoard::BBIL_FLAG_DATA); 00137 __writer_bb->register_listener(this, BlackBoard::BBIL_FLAG_MESSAGES); 00138 } 00139 00140 00141 /** Destructor. */ 00142 ~SyncInterfaceListener() 00143 { 00144 __reader_bb->unregister_listener(this); 00145 __writer_bb->unregister_listener(this); 00146 } 00147 00148 00149 bool 00150 bb_interface_message_received(Interface *interface, 00151 Message *message) throw() 00152 { 00153 try { 00154 if ( interface == __writer ) { 00155 printf("%s: Forwarding message\n", bbil_name()); 00156 Message *m = message->clone(); 00157 m->set_hops(message->hops()); 00158 m->ref(); 00159 __reader->msgq_enqueue(m); 00160 message->set_id(m->id()); 00161 m->unref(); 00162 return false; 00163 } else { 00164 // Don't know why we were called, let 'em enqueue 00165 printf("%s: Message received for unknown interface\n", bbil_name()); 00166 return true; 00167 } 00168 } catch (Exception &e) { 00169 printf("%s: Exception when message received\n", bbil_name()); 00170 e.print_trace(); 00171 return false; 00172 } 00173 } 00174 00175 00176 void 00177 bb_interface_data_changed(Interface *interface) throw() 00178 { 00179 try { 00180 if ( interface == __reader ) { 00181 //__logger->log_debug(bbil_name(), "Copying data"); 00182 __reader->read(); 00183 __writer->copy_values(__reader); 00184 __writer->write(); 00185 } else { 00186 // Don't know why we were called, let 'em enqueue 00187 printf("%s: Data changed for unknown interface", bbil_name()); 00188 } 00189 } catch (Exception &e) { 00190 printf("%s: Exception when data changed\n", bbil_name()); 00191 e.print_trace(); 00192 } 00193 } 00194 00195 private: 00196 fawkes::Interface *__writer; 00197 fawkes::Interface *__reader; 00198 00199 fawkes::BlackBoard *__writer_bb; 00200 fawkes::BlackBoard *__reader_bb; 00201 00202 }; 00203 00204 00205 int 00206 main(int argc, char **argv) 00207 { 00208 signal(SIGINT, signal_handler); 00209 00210 LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE); 00211 BlackBoard *lbb = llbb; 00212 00213 FawkesNetworkServerThread *fns = new FawkesNetworkServerThread(1910); 00214 fns->start(); 00215 00216 llbb->start_nethandler(fns); 00217 00218 BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910); 00219 00220 InterfaceInfoList *infl = rbb->list_all(); 00221 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) { 00222 const unsigned char *hash = (*i).hash(); 00223 char phash[__INTERFACE_HASH_SIZE * 2 + 1]; 00224 memset(phash, 0, sizeof(phash)); 00225 for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) { 00226 sprintf(&phash[j * 2], "%02x", hash[j]); 00227 } 00228 printf("%s::%s (%s), w:%i r:%u s:%u\n", 00229 (*i).type(), (*i).id(), phash, (*i).has_writer(), 00230 (*i).num_readers(), (*i).serial()); 00231 } 00232 delete infl; 00233 00234 //TestInterface *ti_writer; 00235 TestInterface *ti_reader; 00236 TestInterface *ti_writer; 00237 try { 00238 cout << "Opening interfaces.. " << flush; 00239 ti_writer = rbb->open_for_writing<TestInterface>("SomeID"); 00240 ti_reader = rbb->open_for_reading<TestInterface>("SomeID"); 00241 cout << "success, " 00242 << "writer hash=" << ti_writer->hash_printable() 00243 << " reader hash=" << ti_reader->hash_printable() 00244 << endl; 00245 } catch (Exception &e) { 00246 cout << "failed! Aborting" << endl; 00247 e.print_trace(); 00248 exit(1); 00249 } 00250 00251 try { 00252 cout << "Trying to open second writer.. " << flush; 00253 TestInterface *ti_writer_two; 00254 ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID"); 00255 cout << "BUG: Detection of second writer did NOT work!" << endl; 00256 exit(2); 00257 } catch (BlackBoardWriterActiveException &e) { 00258 cout << "exception caught as expected, detected and prevented second writer!" << endl; 00259 } 00260 00261 try { 00262 cout << "Trying to open third writer.. " << flush; 00263 TestInterface *ti_writer_three; 00264 ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID"); 00265 cout << "No exception as expected, different ID ok!" << endl; 00266 rbb->close(ti_writer_three); 00267 } catch (BlackBoardWriterActiveException &e) { 00268 cout << "BUG: Third writer with different ID detected as another writer!" << endl; 00269 exit(3); 00270 } 00271 00272 cout << endl << endl 00273 << "Running data tests ==================================================" << endl; 00274 00275 cout << "Writing initial value (" 00276 << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl; 00277 ti_writer->set_test_int( TestInterface::TEST_CONSTANT ); 00278 try { 00279 ti_writer->write(); 00280 } catch (InterfaceWriteDeniedException &e) { 00281 cout << "BUG: caught write denied exception" << endl; 00282 e.print_trace(); 00283 } 00284 00285 cout << "Giving some time to have value processed" << endl; 00286 usleep(100000); 00287 00288 cout << "Reading value from reader interface.. " << flush; 00289 ti_reader->read(); 00290 int val = ti_reader->test_int(); 00291 if ( val == TestInterface::TEST_CONSTANT ) { 00292 cout << " success, value is " << ti_reader->test_int() << " as expected" << endl; 00293 } else { 00294 cout << " failure, value is " << ti_reader->test_int() << ", expected " 00295 << TestInterface::TEST_CONSTANT << endl; 00296 } 00297 00298 cout << "Closing interfaces.. " << flush; 00299 try { 00300 rbb->close(ti_reader); 00301 rbb->close(ti_writer); 00302 cout << "done" << endl; 00303 } catch (Exception &e) { 00304 cout << "failed" << endl; 00305 e.print_trace(); 00306 } 00307 00308 cout << endl << endl << "Starting MESSAGING tests" << endl 00309 << "Press Ctrl-C to continue with next test" << endl << endl; 00310 00311 ti_writer = lbb->open_for_writing<TestInterface>("Messaging"); 00312 ti_reader = rbb->open_for_reading<TestInterface>("Messaging"); 00313 00314 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16); 00315 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16); 00316 00317 test_messaging(ti_reader, ti_writer); 00318 00319 rbb->close(ti_reader); 00320 lbb->close(ti_writer); 00321 00322 cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl 00323 << "Press Ctrl-C to continue with next test" << endl << endl; 00324 quit = false; 00325 00326 delete rbb; 00327 00328 LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE); 00329 00330 FawkesNetworkServerThread *repfns = new FawkesNetworkServerThread(1911); 00331 repfns->start(); 00332 00333 repllbb->start_nethandler(repfns); 00334 00335 BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911); 00336 rbb = new RemoteBlackBoard("localhost", 1911); 00337 00338 TestInterface *rep_reader; 00339 TestInterface *rep_writer; 00340 00341 ti_writer = rbb->open_for_writing<TestInterface>("Messaging"); 00342 ti_reader = lbb->open_for_reading<TestInterface>("Messaging"); 00343 00344 rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging"); 00345 rep_writer = lbb->open_for_writing<TestInterface>("Messaging"); 00346 00347 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16); 00348 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16); 00349 00350 SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb); 00351 00352 test_messaging(ti_reader, ti_writer); 00353 00354 delete sil; 00355 lbb->close(ti_reader); 00356 rbb->close(ti_writer); 00357 rep_rbb->close(rep_reader); 00358 lbb->close(rep_writer); 00359 delete repllbb; 00360 delete rep_rbb; 00361 00362 cout << "Tests done" << endl; 00363 00364 delete rbb; 00365 delete llbb; 00366 delete fns; 00367 } 00368 00369 00370 /// @endcond