• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.11.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • akonadi
session.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "session.h"
21 #include "session_p.h"
22 
23 #include "imapparser_p.h"
24 #include "job.h"
25 #include "job_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
29 
30 #include <kdebug.h>
31 #include <klocalizedstring.h>
32 
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
39 #include <QSettings>
40 
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
44 
45 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
46 // in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
47 // sends responses for the next one to the already finished one
48 #define PIPELINE_LENGTH 0
49 //#define PIPELINE_LENGTH 2
50 
51 using namespace Akonadi;
52 
53 
54 //@cond PRIVATE
55 
56 void SessionPrivate::startNext()
57 {
58  QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
59 }
60 
// Starts a connection attempt to the Akonadi server unless the existing socket
// is already connected or connecting.
//
// The server address is looked up in this order:
//  1. the AKONADI_SERVER_ADDRESS environment variable, formatted as
//     "<protocol>:<key>=<value>,<key>=<value>..." where protocol is "tcp"
//     (keys "host", "port"), "unix" (key "path") or "pipe" (key "name");
//  2. the connection config file returned by connectionFile(), falling back to a
//     built-in default (named pipe on Windows, socket in the XDG data dir otherwise).
//
// The socket object is created on the first call and re-used afterwards. The parent
// session's reconnected() signal is emitted right after the connection attempt has
// been started, i.e. without waiting for the socket to report success.
61 void SessionPrivate::reconnect()
62 {
63  QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64  if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65  || localSocket->state() == QLocalSocket::ConnectingState ) ) {
66  // nothing to do, we are still/already connected
67  return;
68  }
69 
70  QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71  if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72  || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
73  // same here, but for TCP
74  return;
75  }
76 
77  // try to figure out where to connect to
78  QString serverAddress;
79  quint16 port = 0;
80  bool useTcp = false;
81 
82  // env var has precedence
83  const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
84  if ( !serverAddressEnvVar.isEmpty() ) {
 // everything before the first ':' is the protocol, the rest is a comma-separated
 // list of key=value options; entries that do not split into exactly two parts are skipped
85  const int pos = serverAddressEnvVar.indexOf( ':' );
86  const QByteArray protocol = serverAddressEnvVar.left( pos );
87  QMap<QString, QString> options;
88  foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
89  const QStringList pair = entry.split( QLatin1Char('=') );
90  if ( pair.size() != 2 )
91  continue;
92  options.insert( pair.first(), pair.last() );
93  }
94  kDebug() << protocol << options;
95 
 // NOTE(review): useTcp is set even when the "host" option is missing; the address
 // then falls back to the config file's local socket path while useTcp stays true.
 // Confirm whether that combination is intended.
96  if ( protocol == "tcp" ) {
97  serverAddress = options.value( QLatin1String( "host" ) );
98  port = options.value( QLatin1String( "port" ) ).toUInt();
99  useTcp = true;
100  } else if ( protocol == "unix" ) {
101  serverAddress = options.value( QLatin1String( "path" ) );
102  } else if ( protocol == "pipe" ) {
103  serverAddress = options.value( QLatin1String( "name" ) );
104  }
105  }
106 
107  // try config file next, fall back to defaults if that fails as well
108  if ( serverAddress.isEmpty() ) {
109  const QString connectionConfigFile = connectionFile();
110  const QFileInfo fileInfo( connectionConfigFile );
 // a missing config file is only logged; the defaults below still apply
111  if ( !fileInfo.exists() ) {
112  kDebug() << "Akonadi Client Session: connection config file '"
113  "akonadi/akonadiconnectionrc' can not be found in"
114  << XdgBaseDirs::homePath( "config" ) << "nor in any of"
115  << XdgBaseDirs::systemPathList( "config" );
116  }
117  const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
118 
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120  serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
121 #else
122  const QString defaultSocketDir = Internal::xdgSaveDir( "data" );
123  serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
124 #endif
125  }
 // Windows CE builds always use TCP, regardless of what was configured above
