// akonadi
// resourcebase.cpp
00001 /* 00002 Copyright (c) 2006 Till Adam <adam@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "resourcebase.h" 00022 #include "agentbase_p.h" 00023 00024 #include "resourceadaptor.h" 00025 #include "collectiondeletejob.h" 00026 #include "collectionsync_p.h" 00027 #include "dbusconnectionpool.h" 00028 #include "itemsync.h" 00029 #include "kdepimlibs-version.h" 00030 #include "resourcescheduler_p.h" 00031 #include "tracerinterface.h" 00032 #include "xdgbasedirs_p.h" 00033 00034 #include "changerecorder.h" 00035 #include "collectionfetchjob.h" 00036 #include "collectionfetchscope.h" 00037 #include "collectionmodifyjob.h" 00038 #include "itemfetchjob.h" 00039 #include "itemfetchscope.h" 00040 #include "itemmodifyjob.h" 00041 #include "itemmodifyjob_p.h" 00042 #include "session.h" 00043 #include "resourceselectjob_p.h" 00044 #include "monitor_p.h" 00045 #include "servermanager_p.h" 00046 00047 #include <kaboutdata.h> 00048 #include <kcmdlineargs.h> 00049 #include <kdebug.h> 00050 #include <klocale.h> 00051 00052 #include <QtCore/QDebug> 00053 #include <QtCore/QDir> 00054 #include <QtCore/QHash> 00055 #include 
<QtCore/QSettings> 00056 #include <QtCore/QTimer> 00057 #include <QtGui/QApplication> 00058 #include <QtDBus/QtDBus> 00059 00060 using namespace Akonadi; 00061 00062 class Akonadi::ResourceBasePrivate : public AgentBasePrivate 00063 { 00064 Q_OBJECT 00065 Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" ) 00066 00067 public: 00068 ResourceBasePrivate( ResourceBase *parent ) 00069 : AgentBasePrivate( parent ), 00070 scheduler( 0 ), 00071 mItemSyncer( 0 ), 00072 mItemSyncFetchScope( 0 ), 00073 mItemTransactionMode( ItemSync::SingleTransaction ), 00074 mCollectionSyncer( 0 ), 00075 mHierarchicalRid( false ), 00076 mUnemittedProgress( 0 ), 00077 mAutomaticProgressReporting( true ) 00078 { 00079 Internal::setClientType( Internal::Resource ); 00080 mStatusMessage = defaultReadyMessage(); 00081 mProgressEmissionCompressor.setInterval( 1000 ); 00082 mProgressEmissionCompressor.setSingleShot( true ); 00083 } 00084 00085 ~ResourceBasePrivate() 00086 { 00087 delete mItemSyncFetchScope; 00088 } 00089 00090 Q_DECLARE_PUBLIC( ResourceBase ) 00091 00092 void delayedInit() 00093 { 00094 if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) { 00095 QString reason = DBusConnectionPool::threadConnection().lastError().message(); 00096 if ( reason.isEmpty() ) { 00097 reason = QString::fromLatin1( "this service is probably running already." 
); 00098 } 00099 kError() << "Unable to register service at D-Bus: " << reason; 00100 00101 if ( QThread::currentThread() == QCoreApplication::instance()->thread() ) 00102 QCoreApplication::instance()->exit(1); 00103 00104 } else { 00105 AgentBasePrivate::delayedInit(); 00106 } 00107 } 00108 00109 virtual void changeProcessed() 00110 { 00111 mChangeRecorder->changeProcessed(); 00112 if ( !mChangeRecorder->isEmpty() ) 00113 scheduler->scheduleChangeReplay(); 00114 scheduler->taskDone(); 00115 } 00116 00117 void slotAbortRequested(); 00118 00119 void slotDeliveryDone( KJob* job ); 00120 void slotCollectionSyncDone( KJob *job ); 00121 void slotLocalListDone( KJob *job ); 00122 void slotSynchronizeCollection( const Collection &col ); 00123 void slotCollectionListDone( KJob *job ); 00124 void slotSynchronizeCollectionAttributes( const Collection &col ); 00125 void slotCollectionListForAttributesDone( KJob *job ); 00126 void slotCollectionAttributesSyncDone( KJob *job ); 00127 00128 void slotItemSyncDone( KJob *job ); 00129 00130 void slotPercent( KJob* job, unsigned long percent ); 00131 void slotDelayedEmitProgress(); 00132 void slotDeleteResourceCollection(); 00133 void slotDeleteResourceCollectionDone( KJob *job ); 00134 void slotCollectionDeletionDone( KJob *job ); 00135 00136 void slotPrepareItemRetrieval( const Akonadi::Item &item ); 00137 void slotPrepareItemRetrievalResult( KJob* job ); 00138 00139 void changeCommittedResult( KJob* job ); 00140 00141 void slotSessionReconnected() 00142 { 00143 Q_Q( ResourceBase ); 00144 00145 new ResourceSelectJob( q->identifier() ); 00146 } 00147 00148 void createItemSyncInstanceIfMissing() 00149 { 00150 Q_Q( ResourceBase ); 00151 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::SyncCollection, 00152 "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress" ); 00153 if ( !mItemSyncer ) { 00154 mItemSyncer = new ItemSync( q->currentCollection() ); 00155 
mItemSyncer->setTransactionMode( mItemTransactionMode ); 00156 if ( mItemSyncFetchScope ) 00157 mItemSyncer->setFetchScope( *mItemSyncFetchScope ); 00158 mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) ); 00159 connect( mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), q, SLOT( slotPercent( KJob*, unsigned long ) ) ); 00160 connect( mItemSyncer, SIGNAL( result( KJob* ) ), q, SLOT( slotItemSyncDone( KJob* ) ) ); 00161 } 00162 Q_ASSERT( mItemSyncer ); 00163 } 00164 00165 public Q_SLOTS: 00166 Q_SCRIPTABLE void dump() 00167 { 00168 scheduler->dump(); 00169 } 00170 00171 Q_SCRIPTABLE void clear() 00172 { 00173 scheduler->clear(); 00174 } 00175 00176 protected Q_SLOTS: 00177 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources 00178 // such as making sure that RIDs are present as well as translations of cross-resource moves 00179 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder 00180 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops 00181 00182 void itemAdded(const Akonadi::Item& item, const Akonadi::Collection& collection) 00183 { 00184 if ( collection.remoteId().isEmpty() ) { 00185 changeProcessed(); 00186 return; 00187 } 00188 AgentBasePrivate::itemAdded( item, collection ); 00189 } 00190 00191 void itemChanged(const Akonadi::Item& item, const QSet< QByteArray >& partIdentifiers) 00192 { 00193 if ( item.remoteId().isEmpty() ) { 00194 changeProcessed(); 00195 return; 00196 } 00197 AgentBasePrivate::itemChanged( item, partIdentifiers ); 00198 } 00199 00200 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00201 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) 00202 { 00203 if ( item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source 
) { 00204 changeProcessed(); 00205 return; 00206 } 00207 AgentBasePrivate::itemMoved( item, source, destination ); 00208 } 00209 00210 void itemRemoved(const Akonadi::Item& item) 00211 { 00212 if ( item.remoteId().isEmpty() ) { 00213 changeProcessed(); 00214 return; 00215 } 00216 AgentBasePrivate::itemRemoved( item ); 00217 } 00218 00219 void collectionAdded(const Akonadi::Collection& collection, const Akonadi::Collection& parent) 00220 { 00221 if ( parent.remoteId().isEmpty() ) { 00222 changeProcessed(); 00223 return; 00224 } 00225 AgentBasePrivate::collectionAdded( collection, parent ); 00226 } 00227 00228 void collectionChanged(const Akonadi::Collection& collection) 00229 { 00230 if ( collection.remoteId().isEmpty() ) { 00231 changeProcessed(); 00232 return; 00233 } 00234 AgentBasePrivate::collectionChanged( collection ); 00235 } 00236 00237 void collectionChanged(const Akonadi::Collection& collection, const QSet< QByteArray >& partIdentifiers) 00238 { 00239 if ( collection.remoteId().isEmpty() ) { 00240 changeProcessed(); 00241 return; 00242 } 00243 AgentBasePrivate::collectionChanged( collection, partIdentifiers ); 00244 } 00245 00246 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00247 void collectionMoved(const Akonadi::Collection& collection, const Akonadi::Collection& source, const Akonadi::Collection& destination) 00248 { 00249 if ( collection.remoteId().isEmpty() || destination.remoteId().isEmpty() || source == destination ) { 00250 changeProcessed(); 00251 return; 00252 } 00253 AgentBasePrivate::collectionMoved( collection, source, destination ); 00254 } 00255 00256 void collectionRemoved(const Akonadi::Collection& collection) 00257 { 00258 if ( collection.remoteId().isEmpty() ) { 00259 changeProcessed(); 00260 return; 00261 } 00262 AgentBasePrivate::collectionRemoved( collection ); 00263 } 00264 00265 public: 00266 // synchronize states 00267 Collection currentCollection; 00268 00269 ResourceScheduler 
*scheduler; 00270 ItemSync *mItemSyncer; 00271 ItemFetchScope *mItemSyncFetchScope; 00272 ItemSync::TransactionMode mItemTransactionMode; 00273 CollectionSync *mCollectionSyncer; 00274 bool mHierarchicalRid; 00275 QTimer mProgressEmissionCompressor; 00276 int mUnemittedProgress; 00277 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus; 00278 bool mAutomaticProgressReporting; 00279 }; 00280 00281 ResourceBase::ResourceBase( const QString & id ) 00282 : AgentBase( new ResourceBasePrivate( this ), id ) 00283 { 00284 Q_D( ResourceBase ); 00285 00286 new Akonadi__ResourceAdaptor( this ); 00287 00288 d->scheduler = new ResourceScheduler( this ); 00289 00290 d->mChangeRecorder->setChangeRecordingEnabled( true ); 00291 connect( d->mChangeRecorder, SIGNAL( changesAdded() ), 00292 d->scheduler, SLOT( scheduleChangeReplay() ) ); 00293 00294 d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() ); 00295 d->mChangeRecorder->fetchCollection( true ); 00296 00297 connect( d->scheduler, SIGNAL( executeFullSync() ), 00298 SLOT( retrieveCollections() ) ); 00299 connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ), 00300 SLOT( retrieveCollections() ) ); 00301 connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ), 00302 SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) ); 00303 connect( d->scheduler, SIGNAL( executeCollectionAttributesSync( const Akonadi::Collection& ) ), 00304 SLOT( slotSynchronizeCollectionAttributes(Akonadi::Collection)) ); 00305 connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ), 00306 SLOT( slotPrepareItemRetrieval(Akonadi::Item)) ); 00307 connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ), 00308 SLOT( slotDeleteResourceCollection() ) ); 00309 connect( d->scheduler, SIGNAL( status( int, const QString& ) ), 00310 SIGNAL( status( int, const QString& ) ) ); 00311 connect( d->scheduler, SIGNAL( executeChangeReplay() ), 00312 
d->mChangeRecorder, SLOT( replayNext() ) ); 00313 connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) ); 00314 connect( d->mChangeRecorder, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) ); 00315 connect( d->mChangeRecorder, SIGNAL( collectionRemoved( const Akonadi::Collection& ) ), 00316 d->scheduler, SLOT( collectionRemoved( const Akonadi::Collection& ) ) ); 00317 connect( this, SIGNAL( abortRequested() ), this, SLOT( slotAbortRequested() ) ); 00318 connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) ); 00319 connect( this, SIGNAL( agentNameChanged( const QString& ) ), 00320 this, SIGNAL( nameChanged( const QString& ) ) ); 00321 00322 connect( &d->mProgressEmissionCompressor, SIGNAL( timeout() ), 00323 this, SLOT( slotDelayedEmitProgress() ) ); 00324 00325 d->scheduler->setOnline( d->mOnline ); 00326 if ( !d->mChangeRecorder->isEmpty() ) 00327 d->scheduler->scheduleChangeReplay(); 00328 00329 DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots ); 00330 00331 new ResourceSelectJob( identifier() ); 00332 00333 connect( d->mChangeRecorder->session(), SIGNAL( reconnected() ), SLOT( slotSessionReconnected() ) ); 00334 } 00335 00336 ResourceBase::~ResourceBase() 00337 { 00338 } 00339 00340 void ResourceBase::synchronize() 00341 { 00342 d_func()->scheduler->scheduleFullSync(); 00343 } 00344 00345 void ResourceBase::setName( const QString &name ) 00346 { 00347 AgentBase::setAgentName( name ); 00348 } 00349 00350 QString ResourceBase::name() const 00351 { 00352 return AgentBase::agentName(); 00353 } 00354 00355 QString ResourceBase::parseArguments( int argc, char **argv ) 00356 { 00357 QString identifier; 00358 if ( argc < 3 ) { 00359 kDebug() << "Not enough arguments passed..."; 00360 exit( 1 ); 00361 } 00362 00363 for ( int i = 1; i < argc - 1; ++i ) { 00364 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) 00365 
identifier = QLatin1String( argv[ i + 1 ] ); 00366 } 00367 00368 if ( identifier.isEmpty() ) { 00369 kDebug() << "Identifier argument missing"; 00370 exit( 1 ); 00371 } 00372 00373 const QFileInfo fi( QString::fromLocal8Bit( argv[0] ) ); 00374 // strip off full path and possible .exe suffix 00375 const QByteArray catalog = fi.baseName().toLatin1(); 00376 00377 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog, 00378 ki18nc( "@title application name", "Akonadi Resource" ), KDEPIMLIBS_VERSION, 00379 ki18nc( "@title application description", "Akonadi Resource" ) ); 00380 00381 KCmdLineOptions options; 00382 options.add( "identifier <argument>", 00383 ki18nc( "@label commandline option", "Resource identifier" ) ); 00384 KCmdLineArgs::addCmdLineOptions( options ); 00385 00386 return identifier; 00387 } 00388 00389 int ResourceBase::init( ResourceBase *r ) 00390 { 00391 QApplication::setQuitOnLastWindowClosed( false ); 00392 KGlobal::locale()->insertCatalog( QLatin1String( "libakonadi" ) ); 00393 int rv = kapp->exec(); 00394 delete r; 00395 return rv; 00396 } 00397 00398 void ResourceBasePrivate::slotAbortRequested() 00399 { 00400 Q_Q( ResourceBase ); 00401 00402 scheduler->cancelQueues(); 00403 QMetaObject::invokeMethod( q, "abortActivity" ); 00404 } 00405 00406 void ResourceBase::itemRetrieved( const Item &item ) 00407 { 00408 Q_D( ResourceBase ); 00409 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00410 if ( !item.isValid() ) { 00411 d->scheduler->currentTask().sendDBusReplies( false ); 00412 d->scheduler->taskDone(); 00413 return; 00414 } 00415 00416 Item i( item ); 00417 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts; 00418 foreach ( const QByteArray &part, requestedParts ) { 00419 if ( !item.loadedPayloadParts().contains( part ) ) { 00420 kWarning() << "Item does not provide part" << part; 00421 } 00422 } 00423 00424 ItemModifyJob *job = new ItemModifyJob( i ); 00425 // FIXME: remove once the 
item with which we call retrieveItem() has a revision number 00426 job->disableRevisionCheck(); 00427 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) ); 00428 } 00429 00430 void ResourceBasePrivate::slotDeliveryDone(KJob * job) 00431 { 00432 Q_Q( ResourceBase ); 00433 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00434 if ( job->error() ) { 00435 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); 00436 } 00437 scheduler->currentTask().sendDBusReplies( !job->error() ); 00438 scheduler->taskDone(); 00439 } 00440 00441 void ResourceBase::collectionAttributesRetrieved( const Collection &collection ) 00442 { 00443 Q_D( ResourceBase ); 00444 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00445 if ( !collection.isValid() ) { 00446 emit attributesSynchronized( d->scheduler->currentTask().collection.id() ); 00447 d->scheduler->taskDone(); 00448 return; 00449 } 00450 00451 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00452 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionAttributesSyncDone( KJob* ) ) ); 00453 } 00454 00455 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job) 00456 { 00457 Q_Q( ResourceBase ); 00458 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00459 if ( job->error() ) { 00460 emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() ); 00461 } 00462 emit q->attributesSynchronized( scheduler->currentTask().collection.id() ); 00463 scheduler->taskDone(); 00464 } 00465 00466 void ResourceBasePrivate::slotDeleteResourceCollection() 00467 { 00468 Q_Q( ResourceBase ); 00469 00470 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel ); 00471 job->fetchScope().setResource( q->identifier() ); 00472 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( 
slotDeleteResourceCollectionDone( KJob* ) ) ); 00473 } 00474 00475 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job ) 00476 { 00477 Q_Q( ResourceBase ); 00478 if ( job->error() ) { 00479 emit q->error( job->errorString() ); 00480 scheduler->taskDone(); 00481 } else { 00482 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job ); 00483 00484 if ( !fetchJob->collections().isEmpty() ) { 00485 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() ); 00486 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) ); 00487 } else { 00488 // there is no resource collection, so just ignore the request 00489 scheduler->taskDone(); 00490 } 00491 } 00492 } 00493 00494 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job ) 00495 { 00496 Q_Q( ResourceBase ); 00497 if ( job->error() ) { 00498 emit q->error( job->errorString() ); 00499 } 00500 00501 scheduler->taskDone(); 00502 } 00503 00504 void ResourceBase::changeCommitted( const Item& item ) 00505 { 00506 Q_D( ResourceBase ); 00507 ItemModifyJob *job = new ItemModifyJob( item ); 00508 job->d_func()->setClean(); 00509 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? 
00510 job->setIgnorePayload( true ); // we only want to reset the dirty flag and update the remote id 00511 d->changeProcessed(); 00512 } 00513 00514 void ResourceBase::changeCommitted( const Collection &collection ) 00515 { 00516 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00517 connect( job, SIGNAL( result( KJob* ) ), SLOT( changeCommittedResult( KJob* ) ) ); 00518 } 00519 00520 void ResourceBasePrivate::changeCommittedResult( KJob *job ) 00521 { 00522 Q_Q( ResourceBase ); 00523 if ( job->error() ) 00524 emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) ); 00525 mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() ); 00526 changeProcessed(); 00527 } 00528 00529 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, 00530 const QString &mimeType, const QStringList &_parts ) 00531 { 00532 Q_D( ResourceBase ); 00533 if ( !isOnline() ) { 00534 emit error( i18nc( "@info", "Cannot fetch item in offline mode." 
) ); 00535 return false; 00536 } 00537 00538 setDelayedReply( true ); 00539 // FIXME: we need at least the revision number too 00540 Item item( uid ); 00541 item.setMimeType( mimeType ); 00542 item.setRemoteId( remoteId ); 00543 00544 QSet<QByteArray> parts; 00545 Q_FOREACH( const QString &str, _parts ) 00546 parts.insert( str.toLatin1() ); 00547 00548 d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); 00549 00550 return true; 00551 } 00552 00553 void ResourceBase::collectionsRetrieved( const Collection::List & collections ) 00554 { 00555 Q_D( ResourceBase ); 00556 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00557 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00558 "ResourceBase::collectionsRetrieved()", 00559 "Calling collectionsRetrieved() although no collection retrieval is in progress" ); 00560 if ( !d->mCollectionSyncer ) { 00561 d->mCollectionSyncer = new CollectionSync( identifier() ); 00562 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00563 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00564 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00565 } 00566 d->mCollectionSyncer->setRemoteCollections( collections ); 00567 } 00568 00569 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections, 00570 const Collection::List & removedCollections ) 00571 { 00572 Q_D( ResourceBase ); 00573 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00574 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00575 "ResourceBase::collectionsRetrievedIncremental()", 00576 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" ); 00577 if ( !d->mCollectionSyncer ) { 00578 d->mCollectionSyncer = new CollectionSync( 
identifier() ); 00579 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00580 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00581 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00582 } 00583 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections ); 00584 } 00585 00586 void ResourceBase::setCollectionStreamingEnabled( bool enable ) 00587 { 00588 Q_D( ResourceBase ); 00589 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00590 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00591 "ResourceBase::setCollectionStreamingEnabled()", 00592 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" ); 00593 if ( !d->mCollectionSyncer ) { 00594 d->mCollectionSyncer = new CollectionSync( identifier() ); 00595 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00596 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00597 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00598 } 00599 d->mCollectionSyncer->setStreamingEnabled( enable ); 00600 } 00601 00602 void ResourceBase::collectionsRetrievalDone() 00603 { 00604 Q_D( ResourceBase ); 00605 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00606 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00607 "ResourceBase::collectionsRetrievalDone()", 00608 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" ); 00609 // streaming enabled, so finalize the sync 00610 if ( d->mCollectionSyncer ) { 00611 d->mCollectionSyncer->retrievalDone(); 00612 } 00613 // user did the sync himself, we are done now 00614 else { 00615 // FIXME: we need the same special 
case for SyncAll as in slotCollectionSyncDone here! 00616 d->scheduler->taskDone(); 00617 } 00618 } 00619 00620 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job ) 00621 { 00622 Q_Q( ResourceBase ); 00623 mCollectionSyncer = 0; 00624 if ( job->error() ) { 00625 if ( job->error() != Job::UserCanceled ) 00626 emit q->error( job->errorString() ); 00627 } else { 00628 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { 00629 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); 00630 list->setFetchScope( q->changeRecorder()->collectionFetchScope() ); 00631 list->fetchScope().setResource( mId ); 00632 q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) ); 00633 return; 00634 } 00635 } 00636 scheduler->taskDone(); 00637 } 00638 00639 void ResourceBasePrivate::slotLocalListDone( KJob * job ) 00640 { 00641 Q_Q( ResourceBase ); 00642 if ( job->error() ) { 00643 emit q->error( job->errorString() ); 00644 } else { 00645 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections(); 00646 foreach ( const Collection &col, cols ) { 00647 scheduler->scheduleSync( col ); 00648 } 00649 scheduler->scheduleFullSyncCompletion(); 00650 } 00651 scheduler->taskDone(); 00652 } 00653 00654 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) 00655 { 00656 Q_Q( ResourceBase ); 00657 currentCollection = col; 00658 // check if this collection actually can contain anything 00659 QStringList contentTypes = currentCollection.contentMimeTypes(); 00660 contentTypes.removeAll( Collection::mimeType() ); 00661 if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) { // HACK to check for virtual collections 00662 if ( mAutomaticProgressReporting ) { 00663 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) ); 00664 } 00665 q->retrieveItems( currentCollection ); 00666 
return; 00667 } 00668 scheduler->taskDone(); 00669 } 00670 00671 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col ) 00672 { 00673 Q_Q( ResourceBase ); 00674 QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) ); 00675 } 00676 00677 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item ) 00678 { 00679 Q_Q( ResourceBase ); 00680 ItemFetchJob *fetch = new ItemFetchJob( item, this ); 00681 fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() ); 00682 fetch->fetchScope().setCacheOnly( true ); 00683 00684 // copy list of attributes to fetch 00685 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes(); 00686 foreach ( const QByteArray &attribute, attributes ) 00687 fetch->fetchScope().fetchAttribute( attribute ); 00688 00689 q->connect( fetch, SIGNAL( result( KJob* ) ), SLOT( slotPrepareItemRetrievalResult( KJob* ) ) ); 00690 } 00691 00692 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job ) 00693 { 00694 Q_Q( ResourceBase ); 00695 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem, 00696 "ResourceBasePrivate::slotPrepareItemRetrievalResult()", 00697 "Preparing item retrieval although no item retrieval is in progress" ); 00698 if ( job->error() ) { 00699 q->cancelTask( job->errorText() ); 00700 return; 00701 } 00702 ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job ); 00703 if ( fetch->items().count() != 1 ) { 00704 q->cancelTask( i18n( "The requested item no longer exists" ) ); 00705 return; 00706 } 00707 const Item item = fetch->items().first(); 00708 const QSet<QByteArray> parts = scheduler->currentTask().itemParts; 00709 if ( !q->retrieveItem( item, parts ) ) 00710 q->cancelTask(); 00711 } 00712 00713 void ResourceBase::itemsRetrievalDone() 00714 { 00715 Q_D( ResourceBase ); 00716 // streaming enabled, so finalize the sync 00717 if ( 
d->mItemSyncer ) { 00718 d->mItemSyncer->deliveryDone(); 00719 } 00720 // user did the sync himself, we are done now 00721 else { 00722 d->scheduler->taskDone(); 00723 } 00724 } 00725 00726 void ResourceBase::clearCache() 00727 { 00728 Q_D( ResourceBase ); 00729 d->scheduler->scheduleResourceCollectionDeletion(); 00730 } 00731 00732 Collection ResourceBase::currentCollection() const 00733 { 00734 Q_D( const ResourceBase ); 00735 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , 00736 "ResourceBase::currentCollection()", 00737 "Trying to access current collection although no item retrieval is in progress" ); 00738 return d->currentCollection; 00739 } 00740 00741 Item ResourceBase::currentItem() const 00742 { 00743 Q_D( const ResourceBase ); 00744 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , 00745 "ResourceBase::currentItem()", 00746 "Trying to access current item although no item retrieval is in progress" ); 00747 return d->scheduler->currentTask().item; 00748 } 00749 00750 void ResourceBase::synchronizeCollectionTree() 00751 { 00752 d_func()->scheduler->scheduleCollectionTreeSync(); 00753 } 00754 00755 void ResourceBase::cancelTask() 00756 { 00757 Q_D( ResourceBase ); 00758 switch ( d->scheduler->currentTask().type ) { 00759 case ResourceScheduler::FetchItem: 00760 itemRetrieved( Item() ); // sends the error reply and 00761 break; 00762 case ResourceScheduler::ChangeReplay: 00763 d->changeProcessed(); 00764 break; 00765 case ResourceScheduler::SyncCollectionTree: 00766 case ResourceScheduler::SyncAll: 00767 if ( d->mCollectionSyncer ) 00768 d->mCollectionSyncer->rollback(); 00769 else 00770 d->scheduler->taskDone(); 00771 break; 00772 case ResourceScheduler::SyncCollection: 00773 if ( d->mItemSyncer ) 00774 d->mItemSyncer->rollback(); 00775 else 00776 d->scheduler->taskDone(); 00777 break; 00778 default: 00779 d->scheduler->taskDone(); 00780 } 00781 } 00782 00783 void ResourceBase::cancelTask( 
const QString &msg ) 00784 { 00785 cancelTask(); 00786 00787 emit error( msg ); 00788 } 00789 00790 void ResourceBase::deferTask() 00791 { 00792 Q_D( ResourceBase ); 00793 d->scheduler->deferTask(); 00794 } 00795 00796 void ResourceBase::doSetOnline( bool state ) 00797 { 00798 d_func()->scheduler->setOnline( state ); 00799 } 00800 00801 void ResourceBase::synchronizeCollection( qint64 collectionId ) 00802 { 00803 synchronizeCollection( collectionId, false ); 00804 } 00805 00806 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive ) 00807 { 00808 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base ); 00809 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00810 job->fetchScope().setResource( identifier() ); 00811 job->setProperty( "recursive", recursive ); 00812 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) ); 00813 } 00814 00815 void ResourceBasePrivate::slotCollectionListDone( KJob *job ) 00816 { 00817 if ( !job->error() ) { 00818 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00819 if ( !list.isEmpty() ) { 00820 if ( job->property( "recursive" ).toBool() ) { 00821 Q_FOREACH ( const Collection &collection, list ) { 00822 scheduler->scheduleSync( collection ); 00823 } 00824 } else { 00825 scheduler->scheduleSync( list.first() ); 00826 } 00827 } 00828 } 00829 // TODO: error handling 00830 } 00831 00832 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId ) 00833 { 00834 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base ); 00835 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00836 job->fetchScope().setResource( identifier() ); 00837 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListForAttributesDone( KJob* ) ) ); 00838 } 00839 00840 void 
ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job ) 00841 { 00842 if ( !job->error() ) { 00843 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00844 if ( !list.isEmpty() ) { 00845 Collection col = list.first(); 00846 scheduler->scheduleAttributesSync( col ); 00847 } 00848 } 00849 // TODO: error handling 00850 } 00851 00852 void ResourceBase::setTotalItems( int amount ) 00853 { 00854 kDebug() << amount; 00855 Q_D( ResourceBase ); 00856 setItemStreamingEnabled( true ); 00857 d->mItemSyncer->setTotalItems( amount ); 00858 } 00859 00860 void ResourceBase::setItemStreamingEnabled( bool enable ) 00861 { 00862 Q_D( ResourceBase ); 00863 d->createItemSyncInstanceIfMissing(); 00864 d->mItemSyncer->setStreamingEnabled( enable ); 00865 } 00866 00867 void ResourceBase::itemsRetrieved( const Item::List &items ) 00868 { 00869 Q_D( ResourceBase ); 00870 d->createItemSyncInstanceIfMissing(); 00871 d->mItemSyncer->setFullSyncItems( items ); 00872 } 00873 00874 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ) 00875 { 00876 Q_D( ResourceBase ); 00877 d->createItemSyncInstanceIfMissing(); 00878 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); 00879 } 00880 00881 void ResourceBasePrivate::slotItemSyncDone( KJob *job ) 00882 { 00883 mItemSyncer = 0; 00884 Q_Q( ResourceBase ); 00885 if ( job->error() && job->error() != Job::UserCanceled ) { 00886 emit q->error( job->errorString() ); 00887 } 00888 scheduler->taskDone(); 00889 } 00890 00891 00892 void ResourceBasePrivate::slotDelayedEmitProgress() 00893 { 00894 Q_Q( ResourceBase ); 00895 if ( mAutomaticProgressReporting ) { 00896 emit q->percent( mUnemittedProgress ); 00897 00898 Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) { 00899 emit q->advancedStatus( statusMap ); 00900 } 00901 } 00902 mUnemittedProgress = 0; 00903 mUnemittedAdvancedStatus.clear(); 00904 } 00905 00906 void 
ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) 00907 { 00908 mUnemittedProgress = percent; 00909 00910 const Collection collection = job->property( "collection" ).value<Collection>(); 00911 if ( collection.isValid() ) { 00912 QVariantMap statusMap; 00913 statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) ); 00914 statusMap.insert( QLatin1String( "collectionId" ), collection.id() ); 00915 statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) ); 00916 00917 mUnemittedAdvancedStatus[collection.id()] = statusMap; 00918 } 00919 // deliver completion right away, intermediate progress at 1s intervals 00920 if ( percent == 100 ) { 00921 mProgressEmissionCompressor.stop(); 00922 slotDelayedEmitProgress(); 00923 } else if ( !mProgressEmissionCompressor.isActive() ) { 00924 mProgressEmissionCompressor.start(); 00925 } 00926 } 00927 00928 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable ) 00929 { 00930 Q_D( ResourceBase ); 00931 d->mHierarchicalRid = enable; 00932 } 00933 00934 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority ) 00935 { 00936 Q_D( ResourceBase ); 00937 d->scheduler->scheduleCustomTask( receiver, method, argument, priority ); 00938 } 00939 00940 void ResourceBase::taskDone() 00941 { 00942 Q_D( ResourceBase ); 00943 d->scheduler->taskDone(); 00944 } 00945 00946 void ResourceBase::retrieveCollectionAttributes( const Collection &collection ) 00947 { 00948 collectionAttributesRetrieved( collection ); 00949 } 00950 00951 void Akonadi::ResourceBase::abortActivity() 00952 { 00953 00954 } 00955 00956 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode) 00957 { 00958 Q_D( ResourceBase ); 00959 d->mItemTransactionMode = mode; 00960 } 00961 00962 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope) 00963 { 00964 Q_D( ResourceBase 
); 00965 if ( !d->mItemSyncFetchScope ) 00966 d->mItemSyncFetchScope = new ItemFetchScope; 00967 *(d->mItemSyncFetchScope) = fetchScope; 00968 } 00969 00970 void ResourceBase::setAutomaticProgressReporting( bool enabled ) 00971 { 00972 Q_D( ResourceBase ); 00973 d->mAutomaticProgressReporting = enabled; 00974 } 00975 00976 #include "resourcebase.moc" 00977 #include "moc_resourcebase.cpp"