akonadi
resourcescheduler.cpp
00001 /* 00002 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00003 00004 This library is free software; you can redistribute it and/or modify it 00005 under the terms of the GNU Library General Public License as published by 00006 the Free Software Foundation; either version 2 of the License, or (at your 00007 option) any later version. 00008 00009 This library is distributed in the hope that it will be useful, but WITHOUT 00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00011 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00012 License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to the 00016 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00017 02110-1301, USA. 00018 */ 00019 00020 #include "resourcescheduler_p.h" 00021 00022 #include "dbusconnectionpool.h" 00023 00024 #include <kdebug.h> 00025 #include <klocale.h> 00026 00027 #include <QtCore/QTimer> 00028 #include <QtDBus/QDBusInterface> 00029 #include <QtDBus/QDBusConnectionInterface> 00030 #include <boost/graph/graph_concepts.hpp> 00031 00032 using namespace Akonadi; 00033 00034 qint64 ResourceScheduler::Task::latestSerial = 0; 00035 static QDBusAbstractInterface *s_resourcetracker = 0; 00036 00037 //@cond PRIVATE 00038 00039 ResourceScheduler::ResourceScheduler( QObject *parent ) : 00040 QObject( parent ), 00041 mCurrentTasksQueue( -1 ), 00042 mOnline( false ) 00043 { 00044 } 00045 00046 void ResourceScheduler::scheduleFullSync() 00047 { 00048 Task t; 00049 t.type = SyncAll; 00050 TaskList& queue = queueForTaskType( t.type ); 00051 if ( queue.contains( t ) || mCurrentTask == t ) 00052 return; 00053 queue << t; 00054 signalTaskToTracker( t, "SyncAll" ); 00055 scheduleNext(); 00056 } 00057 00058 void ResourceScheduler::scheduleCollectionTreeSync() 00059 { 00060 Task t; 00061 t.type = SyncCollectionTree; 00062 TaskList& queue = queueForTaskType( t.type ); 00063 if ( queue.contains( t ) || mCurrentTask == t ) 00064 return; 00065 queue << t; 00066 signalTaskToTracker( t, "SyncCollectionTree" ); 00067 scheduleNext(); 00068 } 00069 00070 void ResourceScheduler::scheduleSync(const Collection & col) 00071 { 00072 Task t; 00073 t.type = SyncCollection; 00074 t.collection = col; 00075 TaskList& queue = queueForTaskType( t.type ); 00076 if ( queue.contains( t ) || mCurrentTask == t ) 00077 return; 00078 queue << t; 00079 signalTaskToTracker( t, "SyncCollection" ); 00080 scheduleNext(); 00081 } 00082 00083 void ResourceScheduler::scheduleAttributesSync( const Collection &collection ) 00084 { 00085 Task t; 00086 t.type = SyncCollectionAttributes; 00087 t.collection = collection; 00088 00089 TaskList& queue = queueForTaskType( t.type ); 00090 if ( queue.contains( t ) || mCurrentTask == t ) 00091 return; 00092 queue << t; 00093 signalTaskToTracker( t, "SyncCollectionAttributes" ); 00094 scheduleNext(); 00095 } 00096 00097 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg) 00098 { 00099 Task t; 00100 t.type = FetchItem; 00101 t.item = item; 00102 t.itemParts = parts; 00103 00104 // if the current task does already fetch the requested item, break here but 00105 // keep the dbus message, so we can send the reply later on 00106 if ( mCurrentTask == t ) { 00107 mCurrentTask.dbusMsgs << msg; 00108 return; 00109 } 00110 00111 // If this task is already in the queue, merge with it. 00112 TaskList& queue = queueForTaskType( t.type ); 00113 const int idx = queue.indexOf( t ); 00114 if ( idx != -1 ) { 00115 queue[ idx ].dbusMsgs << msg; 00116 return; 00117 } 00118 00119 t.dbusMsgs << msg; 00120 queue << t; 00121 signalTaskToTracker( t, "FetchItem" ); 00122 scheduleNext(); 00123 } 00124 00125 void ResourceScheduler::scheduleResourceCollectionDeletion() 00126 { 00127 Task t; 00128 t.type = DeleteResourceCollection; 00129 TaskList& queue = queueForTaskType( t.type ); 00130 if ( queue.contains( t ) || mCurrentTask == t ) 00131 return; 00132 queue << t; 00133 signalTaskToTracker( t, "DeleteResourceCollection" ); 00134 scheduleNext(); 00135 } 00136 00137 void ResourceScheduler::scheduleChangeReplay() 00138 { 00139 Task t; 00140 t.type = ChangeReplay; 00141 TaskList& queue = queueForTaskType( t.type ); 00142 // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks 00143 if ( queue.contains( t ) ) 00144 return; 00145 queue << t; 00146 signalTaskToTracker( t, "ChangeReplay" ); 00147 scheduleNext(); 00148 } 00149 00150 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() 00151 { 00152 Task t; 00153 t.type = SyncAllDone; 00154 TaskList& queue = queueForTaskType( t.type ); 00155 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost 00156 queue << t; 00157 signalTaskToTracker( t, "SyncAllDone" ); 00158 scheduleNext(); 00159 } 00160 00161 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority ) 00162 { 00163 Task t; 00164 t.type = Custom; 00165 t.receiver = receiver; 00166 t.methodName = methodName; 00167 t.argument = argument; 00168 QueueType queueType = GenericTaskQueue; 00169 if ( priority == ResourceBase::AfterChangeReplay ) 00170 queueType = AfterChangeReplayQueue; 00171 else if ( priority == ResourceBase::Prepend ) 00172 queueType = PrependTaskQueue; 00173 TaskList& queue = mTaskList[ queueType ]; 00174 00175 if ( queue.contains( t ) ) 00176 return; 00177 00178 switch (priority) { 00179 case ResourceBase::Prepend: 00180 queue.prepend( t ); 00181 break; 00182 default: 00183 queue.append(t); 00184 break; 00185 } 00186 00187 signalTaskToTracker( t, "Custom-" + t.methodName ); 00188 scheduleNext(); 00189 } 00190 00191 void ResourceScheduler::taskDone() 00192 { 00193 if ( isEmpty() ) 00194 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) ); 00195 00196 if ( s_resourcetracker ) { 00197 QList<QVariant> argumentList; 00198 argumentList << QString::number( mCurrentTask.serial ) 00199 << QString(); 00200 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00201 } 00202 00203 mCurrentTask = Task(); 00204 mCurrentTasksQueue = -1; 00205 scheduleNext(); 00206 } 00207 00208 void ResourceScheduler::deferTask() 00209 { 00210 if ( mCurrentTask.type == Invalid ) 00211 return; 00212 00213 if ( s_resourcetracker ) { 00214 QList<QVariant> argumentList; 00215 argumentList << QString::number( mCurrentTask.serial ) 00216 << QString(); 00217 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00218 } 00219 00220 Task t = mCurrentTask; 00221 mCurrentTask = Task(); 00222 00223 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount ); 00224 mTaskList[mCurrentTasksQueue].prepend( t ); 00225 mCurrentTasksQueue = -1; 00226 00227 signalTaskToTracker( t, "DeferedTask" ); 00228 00229 scheduleNext(); 00230 } 00231 00232 bool ResourceScheduler::isEmpty() 00233 { 00234 for ( int i = 0; i < NQueueCount; ++i ) { 00235 if ( !mTaskList[i].isEmpty() ) 00236 return false; 00237 } 00238 return true; 00239 } 00240 00241 void ResourceScheduler::scheduleNext() 00242 { 00243 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline ) 00244 return; 00245 QTimer::singleShot( 0, this, SLOT( executeNext() ) ); 00246 } 00247 00248 void ResourceScheduler::executeNext() 00249 { 00250 if ( mCurrentTask.type != Invalid || isEmpty() ) 00251 return; 00252 00253 for ( int i = 0; i < NQueueCount; ++i ) { 00254 if ( !mTaskList[ i ].isEmpty() ) { 00255 mCurrentTask = mTaskList[ i ].takeFirst(); 00256 mCurrentTasksQueue = i; 00257 break; 00258 } 00259 } 00260 00261 if ( s_resourcetracker ) { 00262 QList<QVariant> argumentList; 00263 argumentList << QString::number( mCurrentTask.serial ); 00264 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList); 00265 } 00266 00267 switch ( mCurrentTask.type ) { 00268 case SyncAll: 00269 emit executeFullSync(); 00270 break; 00271 case SyncCollectionTree: 00272 emit executeCollectionTreeSync(); 00273 break; 00274 case SyncCollection: 00275 emit executeCollectionSync( mCurrentTask.collection ); 00276 break; 00277 case SyncCollectionAttributes: 00278 emit executeCollectionAttributesSync( mCurrentTask.collection ); 00279 break; 00280 case FetchItem: 00281 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); 00282 break; 00283 case DeleteResourceCollection: 00284 emit executeResourceCollectionDeletion(); 00285 break; 00286 case ChangeReplay: 00287 emit executeChangeReplay(); 00288 break; 00289 case SyncAllDone: 00290 emit fullSyncComplete(); 00291 break; 00292 case Custom: 00293 { 00294 const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)"; 00295 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1; 00296 bool success = false; 00297 if ( hasSlotWithVariant ) { 00298 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) ); 00299 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" ); 00300 } 00301 if ( !success ) 00302 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName ); 00303 00304 if ( !success ) 00305 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument; 00306 break; 00307 } 00308 default: { 00309 kError() << "Unhandled task type" << mCurrentTask.type; 00310 dump(); 00311 Q_ASSERT( false ); 00312 } 00313 } 00314 } 00315 00316 ResourceScheduler::Task ResourceScheduler::currentTask() const 00317 { 00318 return mCurrentTask; 00319 } 00320 00321 void ResourceScheduler::setOnline(bool state) 00322 { 00323 if ( mOnline == state ) 00324 return; 00325 mOnline = state; 00326 if ( mOnline ) { 00327 scheduleNext(); 00328 } else { 00329 if ( mCurrentTask.type != Invalid ) { 00330 // abort running task 00331 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask ); 00332 mCurrentTask = Task(); 00333 mCurrentTasksQueue = -1; 00334 } 00335 // abort pending synchronous tasks, might take longer until the resource goes online again 00336 TaskList& itemFetchQueue = queueForTaskType( FetchItem ); 00337 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) { 00338 if ( (*it).type == FetchItem ) { 00339 (*it).sendDBusReplies( false ); 00340 it = itemFetchQueue.erase( it ); 00341 if ( s_resourcetracker ) { 00342 QList<QVariant> argumentList; 00343 argumentList << QString::number( mCurrentTask.serial ) 00344 << QLatin1String( "Job canceled." ); 00345 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList ); 00346 } 00347 } else { 00348 ++it; 00349 } 00350 } 00351 } 00352 } 00353 00354 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType ) 00355 { 00356 // if there's a job tracer running, tell it about the new job 00357 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) { 00358 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ), 00359 QLatin1String( "/resourcesJobtracker" ), 00360 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ), 00361 DBusConnectionPool::threadConnection(), 0 ); 00362 } 00363 00364 if ( s_resourcetracker ) { 00365 QList<QVariant> argumentList; 00366 argumentList << static_cast<AgentBase*>( parent() )->identifier() 00367 << QString::number( task.serial ) 00368 << QString() 00369 << QString::fromLatin1( taskType ); 00370 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList); 00371 } 00372 } 00373 00374 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) 00375 { 00376 if ( !collection.isValid() ) // should not happen, but you never know... 00377 return; 00378 TaskList& queue = queueForTaskType( SyncCollection ); 00379 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) { 00380 if ( (*it).type == SyncCollection && (*it).collection == collection ) { 00381 it = queue.erase( it ); 00382 kDebug() << " erasing"; 00383 } else 00384 ++it; 00385 } 00386 } 00387 00388 void ResourceScheduler::Task::sendDBusReplies( bool success ) 00389 { 00390 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) { 00391 QDBusMessage reply( msg ); 00392 reply << success; 00393 DBusConnectionPool::threadConnection().send( reply ); 00394 } 00395 } 00396 00397 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type ) 00398 { 00399 switch( type ) { 00400 case ChangeReplay: 00401 return ChangeReplayQueue; 00402 case FetchItem: 00403 return ItemFetchQueue; 00404 default: 00405 return GenericTaskQueue; 00406 } 00407 } 00408 00409 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type ) 00410 { 00411 const QueueType qt = queueTypeForTaskType( type ); 00412 return mTaskList[ qt ]; 00413 } 00414 00415 void ResourceScheduler::dump() 00416 { 00417 kDebug() << "ResourceScheduler: Online:" << mOnline; 00418 kDebug() << " current task:" << mCurrentTask; 00419 for ( int i = 0; i < NQueueCount; ++i ) { 00420 const TaskList& queue = mTaskList[i]; 00421 kDebug() << " queue" << i << queue.size() << "tasks:"; 00422 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) { 00423 kDebug() << " " << (*it); 00424 } 00425 } 00426 } 00427 00428 void ResourceScheduler::clear() 00429 { 00430 kDebug() << "Clearing ResourceScheduler queues:"; 00431 for ( int i = 0; i < NQueueCount; ++i ) { 00432 TaskList& queue = mTaskList[i]; 00433 queue.clear(); 00434 } 00435 mCurrentTask = Task(); 00436 mCurrentTasksQueue = -1; 00437 } 00438 00439 void Akonadi::ResourceScheduler::cancelQueues() 00440 { 00441 for ( int i = 0; i < NQueueCount; ++i ) { 00442 TaskList& queue = mTaskList[i]; 00443 if ( s_resourcetracker ) { 00444 foreach ( const Task &t, queue ) { 00445 QList<QVariant> argumentList; 00446 argumentList << QString::number( t.serial ) << QString(); 00447 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00448 } 00449 } 00450 queue.clear(); 00451 } 00452 } 00453 00454 static const char s_taskTypes[][25] = { 00455 "Invalid", 00456 "SyncAll", 00457 "SyncCollectionTree", 00458 "SyncCollection", 00459 "SyncCollectionAttributes", 00460 "FetchItem", 00461 "ChangeReplay", 00462 "DeleteResourceCollection", 00463 "SyncAllDone", 00464 "Custom" 00465 }; 00466 00467 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task ) 00468 { 00469 d << task.serial << s_taskTypes[task.type]; 00470 if ( task.type != ResourceScheduler::Invalid ) { 00471 if ( task.collection.isValid() ) 00472 d << "collection" << task.collection.id(); 00473 if ( task.item.id() != -1 ) 00474 d << "item" << task.item.id(); 00475 if ( !task.methodName.isEmpty() ) 00476 d << task.methodName << task.argument; 00477 } 00478 return d; 00479 } 00480 00481 //@endcond 00482 00483 #include "resourcescheduler_p.moc"