126 #ifdef Q_OS_WINCE
127  useTcp = true;
128 #endif
129 
130  // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
131  // but that's probably not something we need to support anyway
132  if ( !socket ) {
133  if ( !useTcp ) {
134  socket = localSocket = new QLocalSocket( mParent );
135  mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
136  } else {
137  socket = tcpSocket = new QTcpSocket( mParent );
138  mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
139  }
140  mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141  mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
142  }
143 
144  // actually do connect
 // NOTE(review): if a socket of the other type already exists, the pointer for the
 // requested type is null at this point - confirm the socket type cannot change
 // between calls.
145  kDebug() << "connectToServer" << serverAddress;
146 #ifdef Q_OS_WINCE
147  tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
148 #else
149  if ( !useTcp ) {
150  localSocket->connectToServer( serverAddress );
151  } else {
152  tcpSocket->connectToHost( serverAddress, port );
153  }
154 #endif
155 
156  emit mParent->reconnected();
157 }
158 
159 QString SessionPrivate::connectionFile()
160 {
161  return Internal::xdgSaveDir( "config" ) + QLatin1String("/akonadiconnectionrc");
162 }
163 
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
165 {
166  Q_ASSERT( mParent->sender() == socket );
167  kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168  socketDisconnected();
169 }
170 
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
172 {
173  Q_ASSERT( mParent->sender() == socket );
174  kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175  socketDisconnected();
176 }
177 
178 void SessionPrivate::socketDisconnected()
179 {
180  if ( currentJob )
181  currentJob->d_ptr->lostConnection();
182  connected = false;
183 }
184 
// Slot connected to the socket's readyRead() signal. Consumes everything that is
// currently readable, feeds it to the IMAP parser and acts on each completed response:
//  - greeting "* OK Akonadi...": extracts the protocol version and sends "0 LOGIN <sessionId>"
//  - tag "0" (login): on OK sends the CAPABILITY command, otherwise closes the socket
//    and schedules reconnect() after one second
//  - tag "1" (capability): on OK marks the session connected and starts the next job
// Every response other than the greeting (including the tag "0" and "1" responses)
// is also handed to the current job, if there is one.
185 void SessionPrivate::dataReceived()
186 {
187  while ( socket->bytesAvailable() > 0 ) {
 // parser is waiting for continuation data: read it as a raw block, at most
 // continuationSize() - 1 bytes
188  if ( parser->continuationSize() > 1 ) {
189  const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190  parser->parseBlock( data );
191  } else if ( socket->canReadLine() ) {
192  if ( !parser->parseNextLine( socket->readLine() ) )
193  continue; // response not yet completed
194 
195  // handle login response
196  if ( parser->tag() == QByteArray( "0" ) ) {
197  if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
 // NOTE(review): unlike the LOGIN command below, this command is written
 // without a trailing '\n' - confirm the server does not need one
198  writeData("1 CAPABILITY (NOTIFY 2)");
199  } else {
200  kWarning() << "Unable to login to Akonadi server:" << parser->data();
201  socket->close();
202  QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
203  }
204  }
205 
206  // handle capability response
207  if ( parser->tag() == QByteArray("1") ) {
208  if ( parser->data().startsWith("OK") ) {
209  connected = true;
210  startNext();
211  } else {
212  kDebug() << "Unhandled server capability response:" << parser->data();
213  }
214  }
215 
216  // send login command
217  if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
218  const int pos = parser->data().indexOf( "[PROTOCOL" );
219  if ( pos > 0 ) {
 // pos + 9 skips the 9 characters of "[PROTOCOL"; presumably the position at
 // which parseNumber starts reading - verify against ImapParser
220  qint64 tmp = 0;
221  ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
222  protocolVersion = tmp;
223  Internal::setServerProtocolVersion( tmp );
224  }
225  kDebug() << "Server protocol version is:" << protocolVersion;
226 
227  writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
228 
229  // work for the current job
230  } else {
231  if ( currentJob )
232  currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
233  }
234 
235  // reset parser stuff
236  parser->reset();
237  } else {
238  break; // nothing we can do for now
239  }
240  }
241 }
242 
243 bool SessionPrivate::canPipelineNext()
244 {
245  if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
246  return false;
247  if ( pipeline.isEmpty() && currentJob )
248  return currentJob->d_ptr->mWriteFinished;
249  if ( !pipeline.isEmpty() )
250  return pipeline.last()->d_ptr->mWriteFinished;
251  return false;
252 }
253 
254 void SessionPrivate::doStartNext()
255 {
256  if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
257  return;
258  if ( canPipelineNext() ) {
259  Akonadi::Job *nextJob = queue.dequeue();
260  pipeline.enqueue( nextJob );
261  startJob( nextJob );
262  }
263  if ( jobRunning )
264  return;
265  jobRunning = true;
266  if ( !pipeline.isEmpty() ) {
267  currentJob = pipeline.dequeue();
268  } else {
269  currentJob = queue.dequeue();
270  startJob( currentJob );
271  }
272 }
273 
274 void SessionPrivate::startJob( Job *job )
275 {
276  if ( protocolVersion < minimumProtocolVersion() ) {
277  job->setError( Job::ProtocolVersionMismatch );
278  job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
279  job->emitResult();
280  } else {
281  job->d_ptr->startQueued();
282  }
283 }
284 
285 void SessionPrivate::endJob( Job *job )
286 {
287  job->emitResult();
288 }
289 
290 void SessionPrivate::jobDone(KJob * job)
291 {
292  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
293  // so don't call any methods on job itself
294  if ( job == currentJob ) {
295  if ( pipeline.isEmpty() ) {
296  jobRunning = false;
297  currentJob = 0;
298  } else {
299  currentJob = pipeline.dequeue();
300  }
301  startNext();
302  } else {
303  // non-current job finished, likely canceled while still in the queue
304  queue.removeAll( static_cast<Akonadi::Job*>( job ) );
305  // ### likely not enough to really cancel already running jobs
306  pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
307  }
308 }
309 
310 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
311 {
312  Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
313  Q_UNUSED( job );
314 
315  startNext();
316 }
317 
318 void SessionPrivate::jobDestroyed(QObject * job)
319 {
320  // careful, accessing non-QObject methods of job will fail here already
321  jobDone( static_cast<KJob*>( job ) );
322 }
323 
324 void SessionPrivate::addJob(Job * job)
325 {
326  queue.append( job );
327  QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
328  QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
329  QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
330  startNext();
331 }
332 
333 int SessionPrivate::nextTag()
334 {
335  return theNextTag++;
336 }
337 
338 void SessionPrivate::writeData(const QByteArray & data)
339 {
340  if ( socket )
341  socket->write( data );
342  else
343  kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
344 }
345 
346 void SessionPrivate::serverStateChanged( ServerManager::State state )
347 {
348  if ( state == ServerManager::Running && !connected )
349  reconnect();
350 }
351 
352 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
353 {
354  // only deal with the queue, for the guys in the pipeline it's too late already anyway
355  // and they shouldn't have gotten there if they depend on a preceding job anyway.
356  foreach ( Job *job, queue )
357  job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
358 }
359 
360 //@endcond
361 
362 
363 SessionPrivate::SessionPrivate( Session *parent )
364  : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
365 {
366 }
367 
368 void SessionPrivate::init( const QByteArray &id )
369 {
370  kDebug() << id;
371  parser = new ImapParser();
372 
373  if ( !id.isEmpty() ) {
374  sessionId = id;
375  } else {
376  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
377  + '-' + QByteArray::number( qrand() );
378  }
379 
380  connected = false;
381  theNextTag = 2;
382  jobRunning = false;
383 
384  if ( ServerManager::state() == ServerManager::NotRunning )
385  ServerManager::start();
386  mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
387  SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
388 
389  reconnect();
390 }
391 
392 void SessionPrivate::forceReconnect()
393 {
394  jobRunning = false;
395  connected = false;
396  if ( socket ) {
397  socket->disconnect( mParent ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
398  delete socket;
399  }
400  socket = 0;
401  QMetaObject::invokeMethod( mParent, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
402 }
403 
404 
405 Session::Session(const QByteArray & sessionId, QObject * parent) :
406  QObject( parent ),
407  d( new SessionPrivate( this ) )
408 {
409  d->init( sessionId );
410 }
411 
412 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
413  : QObject( parent ),
414  d( dd )
415 {
416  d->init( sessionId );
417 }
418 
419 Session::~Session()
420 {
421  clear();
422  delete d;
423 }
424 
425 QByteArray Session::sessionId() const
426 {
427  return d->sessionId;
428 }
429 
// Per-thread storage for the default Session; written by
// SessionPrivate::createDefaultSession(), SessionPrivate::setDefaultSession() and
// Session::defaultSession().
430 static QThreadStorage<Session*> instances;
431 
432 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
433 {
434  Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
435  "You tried to create a default session with empty session id!" );
436  Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
437  "You tried to create a default session twice!" );
438 
439  instances.setLocalData( new Session( sessionId ) );
440 }
441 
442 void SessionPrivate::setDefaultSession( Session *session )
443 {
444  instances.setLocalData( session );
445 }
446 
447 Session* Session::defaultSession()
448 {
449  if ( !instances.hasLocalData() )
450  instances.setLocalData( new Session() );
451  return instances.localData();
452 }
453 
454 void Session::clear()
455 {
456  foreach ( Job* job, d->queue )
457  job->kill( KJob::EmitResult ); // safe, not started yet
458  d->queue.clear();
459  foreach ( Job* job, d->pipeline ) {
460  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
461  job->kill( KJob::EmitResult );
462  }
463  d->pipeline.clear();
464  if ( d->currentJob ) {
465  d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
466  d->currentJob->kill( KJob::EmitResult );
467  }
468  d->forceReconnect();
469 }
470 
471 #include "moc_session.cpp"
Akonadi::Job::ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition: job.h:107
Akonadi::SessionPrivate::forceReconnect
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
Definition: session.cpp:392
Akonadi::SessionPrivate::nextTag
int nextTag()
Returns the next IMAP tag.
Akonadi::ServerManager::self
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
Definition: servermanager.cpp:152
Akonadi::SessionPrivate::createDefaultSession
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Definition: session.cpp:432
Akonadi::Job
Base class for all actions in the Akonadi storage.
Definition: job.h:86
Akonadi::SessionPrivate::setDefaultSession
static void setDefaultSession(Session *session)
Sets the default session.
Definition: session.cpp:442
Akonadi::Session::defaultSession
static Session * defaultSession()
Returns the default session for this thread.
Definition: session.cpp:447
Akonadi::Session::~Session
~Session()
Destroys the session.
Definition: session.cpp:419
Akonadi::Session::sessionId
QByteArray sessionId() const
Returns the session identifier.
Definition: session.cpp:425
Akonadi::Session::clear
void clear()
Stops all jobs queued for execution.
Definition: session.cpp:454
Akonadi::ServerManager::state
static State state()
Returns the state of the server.
Definition: servermanager.cpp:215
Akonadi::Session
A communication session with the Akonadi storage.
Definition: session.h:59
Akonadi::SessionPrivate
Definition: session_p.h:41
Akonadi::ServerManager::NotRunning
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:51
Akonadi::ServerManager::start
static bool start()
Starts the server.
Definition: servermanager.cpp:157
Akonadi::SessionPrivate::endJob
void endJob(Job *job)
Akonadi::SessionPrivate::addJob
virtual void addJob(Job *job)
Associates the given Job object with this session.
Akonadi::Session::reconnected
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
Akonadi::SessionPrivate::reconnect
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
Akonadi::SessionPrivate::itemRevisionChanged
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Akonadi::ServerManager::Running
Server is running and operational.
Definition: servermanager.h:53
Akonadi::SessionPrivate::writeData
void writeData(const QByteArray &data)
Sends the given raw data.
Akonadi::SessionPrivate::connectionFile
static QString connectionFile()
Default location for akonadiconnectionrc.
Akonadi::ServerManager::State
State
Enum for the various states the server can be in.
Definition: servermanager.h:50
Akonadi::Session::Session
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.
Definition: session.cpp:405
This file is part of the KDE documentation.
Documentation copyright © 1996-2013 The KDE developers.
Generated on Tue Nov 26 2013 09:03:19 by doxygen 1.8.5 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.11.3 API Reference

Skip menu "kdepimlibs-4.11.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal