akonadi
resourcescheduler.cpp
00001 /* 00002 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00003 00004 This library is free software; you can redistribute it and/or modify it 00005 under the terms of the GNU Library General Public License as published by 00006 the Free Software Foundation; either version 2 of the License, or (at your 00007 option) any later version. 00008 00009 This library is distributed in the hope that it will be useful, but WITHOUT 00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00011 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00012 License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to the 00016 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00017 02110-1301, USA. 00018 */ 00019 00020 #include "resourcescheduler_p.h" 00021 00022 #include "dbusconnectionpool.h" 00023 00024 #include <kdebug.h> 00025 #include <klocale.h> 00026 00027 #include <QtCore/QTimer> 00028 #include <QtDBus/QDBusInterface> 00029 #include <QtDBus/QDBusConnectionInterface> 00030 #include <boost/graph/graph_concepts.hpp> 00031 00032 using namespace Akonadi; 00033 00034 qint64 ResourceScheduler::Task::latestSerial = 0; 00035 static QDBusAbstractInterface *s_resourcetracker = 0; 00036 00037 //@cond PRIVATE 00038 00039 ResourceScheduler::ResourceScheduler( QObject *parent ) : 00040 QObject( parent ), 00041 mCurrentTasksQueue( -1 ), 00042 mOnline( false ) 00043 { 00044 } 00045 00046 void ResourceScheduler::scheduleFullSync() 00047 { 00048 Task t; 00049 t.type = SyncAll; 00050 TaskList& queue = queueForTaskType( t.type ); 00051 if ( queue.contains( t ) || mCurrentTask == t ) 00052 return; 00053 queue << t; 00054 signalTaskToTracker( t, "SyncAll" ); 00055 scheduleNext(); 00056 } 00057 00058 void ResourceScheduler::scheduleCollectionTreeSync() 00059 { 00060 Task t; 00061 t.type = SyncCollectionTree; 00062 TaskList& queue = queueForTaskType( t.type ); 00063 if ( queue.contains( t ) || mCurrentTask == t ) 00064 return; 00065 queue << t; 00066 signalTaskToTracker( t, "SyncCollectionTree" ); 00067 scheduleNext(); 00068 } 00069 00070 void ResourceScheduler::scheduleSync(const Collection & col) 00071 { 00072 Task t; 00073 t.type = SyncCollection; 00074 t.collection = col; 00075 TaskList& queue = queueForTaskType( t.type ); 00076 if ( queue.contains( t ) || mCurrentTask == t ) 00077 return; 00078 queue << t; 00079 signalTaskToTracker( t, "SyncCollection" ); 00080 scheduleNext(); 00081 } 00082 00083 void ResourceScheduler::scheduleAttributesSync( const Collection &collection ) 00084 { 00085 Task t; 00086 t.type = SyncCollectionAttributes; 00087 t.collection = collection; 00088 00089 TaskList& queue = queueForTaskType( t.type ); 00090 if ( queue.contains( t ) || mCurrentTask == t ) 00091 return; 00092 queue << t; 00093 signalTaskToTracker( t, "SyncCollectionAttributes" ); 00094 scheduleNext(); 00095 } 00096 00097 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg) 00098 { 00099 Task t; 00100 t.type = FetchItem; 00101 t.item = item; 00102 t.itemParts = parts; 00103 00104 // if the current task does already fetch the requested item, break here but 00105 // keep the dbus message, so we can send the reply later on 00106 if ( mCurrentTask == t ) { 00107 mCurrentTask.dbusMsgs << msg; 00108 return; 00109 } 00110 00111 // If this task is already in the queue, merge with it. 00112 TaskList& queue = queueForTaskType( t.type ); 00113 const int idx = queue.indexOf( t ); 00114 if ( idx != -1 ) { 00115 queue[ idx ].dbusMsgs << msg; 00116 return; 00117 } 00118 00119 t.dbusMsgs << msg; 00120 queue << t; 00121 signalTaskToTracker( t, "FetchItem" ); 00122 scheduleNext(); 00123 } 00124 00125 void ResourceScheduler::scheduleResourceCollectionDeletion() 00126 { 00127 Task t; 00128 t.type = DeleteResourceCollection; 00129 TaskList& queue = queueForTaskType( t.type ); 00130 if ( queue.contains( t ) || mCurrentTask == t ) 00131 return; 00132 queue << t; 00133 signalTaskToTracker( t, "DeleteResourceCollection" ); 00134 scheduleNext(); 00135 } 00136 00137 void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection ) 00138 { 00139 Task t; 00140 t.type = InvalideCacheForCollection; 00141 t.collection = collection; 00142 TaskList& queue = queueForTaskType( t.type ); 00143 if ( queue.contains( t ) || mCurrentTask == t ) 00144 return; 00145 queue << t; 00146 signalTaskToTracker( t, "InvalideCacheForCollection" ); 00147 scheduleNext(); 00148 } 00149 00150 void ResourceScheduler::scheduleChangeReplay() 00151 { 00152 Task t; 00153 t.type = ChangeReplay; 00154 TaskList& queue = queueForTaskType( t.type ); 00155 // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks 00156 if ( queue.contains( t ) ) 00157 return; 00158 queue << t; 00159 signalTaskToTracker( t, "ChangeReplay" ); 00160 scheduleNext(); 00161 } 00162 00163 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() 00164 { 00165 Task t; 00166 t.type = SyncAllDone; 00167 TaskList& queue = queueForTaskType( t.type ); 00168 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost 00169 queue << t; 00170 signalTaskToTracker( t, "SyncAllDone" ); 00171 scheduleNext(); 00172 } 00173 00174 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion() 00175 { 00176 Task t; 00177 t.type = SyncCollectionTreeDone; 00178 TaskList& queue = queueForTaskType( t.type ); 00179 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost 00180 queue << t; 00181 signalTaskToTracker( t, "SyncCollectionTreeDone" ); 00182 scheduleNext(); 00183 } 00184 00185 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority ) 00186 { 00187 Task t; 00188 t.type = Custom; 00189 t.receiver = receiver; 00190 t.methodName = methodName; 00191 t.argument = argument; 00192 QueueType queueType = GenericTaskQueue; 00193 if ( priority == ResourceBase::AfterChangeReplay ) 00194 queueType = AfterChangeReplayQueue; 00195 else if ( priority == ResourceBase::Prepend ) 00196 queueType = PrependTaskQueue; 00197 TaskList& queue = mTaskList[ queueType ]; 00198 00199 if ( queue.contains( t ) ) 00200 return; 00201 00202 switch (priority) { 00203 case ResourceBase::Prepend: 00204 queue.prepend( t ); 00205 break; 00206 default: 00207 queue.append(t); 00208 break; 00209 } 00210 00211 signalTaskToTracker( t, "Custom-" + t.methodName ); 00212 scheduleNext(); 00213 } 00214 00215 void ResourceScheduler::taskDone() 00216 { 00217 if ( isEmpty() ) 00218 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) ); 00219 00220 if ( s_resourcetracker ) { 00221 QList<QVariant> argumentList; 00222 argumentList << QString::number( mCurrentTask.serial ) 00223 << QString(); 00224 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00225 } 00226 00227 mCurrentTask = Task(); 00228 mCurrentTasksQueue = -1; 00229 scheduleNext(); 00230 } 00231 00232 void ResourceScheduler::deferTask() 00233 { 00234 if ( mCurrentTask.type == Invalid ) 00235 return; 00236 00237 if ( s_resourcetracker ) { 00238 QList<QVariant> argumentList; 00239 argumentList << QString::number( mCurrentTask.serial ) 00240 << QString(); 00241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00242 } 00243 00244 Task t = mCurrentTask; 00245 mCurrentTask = Task(); 00246 00247 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount ); 00248 mTaskList[mCurrentTasksQueue].prepend( t ); 00249 mCurrentTasksQueue = -1; 00250 00251 signalTaskToTracker( t, "DeferedTask" ); 00252 00253 scheduleNext(); 00254 } 00255 00256 bool ResourceScheduler::isEmpty() 00257 { 00258 for ( int i = 0; i < NQueueCount; ++i ) { 00259 if ( !mTaskList[i].isEmpty() ) 00260 return false; 00261 } 00262 return true; 00263 } 00264 00265 void ResourceScheduler::scheduleNext() 00266 { 00267 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline ) 00268 return; 00269 QTimer::singleShot( 0, this, SLOT(executeNext()) ); 00270 } 00271 00272 void ResourceScheduler::executeNext() 00273 { 00274 if ( mCurrentTask.type != Invalid || isEmpty() ) 00275 return; 00276 00277 for ( int i = 0; i < NQueueCount; ++i ) { 00278 if ( !mTaskList[ i ].isEmpty() ) { 00279 mCurrentTask = mTaskList[ i ].takeFirst(); 00280 mCurrentTasksQueue = i; 00281 break; 00282 } 00283 } 00284 00285 if ( s_resourcetracker ) { 00286 QList<QVariant> argumentList; 00287 argumentList << QString::number( mCurrentTask.serial ); 00288 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList); 00289 } 00290 00291 switch ( mCurrentTask.type ) { 00292 case SyncAll: 00293 emit executeFullSync(); 00294 break; 00295 case SyncCollectionTree: 00296 emit executeCollectionTreeSync(); 00297 break; 00298 case SyncCollection: 00299 emit executeCollectionSync( mCurrentTask.collection ); 00300 break; 00301 case SyncCollectionAttributes: 00302 emit executeCollectionAttributesSync( mCurrentTask.collection ); 00303 break; 00304 case FetchItem: 00305 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); 00306 break; 00307 case DeleteResourceCollection: 00308 emit executeResourceCollectionDeletion(); 00309 break; 00310 case InvalideCacheForCollection: 00311 emit executeCacheInvalidation( mCurrentTask.collection ); 00312 break; 00313 case ChangeReplay: 00314 emit executeChangeReplay(); 00315 break; 00316 case SyncAllDone: 00317 emit fullSyncComplete(); 00318 break; 00319 case SyncCollectionTreeDone: 00320 emit collectionTreeSyncComplete(); 00321 break; 00322 case Custom: 00323 { 00324 const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)"; 00325 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1; 00326 bool success = false; 00327 if ( hasSlotWithVariant ) { 00328 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) ); 00329 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" ); 00330 } 00331 if ( !success ) 00332 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName ); 00333 00334 if ( !success ) 00335 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument; 00336 break; 00337 } 00338 default: { 00339 kError() << "Unhandled task type" << mCurrentTask.type; 00340 dump(); 00341 Q_ASSERT( false ); 00342 } 00343 } 00344 } 00345 00346 ResourceScheduler::Task ResourceScheduler::currentTask() const 00347 { 00348 return mCurrentTask; 00349 } 00350 00351 void ResourceScheduler::setOnline(bool state) 00352 { 00353 if ( mOnline == state ) 00354 return; 00355 mOnline = state; 00356 if ( mOnline ) { 00357 scheduleNext(); 00358 } else { 00359 if ( mCurrentTask.type != Invalid ) { 00360 // abort running task 00361 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask ); 00362 mCurrentTask = Task(); 00363 mCurrentTasksQueue = -1; 00364 } 00365 // abort pending synchronous tasks, might take longer until the resource goes online again 00366 TaskList& itemFetchQueue = queueForTaskType( FetchItem ); 00367 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) { 00368 if ( (*it).type == FetchItem ) { 00369 (*it).sendDBusReplies( false ); 00370 it = itemFetchQueue.erase( it ); 00371 if ( s_resourcetracker ) { 00372 QList<QVariant> argumentList; 00373 argumentList << QString::number( mCurrentTask.serial ) 00374 << QLatin1String( "Job canceled." ); 00375 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList ); 00376 } 00377 } else { 00378 ++it; 00379 } 00380 } 00381 } 00382 } 00383 00384 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType ) 00385 { 00386 // if there's a job tracer running, tell it about the new job 00387 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) { 00388 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ), 00389 QLatin1String( "/resourcesJobtracker" ), 00390 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ), 00391 DBusConnectionPool::threadConnection(), 0 ); 00392 } 00393 00394 if ( s_resourcetracker ) { 00395 QList<QVariant> argumentList; 00396 argumentList << static_cast<AgentBase*>( parent() )->identifier() 00397 << QString::number( task.serial ) 00398 << QString() 00399 << QString::fromLatin1( taskType ); 00400 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList); 00401 } 00402 } 00403 00404 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) 00405 { 00406 if ( !collection.isValid() ) // should not happen, but you never know... 00407 return; 00408 TaskList& queue = queueForTaskType( SyncCollection ); 00409 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) { 00410 if ( (*it).type == SyncCollection && (*it).collection == collection ) { 00411 it = queue.erase( it ); 00412 kDebug() << " erasing"; 00413 } else 00414 ++it; 00415 } 00416 } 00417 00418 void ResourceScheduler::Task::sendDBusReplies( bool success ) 00419 { 00420 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) { 00421 QDBusMessage reply( msg ); 00422 reply << success; 00423 DBusConnectionPool::threadConnection().send( reply ); 00424 } 00425 } 00426 00427 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type ) 00428 { 00429 switch( type ) { 00430 case ChangeReplay: 00431 return ChangeReplayQueue; 00432 case FetchItem: 00433 return ItemFetchQueue; 00434 default: 00435 return GenericTaskQueue; 00436 } 00437 } 00438 00439 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type ) 00440 { 00441 const QueueType qt = queueTypeForTaskType( type ); 00442 return mTaskList[ qt ]; 00443 } 00444 00445 void ResourceScheduler::dump() 00446 { 00447 kDebug() << dumpToString(); 00448 } 00449 00450 QString ResourceScheduler::dumpToString() const 00451 { 00452 QString ret; 00453 QTextStream str( &ret ); 00454 str << "ResourceScheduler: " << (mOnline?"Online":"Offline") << endl; 00455 str << " current task: " << mCurrentTask << endl; 00456 for ( int i = 0; i < NQueueCount; ++i ) { 00457 const TaskList& queue = mTaskList[i]; 00458 if (queue.isEmpty()) { 00459 str << " queue " << i << " is empty" << endl; 00460 } else { 00461 str << " queue " << i << " " << queue.size() << " tasks:" << endl; 00462 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) { 00463 str << " " << (*it) << endl; 00464 } 00465 } 00466 } 00467 return ret; 00468 } 00469 00470 void ResourceScheduler::clear() 00471 { 00472 kDebug() << "Clearing ResourceScheduler queues:"; 00473 for ( int i = 0; i < NQueueCount; ++i ) { 00474 TaskList& queue = mTaskList[i]; 00475 queue.clear(); 00476 } 00477 mCurrentTask = Task(); 00478 mCurrentTasksQueue = -1; 00479 } 00480 00481 void Akonadi::ResourceScheduler::cancelQueues() 00482 { 00483 for ( int i = 0; i < NQueueCount; ++i ) { 00484 TaskList& queue = mTaskList[i]; 00485 if ( s_resourcetracker ) { 00486 foreach ( const Task &t, queue ) { 00487 QList<QVariant> argumentList; 00488 argumentList << QString::number( t.serial ) << QString(); 00489 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00490 } 00491 } 00492 queue.clear(); 00493 } 00494 } 00495 00496 static const char s_taskTypes[][27] = { 00497 "Invalid (no task)", 00498 "SyncAll", 00499 "SyncCollectionTree", 00500 "SyncCollection", 00501 "SyncCollectionAttributes", 00502 "FetchItem", 00503 "ChangeReplay", 00504 "DeleteResourceCollection", 00505 "InvalideCacheForCollection", 00506 "SyncAllDone", 00507 "SyncCollectionTreeDone", 00508 "Custom" 00509 }; 00510 00511 QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task ) 00512 { 00513 d << task.serial << " " << s_taskTypes[task.type] << " "; 00514 if ( task.type != ResourceScheduler::Invalid ) { 00515 if ( task.collection.isValid() ) 00516 d << "collection " << task.collection.id() << " "; 00517 if ( task.item.id() != -1 ) 00518 d << "item " << task.item.id() << " "; 00519 if ( !task.methodName.isEmpty() ) 00520 d << task.methodName << " " << task.argument.toString(); 00521 } 00522 return d; 00523 } 00524 00525 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task ) 00526 { 00527 QString s; 00528 QTextStream str( &s ); 00529 str << task; 00530 d << s; 00531 return d; 00532 } 00533 00534 //@endcond 00535 00536 #include "resourcescheduler_p.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon Aug 27 2012 22:09:24 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon Aug 27 2012 22:09:24 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006
KDE's Doxygen guidelines are available online.