10 #ifndef __PION_TCPSTREAM_HEADER__
11 #define __PION_TCPSTREAM_HEADER__
16 #include <boost/bind.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <pion/PionConfig.hpp>
20 #include <pion/net/TCPConnection.hpp>
33 :
public std::basic_streambuf<char, std::char_traits<char> >
38 typedef char char_type;
39 typedef std::char_traits<char>::int_type int_type;
40 typedef std::char_traits<char>::off_type off_type;
41 typedef std::char_traits<char>::pos_type pos_type;
42 typedef std::char_traits<char> traits_type;
47 WRITE_BUFFER_SIZE = 8192
57 : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->getReadBuffer().c_array())
69 const bool ssl_flag =
false)
71 m_read_buf(m_conn_ptr->getReadBuffer().c_array())
83 TCPConnection::SSLContext& ssl_context)
85 m_read_buf(m_conn_ptr->getReadBuffer().c_array())
105 setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX);
107 setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1));
116 const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase());
117 int_type bytes_sent = 0;
118 if (bytes_to_send > 0) {
119 boost::mutex::scoped_lock async_lock(m_async_mutex);
120 m_bytes_transferred = 0;
121 m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send),
122 boost::bind(&TCPStreamBuffer::operationFinished,
this,
123 boost::asio::placeholders::error,
124 boost::asio::placeholders::bytes_transferred));
125 m_async_done.wait(async_lock);
126 bytes_sent = m_bytes_transferred;
129 bytes_sent = traits_type::eof();
141 if (gptr() < egptr())
142 return traits_type::to_int_type(*gptr());
145 std::streamsize put_back_num = std::streamsize(gptr() - eback());
146 if (put_back_num > PUT_BACK_MAX)
147 put_back_num = PUT_BACK_MAX;
150 if (put_back_num > 0)
151 memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num);
156 boost::mutex::scoped_lock async_lock(m_async_mutex);
157 m_bytes_transferred = 0;
158 m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX,
159 TCPConnection::READ_BUFFER_SIZE-PUT_BACK_MAX),
160 boost::bind(&TCPStreamBuffer::operationFinished,
this,
161 boost::asio::placeholders::error,
162 boost::asio::placeholders::bytes_transferred));
163 m_async_done.wait(async_lock);
165 return traits_type::eof();
168 setg(m_read_buf+(PUT_BACK_MAX-put_back_num),
169 m_read_buf+PUT_BACK_MAX,
170 m_read_buf+PUT_BACK_MAX+m_bytes_transferred);
173 return traits_type::to_int_type(*gptr());
183 if (! traits_type::eq_int_type(c, traits_type::eof())) {
192 ? traits_type::eof() : traits_type::not_eof(c));
203 virtual std::streamsize
xsputn(
const char_type *s, std::streamsize n) {
204 const std::streamsize bytes_available = std::streamsize(epptr() - pptr());
205 std::streamsize bytes_sent = 0;
206 if (bytes_available >= n) {
208 memcpy(pptr(), s, n);
213 if (bytes_available > 0) {
215 memcpy(pptr(), s, bytes_available);
216 pbump(bytes_available);
221 if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) {
224 boost::mutex::scoped_lock async_lock(m_async_mutex);
225 m_bytes_transferred = 0;
226 m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available,
228 boost::bind(&TCPStreamBuffer::operationFinished,
this,
229 boost::asio::placeholders::error,
230 boost::asio::placeholders::bytes_transferred));
231 m_async_done.wait(async_lock);
232 bytes_sent = bytes_available + m_bytes_transferred;
236 memcpy(pbase(), s+bytes_available, n-bytes_available);
237 pbump(n-bytes_available);
252 virtual std::streamsize
xsgetn(char_type *s, std::streamsize n) {
253 std::streamsize bytes_remaining = n;
254 while (bytes_remaining > 0) {
255 const std::streamsize bytes_available = std::streamsize(egptr() - gptr());
256 const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining)
257 ? bytes_remaining : bytes_available);
259 if (bytes_next_read > 0) {
260 memcpy(s, gptr(), bytes_next_read);
261 gbump(bytes_next_read);
262 bytes_remaining -= bytes_next_read;
263 s += bytes_next_read;
265 if (bytes_remaining > 0) {
267 if (traits_type::eq_int_type(
underflow(), traits_type::eof()))
271 return(n-bytes_remaining);
280 return ((
flushOutput() == traits_type::eof()) ? -1 : 0);
287 inline void operationFinished(
const boost::system::error_code& error_code,
288 std::size_t bytes_transferred)
290 boost::mutex::scoped_lock async_lock(m_async_mutex);
291 m_async_error = error_code;
292 m_bytes_transferred = bytes_transferred;
293 m_async_done.notify_one();
298 TCPConnectionPtr m_conn_ptr;
301 boost::mutex m_async_mutex;
304 boost::condition m_async_done;
307 boost::system::error_code m_async_error;
310 std::size_t m_bytes_transferred;
313 char_type * m_read_buf;
316 char_type m_write_buf[WRITE_BUFFER_SIZE];
324 :
public std::basic_iostream<char, std::char_traits<char> >
329 typedef char char_type;
330 typedef std::char_traits<char>::int_type int_type;
331 typedef std::char_traits<char>::off_type off_type;
332 typedef std::char_traits<char>::pos_type pos_type;
333 typedef std::char_traits<char> traits_type;
342 : m_tcp_buf(conn_ptr)
344 ,
std::basic_iostream<char,
std::char_traits<char> >(NULL)
348 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
358 const bool ssl_flag =
false)
359 : m_tcp_buf(io_service, ssl_flag)
361 ,
std::basic_iostream<char,
std::char_traits<char> >(NULL)
365 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
375 TCPConnection::SSLContext& ssl_context)
376 : m_tcp_buf(io_service, ssl_context)
378 ,
std::basic_iostream<char,
std::char_traits<char> >(NULL)
382 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
393 inline boost::system::error_code
accept(boost::asio::ip::tcp::acceptor& tcp_acceptor)
408 inline boost::system::error_code
connect(boost::asio::ip::tcp::endpoint& tcp_endpoint)
424 inline boost::system::error_code
connect(
const boost::asio::ip::address& remote_addr,
425 const unsigned int remote_port)
427 boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port);
TCPStream(boost::asio::io_service &io_service, const bool ssl_flag=false)
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
virtual int_type underflow(void)
TCPStreamBuffer(TCPConnectionPtr &conn_ptr)
boost::system::error_code handshake_client(void)
virtual int_type overflow(int_type c)
boost::asio::ip::address getRemoteIp(void) const
returns the client's IP address
bool is_open(void) const
returns true if the connection is currently open
TCPStreamBuffer * rdbuf(void)
returns a pointer to the stream buffer in use
virtual std::streamsize xsputn(const char_type *s, std::streamsize n)
TCPStream(TCPConnectionPtr &conn_ptr)
boost::asio::ip::address getRemoteIp(void) const
returns the client's IP address
const TCPConnection & getConnection(void) const
returns a const reference to the current TCP connection
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
bool getSSLFlag(void) const
returns true if the connection is encrypted using SSL
void setupBuffers(void)
sets up the read and write buffers for input and output
TCPStreamBuffer(boost::asio::io_service &io_service, TCPConnection::SSLContext &ssl_context)
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
virtual std::streamsize xsgetn(char_type *s, std::streamsize n)
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
virtual int_type sync(void)
int_type flushOutput(void)
void close(void)
closes the tcp socket and cancels any pending asynchronous operations
TCPStream(boost::asio::io_service &io_service, TCPConnection::SSLContext &ssl_context)
boost::system::error_code handshake_server(void)
the following enables use of the lock-free cache
bool getSSLFlag(void) const
returns true if the connection is encrypted using SSL
void close(void)
closes the tcp connection
TCPStreamBuffer(boost::asio::io_service &io_service, const bool ssl_flag=false)
bool is_open(void) const
returns true if the connection is currently open
TCPConnection & getConnection(void)
returns a reference to the current TCP connection
virtual ~TCPStreamBuffer()
virtual destructor flushes the write buffer
boost::system::error_code connect(const boost::asio::ip::address &remote_addr, const unsigned int remote_port)