akonadi
session.cpp
00001 /* 00002 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00003 00004 This library is free software; you can redistribute it and/or modify it 00005 under the terms of the GNU Library General Public License as published by 00006 the Free Software Foundation; either version 2 of the License, or (at your 00007 option) any later version. 00008 00009 This library is distributed in the hope that it will be useful, but WITHOUT 00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00011 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00012 License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to the 00016 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00017 02110-1301, USA. 00018 */ 00019 00020 #include "session.h" 00021 #include "session_p.h" 00022 00023 #include "imapparser_p.h" 00024 #include "job.h" 00025 #include "job_p.h" 00026 #include "servermanager.h" 00027 #include "servermanager_p.h" 00028 #include "xdgbasedirs_p.h" 00029 00030 #include <kdebug.h> 00031 #include <klocale.h> 00032 00033 #include <QCoreApplication> 00034 #include <QtCore/QDir> 00035 #include <QtCore/QQueue> 00036 #include <QtCore/QThreadStorage> 00037 #include <QtCore/QTimer> 00038 #include <QtCore/QThread> 00039 #include <QSettings> 00040 00041 #include <QtNetwork/QLocalSocket> 00042 #include <QtNetwork/QTcpSocket> 00043 #include <QtNetwork/QHostAddress> 00044 00045 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission 00046 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still 00047 // sends responses for the next one to the already finished one 00048 #define PIPELINE_LENGTH 0 00049 //#define PIPELINE_LENGTH 2 00050 00051 using namespace Akonadi; 00052 00053 00054 //@cond PRIVATE 00055 00056 void SessionPrivate::startNext() 00057 { 00058 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) ); 00059 } 00060 00061 void SessionPrivate::reconnect() 00062 { 00063 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket ); 00064 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState 00065 || localSocket->state() == QLocalSocket::ConnectingState ) ) { 00066 // nothing to do, we are still/already connected 00067 return; 00068 } 00069 00070 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket ); 00071 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState 00072 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) { 00073 // same here, but for TCP 00074 return; 00075 } 00076 00077 // try to figure out where to connect to 00078 QString serverAddress; 00079 quint16 port = 0; 00080 bool useTcp = false; 00081 00082 // env var has precedence 00083 const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" ); 00084 if ( !serverAddressEnvVar.isEmpty() ) { 00085 const int pos = serverAddressEnvVar.indexOf( ':' ); 00086 const QByteArray protocol = serverAddressEnvVar.left( pos ); 00087 QMap<QString, QString> options; 00088 foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) { 00089 const QStringList pair = entry.split( QLatin1Char('=') ); 00090 if ( pair.size() != 2 ) 00091 continue; 00092 options.insert( pair.first(), pair.last() ); 00093 } 00094 kDebug() << protocol << options; 00095 00096 if ( protocol == "tcp" ) { 00097 serverAddress = options.value( QLatin1String( "host" ) ); 00098 port = options.value( QLatin1String( "port" ) ).toUInt(); 00099 useTcp = true; 00100 } else if ( protocol == "unix" ) { 00101 serverAddress = options.value( QLatin1String( "path" ) ); 00102 } else if ( protocol == "pipe" ) { 00103 serverAddress = options.value( QLatin1String( "name" ) ); 00104 } 00105 } 00106 00107 // try config file next, fall back to defaults if that fails as well 00108 if ( serverAddress.isEmpty() ) { 00109 const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile(); 00110 const QFileInfo fileInfo( connectionConfigFile ); 00111 if ( !fileInfo.exists() ) { 00112 kDebug() << "Akonadi Client Session: connection config file '" 00113 "akonadi/akonadiconnectionrc' can not be found in" 00114 << XdgBaseDirs::homePath( "config" ) << "nor in any of" 00115 << XdgBaseDirs::systemPathList( "config" ); 00116 } 00117 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat ); 00118 00119 #ifdef Q_OS_WIN //krazy:exclude=cpp 00120 serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString(); 00121 #else 00122 const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) ); 00123 serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString(); 00124 #endif 00125 } 00126 #ifdef Q_OS_WINCE 00127 useTcp = true; 00128 #endif 00129 00130 // create sockets if not yet done, note that this does not yet allow changing socket types on the fly 00131 // but that's probably not something we need to support anyway 00132 if ( !socket ) { 00133 if ( !useTcp ) { 00134 socket = localSocket = new QLocalSocket( mParent ); 00135 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) ); 00136 } else { 00137 socket = tcpSocket = new QTcpSocket( mParent ); 00138 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) ); 00139 } 00140 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) ); 00141 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) ); 00142 } 00143 00144 // actually do connect 00145 kDebug() << "connectToServer" << serverAddress; 00146 #ifdef Q_OS_WINCE 00147 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 ); 00148 #else 00149 if ( !useTcp ) { 00150 localSocket->connectToServer( serverAddress ); 00151 } else { 00152 tcpSocket->connectToHost( serverAddress, port ); 00153 } 00154 #endif 00155 00156 emit mParent->reconnected(); 00157 } 00158 00159 void SessionPrivate::socketError( QLocalSocket::LocalSocketError ) 00160 { 00161 Q_ASSERT( mParent->sender() == socket ); 00162 kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString(); 00163 socketDisconnected(); 00164 } 00165 00166 void SessionPrivate::socketError( QAbstractSocket::SocketError ) 00167 { 00168 Q_ASSERT( mParent->sender() == socket ); 00169 kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString(); 00170 socketDisconnected(); 00171 } 00172 00173 void SessionPrivate::socketDisconnected() 00174 { 00175 if ( currentJob ) 00176 currentJob->d_ptr->lostConnection(); 00177 connected = false; 00178 } 00179 00180 void SessionPrivate::dataReceived() 00181 { 00182 while ( socket->bytesAvailable() > 0 ) { 00183 if ( parser->continuationSize() > 1 ) { 00184 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) ); 00185 parser->parseBlock( data ); 00186 } else if ( socket->canReadLine() ) { 00187 if ( !parser->parseNextLine( socket->readLine() ) ) 00188 continue; // response not yet completed 00189 00190 // handle login response 00191 if ( parser->tag() == QByteArray( "0" ) ) { 00192 if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings 00193 connected = true; 00194 startNext(); 00195 } else { 00196 kWarning() << "Unable to login to Akonadi server:" << parser->data(); 00197 socket->close(); 00198 QTimer::singleShot( 1000, mParent, SLOT(reconnect()) ); 00199 } 00200 } 00201 00202 // send login command 00203 if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) { 00204 const int pos = parser->data().indexOf( "[PROTOCOL" ); 00205 if ( pos > 0 ) { 00206 qint64 tmp = 0; 00207 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 ); 00208 protocolVersion = tmp; 00209 Internal::setServerProtocolVersion( tmp ); 00210 } 00211 kDebug() << "Server protocol version is:" << protocolVersion; 00212 00213 writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' ); 00214 00215 // work for the current job 00216 } else { 00217 if ( currentJob ) 00218 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() ); 00219 } 00220 00221 // reset parser stuff 00222 parser->reset(); 00223 } else { 00224 break; // nothing we can do for now 00225 } 00226 } 00227 } 00228 00229 bool SessionPrivate::canPipelineNext() 00230 { 00231 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH ) 00232 return false; 00233 if ( pipeline.isEmpty() && currentJob ) 00234 return currentJob->d_ptr->mWriteFinished; 00235 if ( !pipeline.isEmpty() ) 00236 return pipeline.last()->d_ptr->mWriteFinished; 00237 return false; 00238 } 00239 00240 void SessionPrivate::doStartNext() 00241 { 00242 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) ) 00243 return; 00244 if ( canPipelineNext() ) { 00245 Akonadi::Job *nextJob = queue.dequeue(); 00246 pipeline.enqueue( nextJob ); 00247 startJob( nextJob ); 00248 } 00249 if ( jobRunning ) 00250 return; 00251 jobRunning = true; 00252 if ( !pipeline.isEmpty() ) { 00253 currentJob = pipeline.dequeue(); 00254 } else { 00255 currentJob = queue.dequeue(); 00256 startJob( currentJob ); 00257 } 00258 } 00259 00260 void SessionPrivate::startJob( Job *job ) 00261 { 00262 if ( protocolVersion < minimumProtocolVersion() ) { 00263 job->setError( Job::ProtocolVersionMismatch ); 00264 job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) ); 00265 job->emitResult(); 00266 } else { 00267 job->d_ptr->startQueued(); 00268 } 00269 } 00270 00271 void SessionPrivate::endJob( Job *job ) 00272 { 00273 job->emitResult(); 00274 } 00275 00276 void SessionPrivate::jobDone(KJob * job) 00277 { 00278 // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below) 00279 // so don't call any methods on job itself 00280 if ( job == currentJob ) { 00281 if ( pipeline.isEmpty() ) { 00282 jobRunning = false; 00283 currentJob = 0; 00284 } else { 00285 currentJob = pipeline.dequeue(); 00286 } 00287 startNext(); 00288 } else { 00289 // non-current job finished, likely canceled while still in the queue 00290 queue.removeAll( static_cast<Akonadi::Job*>( job ) ); 00291 // ### likely not enough to really cancel already running jobs 00292 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) ); 00293 } 00294 } 00295 00296 void SessionPrivate::jobWriteFinished( Akonadi::Job* job ) 00297 { 00298 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) ); 00299 Q_UNUSED( job ); 00300 00301 startNext(); 00302 } 00303 00304 void SessionPrivate::jobDestroyed(QObject * job) 00305 { 00306 // careful, accessing non-QObject methods of job will fail here already 00307 jobDone( static_cast<KJob*>( job ) ); 00308 } 00309 00310 void SessionPrivate::addJob(Job * job) 00311 { 00312 queue.append( job ); 00313 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) ); 00314 QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) ); 00315 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) ); 00316 startNext(); 00317 } 00318 00319 int SessionPrivate::nextTag() 00320 { 00321 return theNextTag++; 00322 } 00323 00324 void SessionPrivate::writeData(const QByteArray & data) 00325 { 00326 if ( socket ) 00327 socket->write( data ); 00328 else 00329 kWarning() << "Trying to write while session is disconnected!" << kBacktrace(); 00330 } 00331 00332 void SessionPrivate::serverStateChanged( ServerManager::State state ) 00333 { 00334 if ( state == ServerManager::Running && !connected ) 00335 reconnect(); 00336 } 00337 00338 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision ) 00339 { 00340 // only deal with the queue, for the guys in the pipeline it's too late already anyway 00341 // and they shouldn't have gotten there if they depend on a preceding job anyway. 00342 foreach ( Job *job, queue ) 00343 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision ); 00344 } 00345 00346 //@endcond 00347 00348 00349 SessionPrivate::SessionPrivate( Session *parent ) 00350 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 ) 00351 { 00352 } 00353 00354 void SessionPrivate::init( const QByteArray &id ) 00355 { 00356 kDebug() << id; 00357 parser = new ImapParser(); 00358 00359 if ( !id.isEmpty() ) { 00360 sessionId = id; 00361 } else { 00362 sessionId = QCoreApplication::instance()->applicationName().toUtf8() 00363 + '-' + QByteArray::number( qrand() ); 00364 } 00365 00366 connected = false; 00367 theNextTag = 1; 00368 jobRunning = false; 00369 00370 if ( ServerManager::state() == ServerManager::NotRunning ) 00371 ServerManager::start(); 00372 mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)), 00373 SLOT(serverStateChanged(Akonadi::ServerManager::State)) ); 00374 00375 reconnect(); 00376 } 00377 00378 Session::Session(const QByteArray & sessionId, QObject * parent) : 00379 QObject( parent ), 00380 d( new SessionPrivate( this ) ) 00381 { 00382 d->init( sessionId ); 00383 } 00384 00385 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent) 00386 : QObject( parent ), 00387 d( dd ) 00388 { 00389 d->init( sessionId ); 00390 } 00391 00392 Session::~Session() 00393 { 00394 clear(); 00395 delete d; 00396 } 00397 00398 QByteArray Session::sessionId() const 00399 { 00400 return d->sessionId; 00401 } 00402 00403 static QThreadStorage<Session*> instances; 00404 00405 void SessionPrivate::createDefaultSession( const QByteArray &sessionId ) 00406 { 00407 Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession", 00408 "You tried to create a default session with empty session id!" ); 00409 Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession", 00410 "You tried to create a default session twice!" ); 00411 00412 instances.setLocalData( new Session( sessionId ) ); 00413 } 00414 00415 Session* Session::defaultSession() 00416 { 00417 if ( !instances.hasLocalData() ) 00418 instances.setLocalData( new Session() ); 00419 return instances.localData(); 00420 } 00421 00422 void Session::clear() 00423 { 00424 foreach ( Job* job, d->queue ) 00425 job->kill( KJob::EmitResult ); 00426 d->queue.clear(); 00427 foreach ( Job* job, d->pipeline ) 00428 job->kill( KJob::EmitResult ); 00429 d->pipeline.clear(); 00430 if ( d->currentJob ) 00431 d->currentJob->kill( KJob::EmitResult ); 00432 d->jobRunning = false; 00433 d->connected = false; 00434 if ( d->socket ) 00435 d->socket->disconnect( this ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage! 00436 delete d->socket; 00437 d->socket = 0; 00438 QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor 00439 } 00440 00441 #include "session.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon Aug 27 2012 22:09:24 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon Aug 27 2012 22:09:24 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006
KDE's Doxygen guidelines are available online.