• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.9.5 API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • akonadi
session.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "session.h"
21 #include "session_p.h"
22 
23 #include "imapparser_p.h"
24 #include "job.h"
25 #include "job_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
29 
30 #include <kdebug.h>
31 #include <klocale.h>
32 
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
39 #include <QSettings>
40 
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
44 
45 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
46 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
47 // sends responses for the next one to the already finished one
48 #define PIPELINE_LENGTH 0
49 //#define PIPELINE_LENGTH 2
50 
51 using namespace Akonadi;
52 
53 
54 //@cond PRIVATE
55 
56 void SessionPrivate::startNext()
57 {
58  QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
59 }
60 
61 void SessionPrivate::reconnect()
62 {
63  QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64  if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65  || localSocket->state() == QLocalSocket::ConnectingState ) ) {
66  // nothing to do, we are still/already connected
67  return;
68  }
69 
70  QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71  if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72  || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
73  // same here, but for TCP
74  return;
75  }
76 
77  // try to figure out where to connect to
78  QString serverAddress;
79  quint16 port = 0;
80  bool useTcp = false;
81 
82  // env var has precedence
83  const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
84  if ( !serverAddressEnvVar.isEmpty() ) {
85  const int pos = serverAddressEnvVar.indexOf( ':' );
86  const QByteArray protocol = serverAddressEnvVar.left( pos );
87  QMap<QString, QString> options;
88  foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
89  const QStringList pair = entry.split( QLatin1Char('=') );
90  if ( pair.size() != 2 )
91  continue;
92  options.insert( pair.first(), pair.last() );
93  }
94  kDebug() << protocol << options;
95 
96  if ( protocol == "tcp" ) {
97  serverAddress = options.value( QLatin1String( "host" ) );
98  port = options.value( QLatin1String( "port" ) ).toUInt();
99  useTcp = true;
100  } else if ( protocol == "unix" ) {
101  serverAddress = options.value( QLatin1String( "path" ) );
102  } else if ( protocol == "pipe" ) {
103  serverAddress = options.value( QLatin1String( "name" ) );
104  }
105  }
106 
107  // try config file next, fall back to defaults if that fails as well
108  if ( serverAddress.isEmpty() ) {
109  const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
110  const QFileInfo fileInfo( connectionConfigFile );
111  if ( !fileInfo.exists() ) {
112  kDebug() << "Akonadi Client Session: connection config file '"
113  "akonadi/akonadiconnectionrc' can not be found in"
114  << XdgBaseDirs::homePath( "config" ) << "nor in any of"
115  << XdgBaseDirs::systemPathList( "config" );
116  }
117  const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
118 
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120  serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
121 #else
122  const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
123  serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
124 #endif
125  }
126 #ifdef Q_OS_WINCE
127  useTcp = true;
128 #endif
129 
130  // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
131  // but that's probably not something we need to support anyway
132  if ( !socket ) {
133  if ( !useTcp ) {
134  socket = localSocket = new QLocalSocket( mParent );
135  mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
136  } else {
137  socket = tcpSocket = new QTcpSocket( mParent );
138  mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
139  }
140  mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141  mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
142  }
143 
144  // actually do connect
145  kDebug() << "connectToServer" << serverAddress;
146 #ifdef Q_OS_WINCE
147  tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
148 #else
149  if ( !useTcp ) {
150  localSocket->connectToServer( serverAddress );
151  } else {
152  tcpSocket->connectToHost( serverAddress, port );
153  }
154 #endif
155 
156  emit mParent->reconnected();
157 }
158 
159 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
160 {
161  Q_ASSERT( mParent->sender() == socket );
162  kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
163  socketDisconnected();
164 }
165 
166 void SessionPrivate::socketError( QAbstractSocket::SocketError )
167 {
168  Q_ASSERT( mParent->sender() == socket );
169  kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
170  socketDisconnected();
171 }
172 
173 void SessionPrivate::socketDisconnected()
174 {
175  if ( currentJob )
176  currentJob->d_ptr->lostConnection();
177  connected = false;
178 }
179 
180 void SessionPrivate::dataReceived()
181 {
182  while ( socket->bytesAvailable() > 0 ) {
183  if ( parser->continuationSize() > 1 ) {
184  const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
185  parser->parseBlock( data );
186  } else if ( socket->canReadLine() ) {
187  if ( !parser->parseNextLine( socket->readLine() ) )
188  continue; // response not yet completed
189 
190  // handle login response
191  if ( parser->tag() == QByteArray( "0" ) ) {
192  if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
193  connected = true;
194  startNext();
195  } else {
196  kWarning() << "Unable to login to Akonadi server:" << parser->data();
197  socket->close();
198  QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
199  }
200  }
201 
202  // send login command
203  if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
204  const int pos = parser->data().indexOf( "[PROTOCOL" );
205  if ( pos > 0 ) {
206  qint64 tmp = 0;
207  ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
208  protocolVersion = tmp;
209  Internal::setServerProtocolVersion( tmp );
210  }
211  kDebug() << "Server protocol version is:" << protocolVersion;
212 
213  writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
214 
215  // work for the current job
216  } else {
217  if ( currentJob )
218  currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
219  }
220 
221  // reset parser stuff
222  parser->reset();
223  } else {
224  break; // nothing we can do for now
225  }
226  }
227 }
228 
229 bool SessionPrivate::canPipelineNext()
230 {
231  if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
232  return false;
233  if ( pipeline.isEmpty() && currentJob )
234  return currentJob->d_ptr->mWriteFinished;
235  if ( !pipeline.isEmpty() )
236  return pipeline.last()->d_ptr->mWriteFinished;
237  return false;
238 }
239 
240 void SessionPrivate::doStartNext()
241 {
242  if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
243  return;
244  if ( canPipelineNext() ) {
245  Akonadi::Job *nextJob = queue.dequeue();
246  pipeline.enqueue( nextJob );
247  startJob( nextJob );
248  }
249  if ( jobRunning )
250  return;
251  jobRunning = true;
252  if ( !pipeline.isEmpty() ) {
253  currentJob = pipeline.dequeue();
254  } else {
255  currentJob = queue.dequeue();
256  startJob( currentJob );
257  }
258 }
259 
260 void SessionPrivate::startJob( Job *job )
261 {
262  if ( protocolVersion < minimumProtocolVersion() ) {
263  job->setError( Job::ProtocolVersionMismatch );
264  job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
265  job->emitResult();
266  } else {
267  job->d_ptr->startQueued();
268  }
269 }
270 
271 void SessionPrivate::endJob( Job *job )
272 {
273  job->emitResult();
274 }
275 
276 void SessionPrivate::jobDone(KJob * job)
277 {
278  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
279  // so don't call any methods on job itself
280  if ( job == currentJob ) {
281  if ( pipeline.isEmpty() ) {
282  jobRunning = false;
283  currentJob = 0;
284  } else {
285  currentJob = pipeline.dequeue();
286  }
287  startNext();
288  } else {
289  // non-current job finished, likely canceled while still in the queue
290  queue.removeAll( static_cast<Akonadi::Job*>( job ) );
291  // ### likely not enough to really cancel already running jobs
292  pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
293  }
294 }
295 
296 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
297 {
298  Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
299  Q_UNUSED( job );
300 
301  startNext();
302 }
303 
304 void SessionPrivate::jobDestroyed(QObject * job)
305 {
306  // careful, accessing non-QObject methods of job will fail here already
307  jobDone( static_cast<KJob*>( job ) );
308 }
309 
310 void SessionPrivate::addJob(Job * job)
311 {
312  queue.append( job );
313  QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
314  QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
315  QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
316  startNext();
317 }
318 
319 int SessionPrivate::nextTag()
320 {
321  return theNextTag++;
322 }
323 
324 void SessionPrivate::writeData(const QByteArray & data)
325 {
326  if ( socket )
327  socket->write( data );
328  else
329  kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
330 }
331 
332 void SessionPrivate::serverStateChanged( ServerManager::State state )
333 {
334  if ( state == ServerManager::Running && !connected )
335  reconnect();
336 }
337 
338 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
339 {
340  // only deal with the queue, for the guys in the pipeline it's too late already anyway
341  // and they shouldn't have gotten there if they depend on a preceding job anyway.
342  foreach ( Job *job, queue )
343  job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
344 }
345 
346 //@endcond
347 
348 
349 SessionPrivate::SessionPrivate( Session *parent )
350  : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
351 {
352 }
353 
354 void SessionPrivate::init( const QByteArray &id )
355 {
356  kDebug() << id;
357  parser = new ImapParser();
358 
359  if ( !id.isEmpty() ) {
360  sessionId = id;
361  } else {
362  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
363  + '-' + QByteArray::number( qrand() );
364  }
365 
366  connected = false;
367  theNextTag = 1;
368  jobRunning = false;
369 
370  if ( ServerManager::state() == ServerManager::NotRunning )
371  ServerManager::start();
372  mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
373  SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
374 
375  reconnect();
376 }
377 
378 Session::Session(const QByteArray & sessionId, QObject * parent) :
379  QObject( parent ),
380  d( new SessionPrivate( this ) )
381 {
382  d->init( sessionId );
383 }
384 
385 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
386  : QObject( parent ),
387  d( dd )
388 {
389  d->init( sessionId );
390 }
391 
392 Session::~Session()
393 {
394  clear();
395  delete d;
396 }
397 
398 QByteArray Session::sessionId() const
399 {
400  return d->sessionId;
401 }
402 
403 static QThreadStorage<Session*> instances;
404 
405 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
406 {
407  Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
408  "You tried to create a default session with empty session id!" );
409  Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
410  "You tried to create a default session twice!" );
411 
412  instances.setLocalData( new Session( sessionId ) );
413 }
414 
415 Session* Session::defaultSession()
416 {
417  if ( !instances.hasLocalData() )
418  instances.setLocalData( new Session() );
419  return instances.localData();
420 }
421 
422 void Session::clear()
423 {
424  foreach ( Job* job, d->queue )
425  job->kill( KJob::EmitResult );
426  d->queue.clear();
427  foreach ( Job* job, d->pipeline )
428  job->kill( KJob::EmitResult );
429  d->pipeline.clear();
430  if ( d->currentJob )
431  d->currentJob->kill( KJob::EmitResult );
432  d->jobRunning = false;
433  d->connected = false;
434  if ( d->socket )
435  d->socket->disconnect( this ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
436  delete d->socket;
437  d->socket = 0;
438  QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
439 }
440 
441 #include "session.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2013 The KDE developers.
Generated on Sat Jan 5 2013 19:46:08 by doxygen 1.8.1.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.9.5 API Reference

Skip menu "kdepimlibs-4.9.5 API Reference"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal