pion-net
4.0.9
|
00001 // ------------------------------------------------------------------ 00002 // pion-net: a C++ framework for building lightweight HTTP interfaces 00003 // ------------------------------------------------------------------ 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_TCPSTREAM_HEADER__ 00011 #define __PION_TCPSTREAM_HEADER__ 00012 00013 #include <cstring> 00014 #include <istream> 00015 #include <streambuf> 00016 #include <boost/bind.hpp> 00017 #include <boost/thread/mutex.hpp> 00018 #include <boost/thread/condition.hpp> 00019 #include <pion/PionConfig.hpp> 00020 #include <pion/net/TCPConnection.hpp> 00021 00022 00023 namespace pion { // begin namespace pion 00024 namespace net { // begin namespace net (Pion Network Library) 00025 00026 00032 class TCPStreamBuffer 00033 : public std::basic_streambuf<char, std::char_traits<char> > 00034 { 00035 public: 00036 00037 // data type definitions required for iostream compatability 00038 typedef char char_type; 00039 typedef std::char_traits<char>::int_type int_type; 00040 typedef std::char_traits<char>::off_type off_type; 00041 typedef std::char_traits<char>::pos_type pos_type; 00042 typedef std::char_traits<char> traits_type; 00043 00044 // some integer constants used within TCPStreamBuffer 00045 enum { 00046 PUT_BACK_MAX = 10, //< number of bytes that can be put back into the read buffer 00047 WRITE_BUFFER_SIZE = 8192 //< size of the write buffer 00048 }; 00049 00050 00056 explicit TCPStreamBuffer(TCPConnectionPtr& conn_ptr) 00057 : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->getReadBuffer().c_array()) 00058 { 00059 setupBuffers(); 00060 } 00061 00068 explicit TCPStreamBuffer(boost::asio::io_service& io_service, 00069 const bool ssl_flag = false) 00070 : m_conn_ptr(new TCPConnection(io_service, ssl_flag)), 00071 m_read_buf(m_conn_ptr->getReadBuffer().c_array()) 00072 { 00073 setupBuffers(); 00074 } 00075 00082 TCPStreamBuffer(boost::asio::io_service& io_service, 00083 TCPConnection::SSLContext& ssl_context) 00084 : m_conn_ptr(new TCPConnection(io_service, ssl_context)), 00085 m_read_buf(m_conn_ptr->getReadBuffer().c_array()) 00086 { 00087 setupBuffers(); 00088 } 00089 00091 virtual ~TCPStreamBuffer() { sync(); } 00092 00094 TCPConnection& getConnection(void) { return *m_conn_ptr; } 00095 00097 const TCPConnection& getConnection(void) const { return *m_conn_ptr; } 00098 00099 00100 protected: 00101 00103 inline void setupBuffers(void) { 00104 // use the TCP connection's read buffer and allow for bytes to be put back 00105 setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX); 00106 // set write buffer size-1 so that we have an extra char avail for overflow 00107 setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1)); 00108 } 00109 00115 inline int_type flushOutput(void) { 00116 const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase()); 00117 int_type bytes_sent = 0; 00118 if (bytes_to_send > 0) { 00119 boost::mutex::scoped_lock async_lock(m_async_mutex); 00120 m_bytes_transferred = 0; 00121 m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send), 00122 boost::bind(&TCPStreamBuffer::operationFinished, this, 00123 boost::asio::placeholders::error, 00124 boost::asio::placeholders::bytes_transferred)); 00125 m_async_done.wait(async_lock); 00126 bytes_sent = m_bytes_transferred; 00127 pbump(-bytes_sent); 00128 if (m_async_error) 00129 bytes_sent = traits_type::eof(); 00130 } 00131 return bytes_sent; 00132 } 00133 00139 virtual int_type underflow(void) { 00140 // first check if we still have bytes available in the read buffer 00141 if (gptr() < egptr()) 00142 return traits_type::to_int_type(*gptr()); 00143 00144 // calculate the number of bytes we will allow to be put back 00145 std::streamsize put_back_num = std::streamsize(gptr() - eback()); 00146 if (put_back_num > PUT_BACK_MAX) 00147 put_back_num = PUT_BACK_MAX; 00148 00149 // copy the last bytes read to the beginning of the buffer (for put back) 00150 if (put_back_num > 0) 00151 memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num); 00152 00153 // read data from the TCP connection 00154 // note that this has to be an ansynchronous call; otherwise, it cannot 00155 // be cancelled by other threads and will block forever (such as during shutdown) 00156 boost::mutex::scoped_lock async_lock(m_async_mutex); 00157 m_bytes_transferred = 0; 00158 m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX, 00159 TCPConnection::READ_BUFFER_SIZE-PUT_BACK_MAX), 00160 boost::bind(&TCPStreamBuffer::operationFinished, this, 00161 boost::asio::placeholders::error, 00162 boost::asio::placeholders::bytes_transferred)); 00163 m_async_done.wait(async_lock); 00164 if (m_async_error) 00165 return traits_type::eof(); 00166 00167 // reset buffer pointers now that data is available 00168 setg(m_read_buf+(PUT_BACK_MAX-put_back_num), //< beginning of putback bytes 00169 m_read_buf+PUT_BACK_MAX, //< read position 00170 m_read_buf+PUT_BACK_MAX+m_bytes_transferred); //< end of buffer 00171 00172 // return next character available 00173 return traits_type::to_int_type(*gptr()); 00174 } 00175 00182 virtual int_type overflow(int_type c) { 00183 if (! traits_type::eq_int_type(c, traits_type::eof())) { 00184 // character is not eof -> add it to the end of the write buffer 00185 // we can push this to the back of the write buffer because we set 00186 // the size of the write buffer to 1 less than the actual size using setp() 00187 *pptr() = c; 00188 pbump(1); 00189 } 00190 // flush data in the write buffer by sending it to the TCP connection 00191 return ((flushOutput() == traits_type::eof()) 00192 ? traits_type::eof() : traits_type::not_eof(c)); 00193 } 00194 00203 virtual std::streamsize xsputn(const char_type *s, std::streamsize n) { 00204 const std::streamsize bytes_available = std::streamsize(epptr() - pptr()); 00205 std::streamsize bytes_sent = 0; 00206 if (bytes_available >= n) { 00207 // there is enough room in the buffer -> just put it in there 00208 memcpy(pptr(), s, n); 00209 pbump(n); 00210 bytes_sent = n; 00211 } else { 00212 // there is not enough room left in the buffer 00213 if (bytes_available > 0) { 00214 // fill up the buffer 00215 memcpy(pptr(), s, bytes_available); 00216 pbump(bytes_available); 00217 } 00218 // flush data in the write buffer by sending it to the TCP connection 00219 if (flushOutput() == traits_type::eof()) 00220 return 0; 00221 if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) { 00222 // the remaining data to send is larger than the buffer available 00223 // send it all now rather than buffering 00224 boost::mutex::scoped_lock async_lock(m_async_mutex); 00225 m_bytes_transferred = 0; 00226 m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available, 00227 n-bytes_available), 00228 boost::bind(&TCPStreamBuffer::operationFinished, this, 00229 boost::asio::placeholders::error, 00230 boost::asio::placeholders::bytes_transferred)); 00231 m_async_done.wait(async_lock); 00232 bytes_sent = bytes_available + m_bytes_transferred; 00233 } else { 00234 // the buffer is larger than the remaining data 00235 // put remaining data to the beginning of the output buffer 00236 memcpy(pbase(), s+bytes_available, n-bytes_available); 00237 pbump(n-bytes_available); 00238 bytes_sent = n; 00239 } 00240 } 00241 return bytes_sent; 00242 } 00243 00252 virtual std::streamsize xsgetn(char_type *s, std::streamsize n) { 00253 std::streamsize bytes_remaining = n; 00254 while (bytes_remaining > 0) { 00255 const std::streamsize bytes_available = std::streamsize(egptr() - gptr()); 00256 const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining) 00257 ? bytes_remaining : bytes_available); 00258 // copy available input data from buffer 00259 if (bytes_next_read > 0) { 00260 memcpy(s, gptr(), bytes_next_read); 00261 gbump(bytes_next_read); 00262 bytes_remaining -= bytes_next_read; 00263 s += bytes_next_read; 00264 } 00265 if (bytes_remaining > 0) { 00266 // call underflow() to read more data 00267 if (traits_type::eq_int_type(underflow(), traits_type::eof())) 00268 break; 00269 } 00270 } 00271 return(n-bytes_remaining); 00272 } 00273 00279 virtual int_type sync(void) { 00280 return ((flushOutput() == traits_type::eof()) ? -1 : 0); 00281 } 00282 00283 00284 private: 00285 00287 inline void operationFinished(const boost::system::error_code& error_code, 00288 std::size_t bytes_transferred) 00289 { 00290 boost::mutex::scoped_lock async_lock(m_async_mutex); 00291 m_async_error = error_code; 00292 m_bytes_transferred = bytes_transferred; 00293 m_async_done.notify_one(); 00294 } 00295 00296 00298 TCPConnectionPtr m_conn_ptr; 00299 00301 boost::mutex m_async_mutex; 00302 00304 boost::condition m_async_done; 00305 00307 boost::system::error_code m_async_error; 00308 00310 std::size_t m_bytes_transferred; 00311 00313 char_type * m_read_buf; 00314 00316 char_type m_write_buf[WRITE_BUFFER_SIZE]; 00317 }; 00318 00319 00323 class TCPStream 00324 : public std::basic_iostream<char, std::char_traits<char> > 00325 { 00326 public: 00327 00328 // data type definitions required for iostream compatability 00329 typedef char char_type; 00330 typedef std::char_traits<char>::int_type int_type; 00331 typedef std::char_traits<char>::off_type off_type; 00332 typedef std::char_traits<char>::pos_type pos_type; 00333 typedef std::char_traits<char> traits_type; 00334 00335 00341 explicit TCPStream(TCPConnectionPtr& conn_ptr) 00342 : m_tcp_buf(conn_ptr) 00343 #ifdef _MSC_VER 00344 , std::basic_iostream<char, std::char_traits<char> >(NULL) 00345 #endif 00346 { 00347 // initialize basic_iostream with pointer to the stream buffer 00348 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00349 } 00350 00357 explicit TCPStream(boost::asio::io_service& io_service, 00358 const bool ssl_flag = false) 00359 : m_tcp_buf(io_service, ssl_flag) 00360 #ifdef _MSC_VER 00361 , std::basic_iostream<char, std::char_traits<char> >(NULL) 00362 #endif 00363 { 00364 // initialize basic_iostream with pointer to the stream buffer 00365 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00366 } 00367 00374 TCPStream(boost::asio::io_service& io_service, 00375 TCPConnection::SSLContext& ssl_context) 00376 : m_tcp_buf(io_service, ssl_context) 00377 #ifdef _MSC_VER 00378 , std::basic_iostream<char, std::char_traits<char> >(NULL) 00379 #endif 00380 { 00381 // initialize basic_iostream with pointer to the stream buffer 00382 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf); 00383 } 00384 00393 inline boost::system::error_code accept(boost::asio::ip::tcp::acceptor& tcp_acceptor) 00394 { 00395 boost::system::error_code ec = m_tcp_buf.getConnection().accept(tcp_acceptor); 00396 if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_server(); 00397 return ec; 00398 } 00399 00408 inline boost::system::error_code connect(boost::asio::ip::tcp::endpoint& tcp_endpoint) 00409 { 00410 boost::system::error_code ec = m_tcp_buf.getConnection().connect(tcp_endpoint); 00411 if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_client(); 00412 return ec; 00413 } 00414 00424 inline boost::system::error_code connect(const boost::asio::ip::address& remote_addr, 00425 const unsigned int remote_port) 00426 { 00427 boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port); 00428 boost::system::error_code ec = m_tcp_buf.getConnection().connect(tcp_endpoint); 00429 if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_client(); 00430 return ec; 00431 } 00432 00434 inline void close(void) { m_tcp_buf.getConnection().close(); } 00435 00436 /* 00437 Use close instead; basic_socket::cancel is deprecated for Windows XP. 00438 00440 inline void cancel(void) { m_tcp_buf.getConnection().cancel(); } 00441 */ 00442 00444 inline bool is_open(void) const { return m_tcp_buf.getConnection().is_open(); } 00445 00447 inline bool getSSLFlag(void) const { return m_tcp_buf.getConnection().getSSLFlag(); } 00448 00450 inline boost::asio::ip::address getRemoteIp(void) const { 00451 return m_tcp_buf.getConnection().getRemoteIp(); 00452 } 00453 00455 TCPStreamBuffer *rdbuf(void) { return &m_tcp_buf; } 00456 00457 00458 private: 00459 00461 TCPStreamBuffer m_tcp_buf; 00462 }; 00463 00464 00465 } // end namespace net 00466 } // end namespace pion 00467 00468 #endif