00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "itemsync.h"
00022
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031
00032 #include <kdebug.h>
00033
00034 #include <QtCore/QStringList>
00035
00036 using namespace Akonadi;
00037
00041 class ItemSync::Private
00042 {
00043 public:
00044 Private( ItemSync *parent ) :
00045 q( parent ),
00046 mTransactionMode( Single ),
00047 mCurrentTransaction( 0 ),
00048 mTransactionJobs( 0 ),
00049 mPendingJobs( 0 ),
00050 mProgress( 0 ),
00051 mTotalItems( -1 ),
00052 mTotalItemsProcessed( 0 ),
00053 mStreaming( false ),
00054 mIncremental( false ),
00055 mLocalListDone( false ),
00056 mDeliveryDone( false )
00057 {
00058
00059 mFetchScope.fetchFullPayload();
00060 mFetchScope.fetchAllAttributes();
00061 }
00062
00063 void createLocalItem( const Item &item );
00064 void checkDone();
00065 void slotLocalListDone( KJob* );
00066 void slotLocalChangeDone( KJob* );
00067 void execute();
00068 void processItems();
00069 void deleteItems( const Item::List &items );
00070 void slotTransactionResult( KJob *job );
00071 Job* subjobParent() const;
00072
00073 ItemSync *q;
00074 Collection mSyncCollection;
00075 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00076 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00077 QSet<Akonadi::Item> mUnprocessedLocalItems;
00078
00079
00080 enum TransactionMode {
00081 Single,
00082 Chunkwise,
00083 None
00084 };
00085 TransactionMode mTransactionMode;
00086 TransactionSequence *mCurrentTransaction;
00087 int mTransactionJobs;
00088
00089
00090 ItemFetchScope mFetchScope;
00091
00092
00093 Akonadi::Item::List mRemoteItems;
00094
00095
00096 Item::List mRemovedRemoteItems;
00097
00098
00099 int mPendingJobs;
00100 int mProgress;
00101 int mTotalItems;
00102 int mTotalItemsProcessed;
00103
00104 bool mStreaming;
00105 bool mIncremental;
00106 bool mLocalListDone;
00107 bool mDeliveryDone;
00108 };
00109
00110 void ItemSync::Private::createLocalItem( const Item & item )
00111 {
00112 mPendingJobs++;
00113 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00114 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00115 }
00116
00117 void ItemSync::Private::checkDone()
00118 {
00119 q->setProcessedAmount( KJob::Bytes, mProgress );
00120 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00121 return;
00122
00123 q->emitResult();
00124 }
00125
00126 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00127 Job( parent ),
00128 d( new Private( this ) )
00129 {
00130 d->mSyncCollection = collection;
00131 }
00132
00133 ItemSync::~ItemSync()
00134 {
00135 delete d;
00136 }
00137
00138 void ItemSync::setFullSyncItems( const Item::List &items )
00139 {
00140 Q_ASSERT( !d->mIncremental );
00141 if ( !d->mStreaming )
00142 d->mDeliveryDone = true;
00143 d->mRemoteItems += items;
00144 d->mTotalItemsProcessed += items.count();
00145 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00146 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00147 if ( d->mTotalItemsProcessed == d->mTotalItems )
00148 d->mDeliveryDone = true;
00149 d->execute();
00150 }
00151
00152 void ItemSync::setTotalItems( int amount )
00153 {
00154 Q_ASSERT( !d->mIncremental );
00155 Q_ASSERT( amount >= 0 );
00156 setStreamingEnabled( true );
00157 kDebug() << amount;
00158 d->mTotalItems = amount;
00159 setTotalAmount( KJob::Bytes, amount );
00160 if ( d->mTotalItems == 0 ) {
00161 d->mDeliveryDone = true;
00162 d->execute();
00163 }
00164 }
00165
00166 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00167 {
00168 d->mIncremental = true;
00169 if ( !d->mStreaming )
00170 d->mDeliveryDone = true;
00171 d->mRemoteItems += changedItems;
00172 d->mRemovedRemoteItems += removedItems;
00173 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00174 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00175 if ( d->mTotalItemsProcessed == d->mTotalItems )
00176 d->mDeliveryDone = true;
00177 d->execute();
00178 }
00179
00180 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00181 {
00182 d->mFetchScope = fetchScope;
00183 }
00184
00185 ItemFetchScope &ItemSync::fetchScope()
00186 {
00187 return d->mFetchScope;
00188 }
00189
00190 void ItemSync::doStart()
00191 {
00192 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00193 job->setFetchScope( d->mFetchScope );
00194
00195
00196 job->fetchScope().setCacheOnly( true );
00197
00198 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00199 }
00200
00201 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00202 {
00203
00204
00205
00206
00207
00208 if ( d->mIncremental )
00209 return true;
00210
00211
00212 if ( storedItem.flags() != newItem.flags() ) {
00213 kDebug() << "Stored flags " << storedItem.flags()
00214 << "new flags " << newItem.flags();
00215 return true;
00216 }
00217
00218
00219 QSet<QByteArray> missingParts = storedItem.loadedPayloadParts();
00220 missingParts.subtract( newItem.loadedPayloadParts() );
00221 if ( !missingParts.isEmpty() )
00222 return true;
00223
00224
00225
00226
00227 if ( storedItem.payloadData() != newItem.payloadData() )
00228 return true;
00229
00230
00231 foreach ( Attribute* attr, newItem.attributes() ) {
00232 if ( !storedItem.hasAttribute( attr->type() ) )
00233 return true;
00234 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00235 return true;
00236 }
00237
00238 return false;
00239 }
00240
00241 void ItemSync::Private::slotLocalListDone( KJob * job )
00242 {
00243 if ( job->error() )
00244 return;
00245
00246 const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00247 foreach ( const Item &item, list ) {
00248 if ( item.remoteId().isEmpty() )
00249 continue;
00250 mLocalItemsById.insert( item.id(), item );
00251 mLocalItemsByRemoteId.insert( item.remoteId(), item );
00252 mUnprocessedLocalItems.insert( item );
00253 }
00254
00255 mLocalListDone = true;
00256 execute();
00257 }
00258
00259 void ItemSync::Private::execute()
00260 {
00261 if ( !mLocalListDone )
00262 return;
00263
00264 if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00265 ++mTransactionJobs;
00266 mCurrentTransaction = new TransactionSequence( q );
00267 connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
00268 }
00269
00270 processItems();
00271 if ( !mDeliveryDone ) {
00272 if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00273 mCurrentTransaction->commit();
00274 mCurrentTransaction = 0;
00275 }
00276 return;
00277 }
00278
00279
00280 if ( !mIncremental ) {
00281 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00282 mUnprocessedLocalItems.clear();
00283 }
00284
00285 deleteItems( mRemovedRemoteItems );
00286 mLocalItemsById.clear();
00287 mLocalItemsByRemoteId.clear();
00288 mRemovedRemoteItems.clear();
00289
00290 if ( mCurrentTransaction ) {
00291 mCurrentTransaction->commit();
00292 mCurrentTransaction = 0;
00293 }
00294
00295 checkDone();
00296 }
00297
00298 void ItemSync::Private::processItems()
00299 {
00300
00301 foreach ( Item remoteItem, mRemoteItems ) {
00302 #ifndef NDEBUG
00303 if ( remoteItem.remoteId().isEmpty() ) {
00304 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00305 }
00306 #endif
00307
00308 Item localItem = mLocalItemsById.value( remoteItem.id() );
00309 if ( !localItem.isValid() )
00310 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00311 mUnprocessedLocalItems.remove( localItem );
00312
00313 if ( !localItem.isValid() ) {
00314 createLocalItem( remoteItem );
00315 continue;
00316 }
00317
00318 if ( q->updateItem( localItem, remoteItem ) ) {
00319 mPendingJobs++;
00320
00321 remoteItem.setId( localItem.id() );
00322 remoteItem.setRevision( localItem.revision() );
00323 remoteItem.setSize( localItem.size() );
00324 remoteItem.setRemoteId( localItem.remoteId() );
00325 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00326 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00327 } else {
00328 mProgress++;
00329 }
00330 }
00331 mRemoteItems.clear();
00332 }
00333
00334 void ItemSync::Private::deleteItems( const Item::List &items )
00335 {
00336 foreach ( const Item &item, items ) {
00337 Item delItem( item );
00338 if ( !item.isValid() ) {
00339 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00340 }
00341
00342 if ( !delItem.isValid() ) {
00343 #ifndef NDEBUG
00344 kWarning() << "Delete item (remoteeId=" << delItem.remoteId()
00345 << "mimeType=" << delItem.mimeType()
00346 << ") does not have a valid UID and no item with that remote ID exists either";
00347 #endif
00348 continue;
00349 }
00350
00351 mPendingJobs++;
00352 ItemDeleteJob *job = new ItemDeleteJob( delItem, subjobParent() );
00353 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00354 }
00355 }
00356
00357 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00358 {
00359 if ( job->error() )
00360 return;
00361
00362 mPendingJobs--;
00363 mProgress++;
00364
00365 checkDone();
00366 }
00367
00368 void ItemSync::Private::slotTransactionResult( KJob *job )
00369 {
00370 if ( job->error() )
00371 return;
00372
00373 --mTransactionJobs;
00374 if ( mCurrentTransaction == job )
00375 mCurrentTransaction = 0;
00376
00377 checkDone();
00378 }
00379
00380 Job * ItemSync::Private::subjobParent() const
00381 {
00382 if ( mCurrentTransaction && mTransactionMode != None )
00383 return mCurrentTransaction;
00384 return q;
00385 }
00386
00387 void ItemSync::setStreamingEnabled(bool enable)
00388 {
00389 d->mStreaming = enable;
00390 }
00391
00392 void ItemSync::deliveryDone()
00393 {
00394 Q_ASSERT( d->mStreaming );
00395 d->mDeliveryDone = true;
00396 d->execute();
00397 }
00398
00399 #include "itemsync.moc"