21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
31 #include <klocalizedstring.h>
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
48 #define PIPELINE_LENGTH 0
51 using namespace Akonadi;
56 void SessionPrivate::startNext()
58 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
63 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
70 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
78 QString serverAddress;
83 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS" );
84 if ( !serverAddressEnvVar.isEmpty() ) {
85 const int pos = serverAddressEnvVar.indexOf(
':' );
86 const QByteArray protocol = serverAddressEnvVar.left( pos );
87 QMap<QString, QString> options;
88 foreach (
const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(
',') ) ) {
89 const QStringList pair = entry.split( QLatin1Char(
'=') );
90 if ( pair.size() != 2 )
92 options.insert( pair.first(), pair.last() );
94 kDebug() << protocol << options;
96 if ( protocol ==
"tcp" ) {
97 serverAddress = options.value( QLatin1String(
"host" ) );
98 port = options.value( QLatin1String(
"port" ) ).toUInt();
100 }
else if ( protocol ==
"unix" ) {
101 serverAddress = options.value( QLatin1String(
"path" ) );
102 }
else if ( protocol ==
"pipe" ) {
103 serverAddress = options.value( QLatin1String(
"name" ) );
108 if ( serverAddress.isEmpty() ) {
110 const QFileInfo fileInfo( connectionConfigFile );
111 if ( !fileInfo.exists() ) {
112 kDebug() <<
"Akonadi Client Session: connection config file '"
113 "akonadi/akonadiconnectionrc' can not be found in"
114 << XdgBaseDirs::homePath(
"config" ) <<
"nor in any of"
115 << XdgBaseDirs::systemPathList(
"config" );
117 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120 serverAddress = connectionSettings.value( QLatin1String(
"Data/NamedPipe" ), QLatin1String(
"Akonadi" ) ).toString();
122 const QString defaultSocketDir = Internal::xdgSaveDir(
"data" );
123 serverAddress = connectionSettings.value( QLatin1String(
"Data/UnixPath" ), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket" )) ).toString();
134 socket = localSocket =
new QLocalSocket( mParent );
135 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
137 socket = tcpSocket =
new QTcpSocket( mParent );
138 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
140 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
145 kDebug() <<
"connectToServer" << serverAddress;
147 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
150 localSocket->connectToServer( serverAddress );
152 tcpSocket->connectToHost( serverAddress, port );
161 return Internal::xdgSaveDir(
"config" ) + QLatin1String(
"/akonadiconnectionrc");
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
166 Q_ASSERT( mParent->sender() == socket );
167 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168 socketDisconnected();
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
173 Q_ASSERT( mParent->sender() == socket );
174 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175 socketDisconnected();
178 void SessionPrivate::socketDisconnected()
181 currentJob->d_ptr->lostConnection();
185 void SessionPrivate::dataReceived()
187 while ( socket->bytesAvailable() > 0 ) {
188 if ( parser->continuationSize() > 1 ) {
189 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190 parser->parseBlock( data );
191 }
else if ( socket->canReadLine() ) {
192 if ( !parser->parseNextLine( socket->readLine() ) )
196 if ( parser->tag() == QByteArray(
"0" ) ) {
197 if ( parser->data().startsWith(
"OK" ) ) {
200 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
202 QTimer::singleShot( 1000, mParent, SLOT(
reconnect()) );
207 if ( parser->tag() == QByteArray(
"1") ) {
208 if ( parser->data().startsWith(
"OK") ) {
212 kDebug() <<
"Unhandled server capability response:" << parser->data();
217 if ( parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi" ) ) {
218 const int pos = parser->data().indexOf(
"[PROTOCOL" );
221 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
222 protocolVersion = tmp;
223 Internal::setServerProtocolVersion( tmp );
225 kDebug() <<
"Server protocol version is:" << protocolVersion;
227 writeData(
"0 LOGIN " + ImapParser::quote( sessionId ) +
'\n' );
232 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
243 bool SessionPrivate::canPipelineNext()
245 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
247 if ( pipeline.isEmpty() && currentJob )
248 return currentJob->d_ptr->mWriteFinished;
249 if ( !pipeline.isEmpty() )
250 return pipeline.last()->d_ptr->mWriteFinished;
254 void SessionPrivate::doStartNext()
256 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
258 if ( canPipelineNext() ) {
260 pipeline.enqueue( nextJob );
266 if ( !pipeline.isEmpty() ) {
267 currentJob = pipeline.dequeue();
269 currentJob = queue.dequeue();
270 startJob( currentJob );
274 void SessionPrivate::startJob(
Job *job )
276 if ( protocolVersion < minimumProtocolVersion() ) {
278 job->setErrorText( i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
281 job->d_ptr->startQueued();
290 void SessionPrivate::jobDone(KJob * job)
294 if ( job == currentJob ) {
295 if ( pipeline.isEmpty() ) {
299 currentJob = pipeline.dequeue();
304 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
306 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
310 void SessionPrivate::jobWriteFinished(
Akonadi::Job* job )
312 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
318 void SessionPrivate::jobDestroyed(QObject * job)
321 jobDone( static_cast<KJob*>( job ) );
327 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
329 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
341 socket->write( data );
343 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
356 foreach (
Job *job, queue )
357 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
363 SessionPrivate::SessionPrivate(
Session *parent )
364 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
368 void SessionPrivate::init(
const QByteArray &
id )
371 parser =
new ImapParser();
373 if ( !
id.isEmpty() ) {
376 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
377 +
'-' + QByteArray::number( qrand() );
397 socket->disconnect( mParent );
401 QMetaObject::invokeMethod( mParent,
"reconnect", Qt::QueuedConnection );
409 d->init( sessionId );
416 d->init( sessionId );
430 static QThreadStorage<Session*> instances;
434 Q_ASSERT_X( !sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
435 "You tried to create a default session with empty session id!" );
436 Q_ASSERT_X( !instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
437 "You tried to create a default session twice!" );
439 instances.setLocalData(
new Session( sessionId ) );
444 instances.setLocalData( session );
449 if ( !instances.hasLocalData() )
450 instances.setLocalData(
new Session() );
451 return instances.localData();
456 foreach (
Job* job, d->queue )
457 job->kill( KJob::EmitResult );
459 foreach (
Job* job, d->pipeline ) {
460 job->d_ptr->mStarted =
false;
461 job->kill( KJob::EmitResult );
464 if ( d->currentJob ) {
465 d->currentJob->d_ptr->mStarted =
false;
466 d->currentJob->kill( KJob::EmitResult );
471 #include "moc_session.cpp"
The server protocol version is too old or too new.
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
int nextTag()
Returns the next IMAP tag.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Base class for all actions in the Akonadi storage.
static void setDefaultSession(Session *session)
Sets the default session.
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
QByteArray sessionId() const
Returns the session identifier.
void clear()
Stops all jobs queued for execution.
static State state()
Returns the state of the server.
A communication session with the Akonadi storage.
Server is not running, could be no one started it yet or it failed to start.
static bool start()
Starts the server.
virtual void addJob(Job *job)
Associates the given Job object with this session.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
virtual void reconnect()
Attemps to establish a connections to the Akonadi server.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Server is running and operational.
void writeData(const QByteArray &data)
Sends the given raw data.
static QString connectionFile()
Default location for akonadiconnectionrc.
State
Enum for the various states the server can be in.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.