• Skip to content
  • Skip to link menu
KDE 4.3 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

itemsync.cpp

00001 /*
00002     Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "itemsync.h"
00022 
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031 
00032 #include <kdebug.h>
00033 
00034 #include <QtCore/QStringList>
00035 
00036 using namespace Akonadi;
00037 
00041 class ItemSync::Private
00042 {
00043   public:
00044     Private( ItemSync *parent ) :
00045       q( parent ),
00046       mTransactionMode( Single ),
00047       mCurrentTransaction( 0 ),
00048       mTransactionJobs( 0 ),
00049       mPendingJobs( 0 ),
00050       mProgress( 0 ),
00051       mTotalItems( -1 ),
00052       mTotalItemsProcessed( 0 ),
00053       mStreaming( false ),
00054       mIncremental( false ),
00055       mLocalListDone( false ),
00056       mDeliveryDone( false )
00057     {
00058       // we want to fetch all data by default
00059       mFetchScope.fetchFullPayload();
00060       mFetchScope.fetchAllAttributes();
00061     }
00062 
00063     void createLocalItem( const Item &item );
00064     void checkDone();
00065     void slotLocalListDone( KJob* );
00066     void slotLocalChangeDone( KJob* );
00067     void execute();
00068     void processItems();
00069     void deleteItems( const Item::List &items );
00070     void slotTransactionResult( KJob *job );
00071     Job* subjobParent() const;
00072 
00073     ItemSync *q;
00074     Collection mSyncCollection;
00075     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00076     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00077     QSet<Akonadi::Item> mUnprocessedLocalItems;
00078 
00079     // transaction mode, TODO: make this public API?
00080     enum TransactionMode {
00081       Single,
00082       Chunkwise,
00083       None
00084     };
00085     TransactionMode mTransactionMode;
00086     TransactionSequence *mCurrentTransaction;
00087     int mTransactionJobs;
00088 
00089     // fetch scope for initial item listing
00090     ItemFetchScope mFetchScope;
00091 
00092     // remote items
00093     Akonadi::Item::List mRemoteItems;
00094 
00095     // removed remote items
00096     Item::List mRemovedRemoteItems;
00097 
00098     // create counter
00099     int mPendingJobs;
00100     int mProgress;
00101     int mTotalItems;
00102     int mTotalItemsProcessed;
00103 
00104     bool mStreaming;
00105     bool mIncremental;
00106     bool mLocalListDone;
00107     bool mDeliveryDone;
00108 };
00109 
00110 void ItemSync::Private::createLocalItem( const Item & item )
00111 {
00112   mPendingJobs++;
00113   ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00114   q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00115 }
00116 
00117 void ItemSync::Private::checkDone()
00118 {
00119   q->setProcessedAmount( KJob::Bytes, mProgress );
00120   if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00121     return;
00122 
00123   q->emitResult();
00124 }
00125 
00126 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00127     Job( parent ),
00128     d( new Private( this ) )
00129 {
00130   d->mSyncCollection = collection;
00131 }
00132 
00133 ItemSync::~ItemSync()
00134 {
00135   delete d;
00136 }
00137 
00138 void ItemSync::setFullSyncItems( const Item::List &items )
00139 {
00140   Q_ASSERT( !d->mIncremental );
00141   if ( !d->mStreaming )
00142     d->mDeliveryDone = true;
00143   d->mRemoteItems += items;
00144   d->mTotalItemsProcessed += items.count();
00145   kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00146   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00147   if ( d->mTotalItemsProcessed == d->mTotalItems )
00148     d->mDeliveryDone = true;
00149   d->execute();
00150 }
00151 
00152 void ItemSync::setTotalItems( int amount )
00153 {
00154   Q_ASSERT( !d->mIncremental );
00155   Q_ASSERT( amount >= 0 );
00156   setStreamingEnabled( true );
00157   kDebug() << amount;
00158   d->mTotalItems = amount;
00159   setTotalAmount( KJob::Bytes, amount );
00160   if ( d->mTotalItems == 0 ) {
00161     d->mDeliveryDone = true;
00162     d->execute();
00163   }
00164 }
00165 
00166 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00167 {
00168   d->mIncremental = true;
00169   if ( !d->mStreaming )
00170     d->mDeliveryDone = true;
00171   d->mRemoteItems += changedItems;
00172   d->mRemovedRemoteItems += removedItems;
00173   d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00174   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00175   if ( d->mTotalItemsProcessed == d->mTotalItems )
00176     d->mDeliveryDone = true;
00177   d->execute();
00178 }
00179 
00180 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00181 {
00182   d->mFetchScope = fetchScope;
00183 }
00184 
00185 ItemFetchScope &ItemSync::fetchScope()
00186 {
00187   return d->mFetchScope;
00188 }
00189 
00190 void ItemSync::doStart()
00191 {
00192   ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00193   job->setFetchScope( d->mFetchScope );
00194 
00195   // we only can fetch parts already in the cache, otherwise this will deadlock
00196   job->fetchScope().setCacheOnly( true );
00197 
00198   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00199 }
00200 
00201 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00202 {
00203   /*
00204    * We know that this item has changed (as it is part of the
00205    * incremental changed list), so we just put it into the
00206    * storage.
00207    */
00208   if ( d->mIncremental )
00209     return true;
00210 
00211   // Check whether the flags differ
00212   if ( storedItem.flags() != newItem.flags() ) {
00213     kDebug( 5250 ) << "Stored flags "  << storedItem.flags()
00214                     << "new flags " << newItem.flags();
00215     return true;
00216   }
00217 
00218   // Check whether the new item contains unknown parts
00219   QSet<QByteArray> missingParts = storedItem.loadedPayloadParts();
00220   missingParts.subtract( newItem.loadedPayloadParts() );
00221   if ( !missingParts.isEmpty() )
00222     return true;
00223 
00224   // ### FIXME SLOW!!!
00225   // If the available part identifiers don't differ, check
00226   // whether the content of the payload differs
00227   if ( storedItem.payloadData() != newItem.payloadData() )
00228     return true;
00229 
00230   // check if remote attributes have been changed
00231   foreach ( Attribute* attr, newItem.attributes() ) {
00232     if ( !storedItem.hasAttribute( attr->type() ) )
00233       return true;
00234     if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00235       return true;
00236   }
00237 
00238   return false;
00239 }
00240 
00241 void ItemSync::Private::slotLocalListDone( KJob * job )
00242 {
00243   if ( job->error() )
00244     return;
00245 
00246   const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00247   foreach ( const Item &item, list ) {
00248     mLocalItemsById.insert( item.id(), item );
00249     mLocalItemsByRemoteId.insert( item.remoteId(), item );
00250     mUnprocessedLocalItems.insert( item );
00251   }
00252 
00253   mLocalListDone = true;
00254   execute();
00255 }
00256 
00257 void ItemSync::Private::execute()
00258 {
00259   if ( !mLocalListDone )
00260     return;
00261 
00262   if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00263     ++mTransactionJobs;
00264     mCurrentTransaction = new TransactionSequence( q );
00265     connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
00266   }
00267 
00268   processItems();
00269   if ( !mDeliveryDone ) {
00270     if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00271       mCurrentTransaction->commit();
00272       mCurrentTransaction = 0;
00273     }
00274     return;
00275   }
00276 
00277   // removed
00278   if ( !mIncremental ) {
00279     mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00280     mUnprocessedLocalItems.clear();
00281   }
00282 
00283   deleteItems( mRemovedRemoteItems );
00284   mLocalItemsById.clear();
00285   mLocalItemsByRemoteId.clear();
00286   mRemovedRemoteItems.clear();
00287 
00288   if ( mCurrentTransaction ) {
00289     mCurrentTransaction->commit();
00290     mCurrentTransaction = 0;
00291   }
00292 
00293   checkDone();
00294 }
00295 
00296 void ItemSync::Private::processItems()
00297 {
00298   // added / updated
00299   foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
00300 #ifndef NDEBUG
00301     if ( remoteItem.remoteId().isEmpty() ) {
00302       kWarning( 5250 ) << "Item " << remoteItem.id() << " does not have a remote identifier";
00303     }
00304 #endif
00305 
00306     Item localItem = mLocalItemsById.value( remoteItem.id() );
00307     if ( !localItem.isValid() )
00308       localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00309     mUnprocessedLocalItems.remove( localItem );
00310     // missing locally
00311     if ( !localItem.isValid() ) {
00312       createLocalItem( remoteItem );
00313       continue;
00314     }
00315 
00316     if ( q->updateItem( localItem, remoteItem ) ) {
00317       mPendingJobs++;
00318 
00319       remoteItem.setId( localItem.id() );
00320       remoteItem.setRevision( localItem.revision() );
00321       remoteItem.setSize( localItem.size() );
00322       remoteItem.setRemoteId( localItem.remoteId() );  // in case someone clears remoteId by accident
00323       ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00324       q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00325     } else {
00326       mProgress++;
00327     }
00328   }
00329   mRemoteItems.clear();
00330 }
00331 
00332 void ItemSync::Private::deleteItems( const Item::List &items )
00333 {
00334   foreach ( const Item &item, items ) {
00335     Item delItem( item );
00336     if ( !item.isValid() ) {
00337       delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00338     }
00339 
00340     if ( !delItem.isValid() ) {
00341 #ifndef NDEBUG
00342       kWarning( 5250 ) << "Delete item (remoteeId=" << delItem.remoteId()
00343                        << "mimeType=" << delItem.mimeType()
00344                        << ") does not have a valid UID and no item with that remote ID exists either";
00345 #endif
00346       continue;
00347     }
00348 
00349     mPendingJobs++;
00350     ItemDeleteJob *job = new ItemDeleteJob( delItem, subjobParent() );
00351     q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00352   }
00353 }
00354 
00355 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00356 {
00357   if ( job->error() )
00358     return;
00359 
00360   mPendingJobs--;
00361   mProgress++;
00362 
00363   checkDone();
00364 }
00365 
00366 void ItemSync::Private::slotTransactionResult( KJob *job )
00367 {
00368   if ( job->error() )
00369     return;
00370 
00371   --mTransactionJobs;
00372   if ( mCurrentTransaction == job )
00373     mCurrentTransaction = 0;
00374 
00375   checkDone();
00376 }
00377 
00378 Job * ItemSync::Private::subjobParent() const
00379 {
00380   if ( mCurrentTransaction && mTransactionMode != None )
00381     return mCurrentTransaction;
00382   return q;
00383 }
00384 
00385 void ItemSync::setStreamingEnabled(bool enable)
00386 {
00387   d->mStreaming = enable;
00388 }
00389 
00390 void ItemSync::deliveryDone()
00391 {
00392   Q_ASSERT( d->mStreaming );
00393   d->mDeliveryDone = true;
00394   d->execute();
00395 }
00396 
00397 #include "itemsync.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  • kldap
  • kmime
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.5.9
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal