kbufferedsocket.cpp
00001 /* -*- C++ -*- 00002 * Copyright (C) 2003-2005 Thiago Macieira <thiago.macieira@kdemail.net> 00003 * 00004 * 00005 * Permission is hereby granted, free of charge, to any person obtaining 00006 * a copy of this software and associated documentation files (the 00007 * "Software"), to deal in the Software without restriction, including 00008 * without limitation the rights to use, copy, modify, merge, publish, 00009 * distribute, sublicense, and/or sell copies of the Software, and to 00010 * permit persons to whom the Software is furnished to do so, subject to 00011 * the following conditions: 00012 * 00013 * The above copyright notice and this permission notice shall be included 00014 * in all copies or substantial portions of the Software. 00015 * 00016 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 00017 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 00018 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 00019 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 00020 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 00021 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 00022 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 00023 */ 00024 00025 #include <config.h> 00026 00027 #include <qmutex.h> 00028 #include <qtimer.h> 00029 00030 #include "ksocketdevice.h" 00031 #include "ksocketaddress.h" 00032 #include "ksocketbuffer_p.h" 00033 #include "kbufferedsocket.h" 00034 00035 using namespace KNetwork; 00036 using namespace KNetwork::Internal; 00037 00038 class KNetwork::KBufferedSocketPrivate 00039 { 00040 public: 00041 mutable KSocketBuffer *input, *output; 00042 00043 KBufferedSocketPrivate() 00044 { 00045 input = 0L; 00046 output = 0L; 00047 } 00048 }; 00049 00050 KBufferedSocket::KBufferedSocket(const QString& host, const QString& service, 00051 QObject *parent, const char *name) 00052 : KStreamSocket(host, service, parent, name), 00053 d(new KBufferedSocketPrivate) 00054 { 00055 setInputBuffering(true); 00056 setOutputBuffering(true); 00057 } 00058 00059 KBufferedSocket::~KBufferedSocket() 00060 { 00061 closeNow(); 00062 delete d->input; 00063 delete d->output; 00064 delete d; 00065 } 00066 00067 void KBufferedSocket::setSocketDevice(KSocketDevice* device) 00068 { 00069 KStreamSocket::setSocketDevice(device); 00070 device->setBlocking(false); 00071 } 00072 00073 bool KBufferedSocket::setSocketOptions(int opts) 00074 { 00075 if (opts == Blocking) 00076 return false; 00077 00078 opts &= ~Blocking; 00079 return KStreamSocket::setSocketOptions(opts); 00080 } 00081 00082 void KBufferedSocket::close() 00083 { 00084 if (!d->output || d->output->isEmpty()) 00085 closeNow(); 00086 else 00087 { 00088 setState(Closing); 00089 QSocketNotifier *n = socketDevice()->readNotifier(); 00090 if (n) 00091 n->setEnabled(false); 00092 emit stateChanged(Closing); 00093 } 00094 } 00095 00096 Q_LONG KBufferedSocket::bytesAvailable() const 00097 { 00098 if (!d->input) 00099 return KStreamSocket::bytesAvailable(); 00100 00101 return d->input->length(); 00102 } 00103 00104 Q_LONG KBufferedSocket::waitForMore(int msecs, bool *timeout) 00105 { 00106 Q_LONG retval = KStreamSocket::waitForMore(msecs, timeout); 00107 if (d->input) 00108 { 00109 resetError(); 00110 slotReadActivity(); 00111 return bytesAvailable(); 00112 } 00113 return retval; 00114 } 00115 00116 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen) 00117 { 00118 if (d->input) 00119 { 00120 if (d->input->isEmpty()) 00121 { 00122 setError(IO_ReadError, WouldBlock); 00123 emit gotError(WouldBlock); 00124 return -1; 00125 } 00126 resetError(); 00127 return d->input->consumeBuffer(data, maxlen); 00128 } 00129 return KStreamSocket::readBlock(data, maxlen); 00130 } 00131 00132 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen, KSocketAddress& from) 00133 { 00134 from = peerAddress(); 00135 return readBlock(data, maxlen); 00136 } 00137 00138 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen) 00139 { 00140 if (d->input) 00141 { 00142 if (d->input->isEmpty()) 00143 { 00144 setError(IO_ReadError, WouldBlock); 00145 emit gotError(WouldBlock); 00146 return -1; 00147 } 00148 resetError(); 00149 return d->input->consumeBuffer(data, maxlen, false); 00150 } 00151 return KStreamSocket::peekBlock(data, maxlen); 00152 } 00153 00154 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen, KSocketAddress& from) 00155 { 00156 from = peerAddress(); 00157 return peekBlock(data, maxlen); 00158 } 00159 00160 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG len) 00161 { 00162 if (state() != Connected) 00163 { 00164 // cannot write now! 00165 setError(IO_WriteError, NotConnected); 00166 return -1; 00167 } 00168 00169 if (d->output) 00170 { 00171 if (d->output->isFull()) 00172 { 00173 setError(IO_WriteError, WouldBlock); 00174 emit gotError(WouldBlock); 00175 return -1; 00176 } 00177 resetError(); 00178 00179 // enable notifier to send data 00180 QSocketNotifier *n = socketDevice()->writeNotifier(); 00181 if (n) 00182 n->setEnabled(true); 00183 00184 return d->output->feedBuffer(data, len); 00185 } 00186 00187 return KStreamSocket::writeBlock(data, len); 00188 } 00189 00190 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG maxlen, 00191 const KSocketAddress&) 00192 { 00193 // ignore the third parameter 00194 return writeBlock(data, maxlen); 00195 } 00196 00197 void KBufferedSocket::enableRead(bool enable) 00198 { 00199 KStreamSocket::enableRead(enable); 00200 if (!enable && d->input) 00201 { 00202 // reenable it 00203 QSocketNotifier *n = socketDevice()->readNotifier(); 00204 if (n) 00205 n->setEnabled(true); 00206 } 00207 00208 if (enable && state() != Connected && d->input && !d->input->isEmpty()) 00209 // this means the buffer is still dirty 00210 // allow the signal to be emitted 00211 QTimer::singleShot(0, this, SLOT(slotReadActivity())); 00212 } 00213 00214 void KBufferedSocket::enableWrite(bool enable) 00215 { 00216 KStreamSocket::enableWrite(enable); 00217 if (!enable && d->output && !d->output->isEmpty()) 00218 { 00219 // reenable it 00220 QSocketNotifier *n = socketDevice()->writeNotifier(); 00221 if (n) 00222 n->setEnabled(true); 00223 } 00224 } 00225 00226 void KBufferedSocket::stateChanging(SocketState newState) 00227 { 00228 if (newState == Connecting || newState == Connected) 00229 { 00230 // we're going to connect 00231 // make sure the buffers are clean 00232 if (d->input) 00233 d->input->clear(); 00234 if (d->output) 00235 d->output->clear(); 00236 00237 // also, turn on notifiers 00238 enableRead(emitsReadyRead()); 00239 enableWrite(emitsReadyWrite()); 00240 } 00241 KStreamSocket::stateChanging(newState); 00242 } 00243 00244 void KBufferedSocket::setInputBuffering(bool enable) 00245 { 00246 QMutexLocker locker(mutex()); 00247 if (!enable) 00248 { 00249 delete d->input; 00250 d->input = 0L; 00251 } 00252 else if (d->input == 0L) 00253 { 00254 d->input = new KSocketBuffer; 00255 } 00256 } 00257 00258 KIOBufferBase* KBufferedSocket::inputBuffer() 00259 { 00260 return d->input; 00261 } 00262 00263 void KBufferedSocket::setOutputBuffering(bool enable) 00264 { 00265 QMutexLocker locker(mutex()); 00266 if (!enable) 00267 { 00268 delete d->output; 00269 d->output = 0L; 00270 } 00271 else if (d->output == 0L) 00272 { 00273 d->output = new KSocketBuffer; 00274 } 00275 } 00276 00277 KIOBufferBase* KBufferedSocket::outputBuffer() 00278 { 00279 return d->output; 00280 } 00281 00282 Q_ULONG KBufferedSocket::bytesToWrite() const 00283 { 00284 if (!d->output) 00285 return 0; 00286 00287 return d->output->length(); 00288 } 00289 00290 void KBufferedSocket::closeNow() 00291 { 00292 KStreamSocket::close(); 00293 if (d->output) 00294 d->output->clear(); 00295 } 00296 00297 bool KBufferedSocket::canReadLine() const 00298 { 00299 if (!d->input) 00300 return false; 00301 00302 return d->input->canReadLine(); 00303 } 00304 00305 QCString KBufferedSocket::readLine() 00306 { 00307 return d->input->readLine(); 00308 } 00309 00310 void KBufferedSocket::waitForConnect() 00311 { 00312 if (state() != Connecting) 00313 return; // nothing to be waited on 00314 00315 KStreamSocket::setSocketOptions(socketOptions() | Blocking); 00316 connectionEvent(); 00317 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking); 00318 } 00319 00320 void KBufferedSocket::slotReadActivity() 00321 { 00322 if (d->input && state() == Connected) 00323 { 00324 mutex()->lock(); 00325 Q_LONG len = d->input->receiveFrom(socketDevice()); 00326 00327 if (len == -1) 00328 { 00329 if (socketDevice()->error() != WouldBlock) 00330 { 00331 // nope, another error! 00332 copyError(); 00333 mutex()->unlock(); 00334 emit gotError(error()); 00335 closeNow(); // emits closed 00336 return; 00337 } 00338 } 00339 else if (len == 0) 00340 { 00341 // remotely closed 00342 setError(IO_ReadError, RemotelyDisconnected); 00343 mutex()->unlock(); 00344 emit gotError(error()); 00345 closeNow(); // emits closed 00346 return; 00347 } 00348 00349 // no error 00350 mutex()->unlock(); 00351 } 00352 00353 if (state() == Connected) 00354 KStreamSocket::slotReadActivity(); // this emits readyRead 00355 else if (emitsReadyRead()) // state() != Connected 00356 { 00357 if (d->input && !d->input->isEmpty()) 00358 { 00359 // buffer isn't empty 00360 // keep emitting signals till it is 00361 QTimer::singleShot(0, this, SLOT(slotReadActivity())); 00362 emit readyRead(); 00363 } 00364 } 00365 } 00366 00367 void KBufferedSocket::slotWriteActivity() 00368 { 00369 if (d->output && !d->output->isEmpty() && 00370 (state() == Connected || state() == Closing)) 00371 { 00372 mutex()->lock(); 00373 Q_LONG len = d->output->sendTo(socketDevice()); 00374 00375 if (len == -1) 00376 { 00377 if (socketDevice()->error() != WouldBlock) 00378 { 00379 // nope, another error! 00380 copyError(); 00381 mutex()->unlock(); 00382 emit gotError(error()); 00383 closeNow(); 00384 return; 00385 } 00386 } 00387 else if (len == 0) 00388 { 00389 // remotely closed 00390 setError(IO_ReadError, RemotelyDisconnected); 00391 mutex()->unlock(); 00392 emit gotError(error()); 00393 closeNow(); 00394 return; 00395 } 00396 00397 if (d->output->isEmpty()) 00398 // deactivate the notifier until we have something to send 00399 // writeNotifier can't return NULL here 00400 socketDevice()->writeNotifier()->setEnabled(false); 00401 00402 mutex()->unlock(); 00403 emit bytesWritten(len); 00404 } 00405 00406 if (state() != Closing) 00407 KStreamSocket::slotWriteActivity(); 00408 else if (d->output && d->output->isEmpty() && state() == Closing) 00409 { 00410 KStreamSocket::close(); // finished sending data 00411 } 00412 } 00413 00414 #include "kbufferedsocket.moc"