00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "itemsync.h"
00022
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031
00032 #include <kdebug.h>
00033
00034 #include <QtCore/QStringList>
00035
00036 using namespace Akonadi;
00037
00041 class ItemSync::Private
00042 {
00043 public:
00044 Private( ItemSync *parent ) :
00045 q( parent ),
00046 mTransactionMode( SingleTransaction ),
00047 mCurrentTransaction( 0 ),
00048 mTransactionJobs( 0 ),
00049 mPendingJobs( 0 ),
00050 mProgress( 0 ),
00051 mTotalItems( -1 ),
00052 mTotalItemsProcessed( 0 ),
00053 mStreaming( false ),
00054 mIncremental( false ),
00055 mLocalListDone( false ),
00056 mDeliveryDone( false ),
00057 mFinished( false )
00058 {
00059
00060 mFetchScope.fetchFullPayload();
00061 mFetchScope.fetchAllAttributes();
00062 }
00063
00064 void createLocalItem( const Item &item );
00065 void checkDone();
00066 void slotLocalListDone( KJob* );
00067 void slotLocalDeleteDone( KJob* );
00068 void slotLocalChangeDone( KJob* );
00069 void execute();
00070 void processItems();
00071 void deleteItems( const Item::List &items );
00072 void slotTransactionResult( KJob *job );
00073 Job* subjobParent() const;
00074
00075 ItemSync *q;
00076 Collection mSyncCollection;
00077 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00078 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00079 QSet<Akonadi::Item> mUnprocessedLocalItems;
00080
00081 ItemSync::TransactionMode mTransactionMode;
00082 TransactionSequence *mCurrentTransaction;
00083 int mTransactionJobs;
00084
00085
00086 ItemFetchScope mFetchScope;
00087
00088
00089 Akonadi::Item::List mRemoteItems;
00090
00091
00092 Item::List mRemovedRemoteItems;
00093
00094
00095 int mPendingJobs;
00096 int mProgress;
00097 int mTotalItems;
00098 int mTotalItemsProcessed;
00099
00100 bool mStreaming;
00101 bool mIncremental;
00102 bool mLocalListDone;
00103 bool mDeliveryDone;
00104 bool mFinished;
00105 };
00106
00107 void ItemSync::Private::createLocalItem( const Item & item )
00108 {
00109
00110 if ( q->error() )
00111 return;
00112 mPendingJobs++;
00113 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00114 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00115 }
00116
00117 void ItemSync::Private::checkDone()
00118 {
00119 q->setProcessedAmount( KJob::Bytes, mProgress );
00120 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00121 return;
00122
00123 if ( !mFinished ) {
00124 mFinished = true;
00125 q->emitResult();
00126 }
00127 }
00128
00129 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00130 Job( parent ),
00131 d( new Private( this ) )
00132 {
00133 d->mSyncCollection = collection;
00134 }
00135
00136 ItemSync::~ItemSync()
00137 {
00138 delete d;
00139 }
00140
00141 void ItemSync::setFullSyncItems( const Item::List &items )
00142 {
00143 Q_ASSERT( !d->mIncremental );
00144 if ( !d->mStreaming )
00145 d->mDeliveryDone = true;
00146 d->mRemoteItems += items;
00147 d->mTotalItemsProcessed += items.count();
00148 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00149 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00150 if ( d->mTotalItemsProcessed == d->mTotalItems )
00151 d->mDeliveryDone = true;
00152 d->execute();
00153 }
00154
00155 void ItemSync::setTotalItems( int amount )
00156 {
00157 Q_ASSERT( !d->mIncremental );
00158 Q_ASSERT( amount >= 0 );
00159 setStreamingEnabled( true );
00160 kDebug() << amount;
00161 d->mTotalItems = amount;
00162 setTotalAmount( KJob::Bytes, amount );
00163 if ( d->mTotalItems == 0 ) {
00164 d->mDeliveryDone = true;
00165 d->execute();
00166 }
00167 }
00168
00169 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00170 {
00171 d->mIncremental = true;
00172 if ( !d->mStreaming )
00173 d->mDeliveryDone = true;
00174 d->mRemoteItems += changedItems;
00175 d->mRemovedRemoteItems += removedItems;
00176 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00177 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00178 if ( d->mTotalItemsProcessed == d->mTotalItems )
00179 d->mDeliveryDone = true;
00180 d->execute();
00181 }
00182
00183 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00184 {
00185 d->mFetchScope = fetchScope;
00186 }
00187
00188 ItemFetchScope &ItemSync::fetchScope()
00189 {
00190 return d->mFetchScope;
00191 }
00192
00193 void ItemSync::doStart()
00194 {
00195 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00196 job->setFetchScope( d->mFetchScope );
00197
00198
00199 job->fetchScope().setCacheOnly( true );
00200
00201 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00202 }
00203
00204 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00205 {
00206
00207 if ( error() )
00208 return false;
00209
00210
00211
00212
00213
00214
00215 if ( d->mIncremental )
00216 return true;
00217
00218
00219 if ( storedItem.flags() != newItem.flags() ) {
00220 kDebug() << "Stored flags " << storedItem.flags()
00221 << "new flags " << newItem.flags();
00222 return true;
00223 }
00224
00225
00226 QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00227 missingParts.subtract( storedItem.loadedPayloadParts() );
00228 if ( !missingParts.isEmpty() )
00229 return true;
00230
00231
00232
00233
00234 if ( newItem.hasPayload()
00235 && storedItem.payloadData() != newItem.payloadData() )
00236 return true;
00237
00238
00239 foreach ( Attribute* attr, newItem.attributes() ) {
00240 if ( !storedItem.hasAttribute( attr->type() ) )
00241 return true;
00242 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00243 return true;
00244 }
00245
00246 return false;
00247 }
00248
00249 void ItemSync::Private::slotLocalListDone( KJob * job )
00250 {
00251 if ( !job->error() ) {
00252 const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00253 foreach ( const Item &item, list ) {
00254 if ( item.remoteId().isEmpty() )
00255 continue;
00256 mLocalItemsById.insert( item.id(), item );
00257 mLocalItemsByRemoteId.insert( item.remoteId(), item );
00258 mUnprocessedLocalItems.insert( item );
00259 }
00260 }
00261
00262 mLocalListDone = true;
00263 execute();
00264 }
00265
00266 void ItemSync::Private::execute()
00267 {
00268 if ( !mLocalListDone )
00269 return;
00270
00271
00272
00273 if ( !mDeliveryDone && mRemoteItems.isEmpty() )
00274 return;
00275
00276 if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
00277 ++mTransactionJobs;
00278 mCurrentTransaction = new TransactionSequence( q );
00279 mCurrentTransaction->setAutomaticCommittingEnabled( false );
00280 connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00281 }
00282
00283 processItems();
00284 if ( !mDeliveryDone ) {
00285 if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
00286 mCurrentTransaction->commit();
00287 mCurrentTransaction = 0;
00288 }
00289 return;
00290 }
00291
00292
00293 if ( !mIncremental ) {
00294 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00295 mUnprocessedLocalItems.clear();
00296 }
00297
00298 deleteItems( mRemovedRemoteItems );
00299 mLocalItemsById.clear();
00300 mLocalItemsByRemoteId.clear();
00301 mRemovedRemoteItems.clear();
00302
00303 if ( mCurrentTransaction ) {
00304 mCurrentTransaction->commit();
00305 mCurrentTransaction = 0;
00306 }
00307
00308 checkDone();
00309 }
00310
00311 void ItemSync::Private::processItems()
00312 {
00313
00314 foreach ( Item remoteItem, mRemoteItems ) {
00315 #ifndef NDEBUG
00316 if ( remoteItem.remoteId().isEmpty() ) {
00317 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00318 }
00319 #endif
00320
00321 Item localItem = mLocalItemsById.value( remoteItem.id() );
00322 if ( !localItem.isValid() )
00323 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00324 mUnprocessedLocalItems.remove( localItem );
00325
00326 if ( !localItem.isValid() ) {
00327 createLocalItem( remoteItem );
00328 continue;
00329 }
00330
00331 if ( q->updateItem( localItem, remoteItem ) ) {
00332 mPendingJobs++;
00333
00334 remoteItem.setId( localItem.id() );
00335 remoteItem.setRevision( localItem.revision() );
00336 remoteItem.setSize( localItem.size() );
00337 remoteItem.setRemoteId( localItem.remoteId() );
00338 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00339 mod->disableRevisionCheck();
00340 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00341 } else {
00342 mProgress++;
00343 }
00344 }
00345 mRemoteItems.clear();
00346 }
00347
00348 void ItemSync::Private::deleteItems( const Item::List &items )
00349 {
00350
00351 if ( q->error() )
00352 return;
00353
00354 Item::List itemsToDelete;
00355 foreach ( const Item &item, items ) {
00356 Item delItem( item );
00357 if ( !item.isValid() ) {
00358 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00359 }
00360
00361 if ( !delItem.isValid() ) {
00362 #ifndef NDEBUG
00363 kWarning() << "Delete item (remoteeId=" << item.remoteId()
00364 << "mimeType=" << item.mimeType()
00365 << ") does not have a valid UID and no item with that remote ID exists either";
00366 #endif
00367 continue;
00368 }
00369
00370 if ( delItem.remoteId().isEmpty() ) {
00371
00372 continue;
00373 }
00374
00375 itemsToDelete.append ( delItem );
00376 }
00377
00378 if ( !itemsToDelete.isEmpty() ) {
00379 mPendingJobs++;
00380 ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00381 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00382
00383
00384
00385
00386
00387 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00388 if ( transaction )
00389 transaction->setIgnoreJobFailure( job );
00390 }
00391 }
00392
00393 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00394 {
00395 mPendingJobs--;
00396 mProgress++;
00397
00398 checkDone();
00399 }
00400
00401 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00402 {
00403 Q_UNUSED( job );
00404 mPendingJobs--;
00405 mProgress++;
00406
00407 checkDone();
00408 }
00409
00410 void ItemSync::Private::slotTransactionResult( KJob *job )
00411 {
00412 --mTransactionJobs;
00413 if ( mCurrentTransaction == job )
00414 mCurrentTransaction = 0;
00415
00416 checkDone();
00417 }
00418
00419 Job * ItemSync::Private::subjobParent() const
00420 {
00421 if ( mCurrentTransaction && mTransactionMode != NoTransaction )
00422 return mCurrentTransaction;
00423 return q;
00424 }
00425
00426 void ItemSync::setStreamingEnabled(bool enable)
00427 {
00428 d->mStreaming = enable;
00429 }
00430
00431 void ItemSync::deliveryDone()
00432 {
00433 Q_ASSERT( d->mStreaming );
00434 d->mDeliveryDone = true;
00435 d->execute();
00436 }
00437
00438 void ItemSync::slotResult(KJob* job)
00439 {
00440 if ( job->error() ) {
00441
00442 Akonadi::Job::removeSubjob( job );
00443
00444 if ( !error() ) {
00445 setError( job->error() );
00446 setErrorText( job->errorText() );
00447 }
00448 } else {
00449 Akonadi::Job::slotResult( job );
00450 }
00451 }
00452
00453 void ItemSync::rollback()
00454 {
00455 setError( UserCanceled );
00456 if ( d->mCurrentTransaction )
00457 d->mCurrentTransaction->rollback();
00458 d->mDeliveryDone = true;
00459 d->execute();
00460 }
00461
00462 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
00463 {
00464 d->mTransactionMode = mode;
00465 }
00466
00467
00468 #include "itemsync.moc"