20 #include "resourcescheduler_p.h"
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
26 #include <klocalizedstring.h>
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
33 using namespace Akonadi;
// Static serial counter of Task, initialised to 0.
// NOTE(review): presumably the last serial handed out to a task — confirm in the Task declaration.
35 qint64 ResourceScheduler::Task::latestSerial = 0;
// D-Bus interface to the job tracker. Starts out null and is created on demand in
// signalTaskToTracker(); every use in this file is guarded by a null check.
36 static QDBusAbstractInterface *s_resourcetracker = 0;
// Constructor. Initialises mCurrentTasksQueue to -1, the same value this file assigns
// whenever the current task is cleared (see taskDone(), deferTask(), clear()).
40 ResourceScheduler::ResourceScheduler( QObject *parent ) :
42 mCurrentTasksQueue( -1 ),
// Schedules a full synchronisation task and announces it to the job tracker as "SyncAll".
47 void ResourceScheduler::scheduleFullSync()
51 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
52 if ( queue.contains( t ) || mCurrentTask == t )
55 signalTaskToTracker( t,
"SyncAll" );
// Schedules a SyncCollectionTree task and announces it to the job tracker
// as "SyncCollectionTree".
59 void ResourceScheduler::scheduleCollectionTreeSync()
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
64 if ( queue.contains( t ) || mCurrentTask == t )
67 signalTaskToTracker( t,
"SyncCollectionTree" );
// Schedules a SyncCollection task for the given collection and announces it to the
// job tracker as "SyncCollection".
//   col - the collection the task refers to
71 void ResourceScheduler::scheduleSync(
const Collection & col)
74 t.type = SyncCollection;
76 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
77 if ( queue.contains( t ) || mCurrentTask == t )
80 signalTaskToTracker( t,
"SyncCollection" );
// Schedules a SyncCollectionAttributes task for the given collection and announces it
// to the job tracker as "SyncCollectionAttributes".
//   collection - stored in the task's collection member
84 void ResourceScheduler::scheduleAttributesSync(
const Collection &collection )
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
91 if ( queue.contains( t ) || mCurrentTask == t )
94 signalTaskToTracker( t,
"SyncCollectionAttributes" );
// Schedules the retrieval of an item and announces it to the job tracker as "FetchItem".
// Rather than queueing a duplicate, the D-Bus message of the request is attached to an
// equal task that is already running or already queued.
//   item  - the item to fetch
//   parts - the item parts requested
//   msg   - the D-Bus message of the request; kept in the task's dbusMsgs list
98 void ResourceScheduler::scheduleItemFetch(
const Item & item,
const QSet<QByteArray> &parts,
const QDBusMessage & msg)
// The same fetch is currently executing: remember the additional requester on it.
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
// An equal task is queued: append the message to that entry.
// NOTE(review): presumably guarded by a check that idx is not -1 — confirm.
116 queue[ idx ].dbusMsgs << msg;
122 signalTaskToTracker( t,
"FetchItem" );
// Schedules a DeleteResourceCollection task and announces it to the job tracker
// as "DeleteResourceCollection".
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
131 if ( queue.contains( t ) || mCurrentTask == t )
134 signalTaskToTracker( t,
"DeleteResourceCollection" );
// Schedules an InvalideCacheForCollection task for the given collection and announces
// it to the job tracker as "InvalideCacheForCollection".
//   collection - stored in the task's collection member
138 void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection )
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
144 if ( queue.contains( t ) || mCurrentTask == t )
147 signalTaskToTracker( t,
"InvalideCacheForCollection" );
// Schedules a ChangeReplay task and announces it to the job tracker as "ChangeReplay".
151 void ResourceScheduler::scheduleChangeReplay()
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
// Only the queue is checked for duplicates here; unlike the other schedule*() functions
// in this file, the task is not compared against mCurrentTask.
157 if ( queue.contains( t ) )
160 signalTaskToTracker( t,
"ChangeReplay" );
// Builds a RecursiveMoveReplay task: the moved collection goes into the task's collection
// member and the mover object is carried in the task's QVariant argument, from which
// executeNext() extracts it again as a RecursiveMover*.
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
169 t.argument = QVariant::fromValue( mover );
170 TaskList &queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the one currently running.
// NOTE(review): presumably this branch returns without queueing — confirm.
172 if ( queue.contains( t ) || mCurrentTask == t )
176 signalTaskToTracker( t,
"RecursiveMoveReplay" );
// Schedules a SyncAllDone task and announces it to the job tracker as "SyncAllDone".
// executeNext() handles a fullSyncComplete() emission, presumably for this task type.
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
187 signalTaskToTracker( t,
"SyncAllDone" );
// Schedules a SyncCollectionTreeDone task and announces it to the job tracker as
// "SyncCollectionTreeDone". executeNext() emits collectionTreeSyncComplete() for this type.
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
198 signalTaskToTracker( t,
"SyncCollectionTreeDone" );
// Schedules a custom task: a slot on an arbitrary receiver that executeNext() later
// invokes through the Qt meta-object system.
//   receiver   - object whose slot is invoked
//   methodName - name of the slot, without signature
//   argument   - value passed to the slot when it has a (QVariant) overload
//   priority   - NOTE(review): presumably selects which of the queues below is used — confirm
202 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver,
const char* methodName,
const QVariant &argument,
ResourceBase::SchedulePriority priority )
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
// The generic queue is the default; it can be replaced by the after-change-replay
// queue or the prepend queue.
209 QueueType queueType = GenericTaskQueue;
211 queueType = AfterChangeReplayQueue;
213 queueType = PrependTaskQueue;
// The queue is picked directly by queue type here, not through queueForTaskType().
214 TaskList& queue = mTaskList[ queueType ];
// Duplicate check against the selected queue only.
216 if ( queue.contains( t ) )
// Custom tasks are reported to the job tracker as "Custom-<methodName>".
228 signalTaskToTracker( t,
"Custom-" + t.methodName );
// Marks the current task as finished: emits an Idle status with the text "Ready",
// reports "jobEnded" for the task's serial to the job tracker if one is connected,
// and resets the current task and its queue index.
232 void ResourceScheduler::taskDone()
235 emit status(
AgentBase::Idle, i18nc(
"@info:status Application ready for work",
"Ready" ) );
237 if ( s_resourcetracker ) {
238 QList<QVariant> argumentList;
// The serial is sent as a string.
239 argumentList << QString::number( mCurrentTask.serial )
241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
// A default-constructed Task plus queue index -1 mean "no task running".
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
// Puts the currently running task back at the front of the queue it was taken from,
// so that no task is running afterwards.
249 void ResourceScheduler::deferTask()
// No task is running.
// NOTE(review): presumably returns here — confirm.
251 if ( mCurrentTask.type == Invalid )
// The job tracker is told the job ended; the re-queued task is announced again below.
254 if ( s_resourcetracker ) {
255 QList<QVariant> argumentList;
256 argumentList << QString::number( mCurrentTask.serial )
258 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
// Copy the task before resetting mCurrentTask.
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
// mCurrentTasksQueue was set by executeNext() when the task was taken from its queue.
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
268 signalTaskToTracker( t,
"DeferedTask" );
// Checks each of the NQueueCount task queues for pending tasks.
// NOTE(review): presumably returns false at the first non-empty queue and true otherwise — confirm.
273 bool ResourceScheduler::isEmpty()
275 for (
int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
// Arranges for executeNext() to run through a zero-interval single-shot timer instead
// of calling it directly.
282 void ResourceScheduler::scheduleNext()
// Guard: a task is already running, nothing is queued, or the scheduler is offline.
// NOTE(review): presumably returns here — confirm.
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
286 QTimer::singleShot( 0,
this, SLOT(executeNext()) );
// Takes the first task of a non-empty queue, makes it the current task, reports
// "jobStarted" to the job tracker and dispatches on the task type: built-in types emit
// the matching execute*/…Complete signal, custom tasks invoke the stored slot.
289 void ResourceScheduler::executeNext()
// Guard: a task is already running or nothing is queued.
// NOTE(review): presumably returns here — confirm.
291 if ( mCurrentTask.type != Invalid || isEmpty() )
// Queues are scanned in ascending index order.
// NOTE(review): presumably the loop stops after the first task is taken — confirm.
294 for (
int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
// Remember the originating queue; deferTask() uses it to put the task back.
297 mCurrentTasksQueue = i;
302 if ( s_resourcetracker ) {
303 QList<QVariant> argumentList;
304 argumentList << QString::number( mCurrentTask.serial );
305 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobStarted" ), argumentList);
308 switch ( mCurrentTask.type ) {
310 emit executeFullSync();
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
316 emit executeCollectionSync( mCurrentTask.collection );
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
331 emit executeChangeReplay();
333 case RecursiveMoveReplay:
// The mover was stored in the task's QVariant argument when the task was scheduled.
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
337 emit fullSyncComplete();
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
// Custom task: look for a slot overload taking a QVariant on the receiver.
344 const QByteArray methodSig = mCurrentTask.methodName +
"(QVariant)";
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success =
false;
347 if ( hasSlotWithVariant ) {
348 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
// A failed invocation is only tolerated by this assertion when no valid argument was supplied.
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(),
"ResourceScheduler::executeNext",
"Valid argument was provided but the method wasn't found" );
// Invocation without arguments.
// NOTE(review): presumably the fallback when the QVariant call was not made or failed — confirm.
352 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
// NOTE(review): presumably logged only when success is still false — confirm.
355 kError() <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument" << mCurrentTask.argument;
// Error report for a task type not handled by the switch.
359 kError() <<
"Unhandled task type" << mCurrentTask.type;
// Const accessor returning a Task by value.
// NOTE(review): presumably returns mCurrentTask — confirm.
366 ResourceScheduler::Task ResourceScheduler::currentTask()
const
// Switches the scheduler between online and offline mode.
//   state - the new online state; nothing changes when it equals mOnline
// The code below re-queues a running task and cancels queued item fetches.
// NOTE(review): presumably that part runs only when going offline — confirm.
371 void ResourceScheduler::setOnline(
bool state)
373 if ( mOnline == state )
// A task is running: put it back at the front of the queue for its task type.
// NOTE(review): deferTask() re-queues via mTaskList[mCurrentTasksQueue]; here the queue is
// derived from the task type, which may differ for custom tasks scheduled into the
// AfterChangeReplay or Prepend queues — confirm this is intended.
379 if ( mCurrentTask.type != Invalid ) {
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
// Cancel all queued FetchItem tasks: answer their pending D-Bus requests with an error
// text and remove them from the queue.
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
// No increment in the loop header: erase() returns the iterator to the next element.
// NOTE(review): presumably the non-matching branch advances with ++it — confirm.
387 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc(
"@info",
"Job canceled." ) );
390 it = itemFetchQueue.erase( it );
391 if ( s_resourcetracker ) {
392 QList<QVariant> argumentList;
// NOTE(review): this reports mCurrentTask.serial, but mCurrentTask was reset above and the
// cancelled task has already been erased, so the tracker is probably not told the serial
// of the task that was actually cancelled. Likely the serial should be captured before
// the erase() call — confirm and fix.
393 argumentList << QString::number( mCurrentTask.serial )
394 << i18nc(
"@info",
"Job canceled." );
395 s_resourcetracker->asyncCallWithArgumentList( QLatin1String(
"jobEnded" ), argumentList );
// Announces a newly scheduled task to the job tracker by calling "jobCreated" asynchronously.
//   task     - the task to announce; its serial is sent as a string
//   taskType - name describing the task, converted from Latin-1
// Nothing is sent while no tracker interface exists.
404 void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType )
// Create the tracker interface the first time the org.kde.akonadiconsole service is
// found registered on the bus; until then the registration check is repeated on each call.
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
// Created without a parent object (last argument 0).
408 s_resourcetracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
409 QLatin1String(
"/resourcesJobtracker" ),
410 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
411 DBusConnectionPool::threadConnection(), 0 );
414 if ( s_resourcetracker ) {
415 QList<QVariant> argumentList;
// Unchecked static_cast: assumes the scheduler's parent object is an AgentBase.
416 argumentList << static_cast<AgentBase*>( parent() )->identifier()
417 << QString::number( task.serial )
419 << QString::fromLatin1( taskType );
420 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobCreated" ), argumentList);
// Removes every queued SyncCollection task that refers to `collection`.
// A task that is currently running is not touched by the code below.
428 TaskList& queue = queueForTaskType( SyncCollection );
// No increment in the loop header: erase() returns the iterator to the next element.
// NOTE(review): presumably the non-matching branch advances with ++it — confirm.
429 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
430 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
431 it = queue.erase( it );
432 kDebug() <<
" erasing";
// Sends a D-Bus reply for every request message stored on this task.
//   errorMsg - error text; an empty string means success
// The reply payload depends on which D-Bus method the request called.
438 void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg )
440 Q_FOREACH(
const QDBusMessage &msg, dbusMsgs ) {
441 QDBusMessage reply( msg.createReply() );
442 const QString methodName = msg.member();
// requestItemDelivery is answered with a boolean: true when there was no error.
443 if (methodName == QLatin1String(
"requestItemDelivery")) {
444 reply << errorMsg.isEmpty();
445 }
else if (methodName == QLatin1String(
"requestItemDeliveryV2")) {
447 }
// Requests whose member name is empty get their own branch.
else if (methodName.isEmpty()) {
// NOTE(review): presumably the final else branch for any other member name — confirm.
450 kFatal() <<
"Got unexpected member:" << methodName;
// The reply goes out on the calling thread's D-Bus connection.
452 DBusConnectionPool::threadConnection().send( reply );
// Maps a task type to the queue its tasks are stored in.
// RecursiveMoveReplay maps to ChangeReplayQueue; the other results are ItemFetchQueue
// and GenericTaskQueue.
// NOTE(review): GenericTaskQueue is presumably the default for all remaining types — confirm.
456 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
460 case RecursiveMoveReplay:
461 return ChangeReplayQueue;
463 return ItemFetchQueue;
465 return GenericTaskQueue;
// Returns a mutable reference to the task list that holds tasks of the given type,
// using queueTypeForTaskType() to pick the index into mTaskList.
469 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
471 const QueueType qt = queueTypeForTaskType( type );
472 return mTaskList[ qt ];
// Writes the textual scheduler state produced by dumpToString() to the debug output.
475 void ResourceScheduler::dump()
477 kDebug() << dumpToString();
// Builds a multi-line, human-readable description of the scheduler state: the online
// flag, the current task, and for each queue either "is empty" or its size followed by
// one line per queued task.
480 QString ResourceScheduler::dumpToString()
const
// The stream writes into the string `ret`.
483 QTextStream str( &ret );
484 str <<
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
// Tasks are formatted by the operator<<( QTextStream&, Task ) defined in this file.
485 str <<
" current task: " << mCurrentTask << endl;
486 for (
int i = 0; i < NQueueCount; ++i ) {
487 const TaskList& queue = mTaskList[i];
488 if (queue.isEmpty()) {
489 str <<
" queue " << i <<
" is empty" << endl;
491 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:" << endl;
492 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
493 str <<
" " << (*it) << endl;
// Logs a debug message, visits every task queue and resets the current task and its
// queue index.
500 void ResourceScheduler::clear()
502 kDebug() <<
"Clearing ResourceScheduler queues:";
503 for (
int i = 0; i < NQueueCount; ++i ) {
// NOTE(review): presumably each queue is emptied here — confirm.
504 TaskList& queue = mTaskList[i];
// A default-constructed Task plus queue index -1 mean "no task running".
507 mCurrentTask = Task();
508 mCurrentTasksQueue = -1;
// Walks all task queues and, when a job tracker is connected, reports "jobEnded" with
// an empty second argument for every queued task.
// NOTE(review): presumably the queues are also emptied afterwards — confirm.
511 void Akonadi::ResourceScheduler::cancelQueues()
513 for (
int i = 0; i < NQueueCount; ++i ) {
514 TaskList& queue = mTaskList[i];
515 if ( s_resourcetracker ) {
516 foreach (
const Task &t, queue ) {
517 QList<QVariant> argumentList;
// The serial is sent as a string, followed by an empty string.
518 argumentList << QString::number( t.serial ) << QString();
519 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
// Printable task-type names, indexed by the task's type value in the stream operator
// defined in this file.
// Each entry is a fixed 27-byte buffer: "InvalideCacheForCollection" has 26 characters
// and exactly fills one with its terminating NUL, so a longer name needs a larger bound.
// NOTE(review): the entry order presumably has to mirror the TaskType enum — confirm.
526 static const char s_taskTypes[][27] = {
529 "SyncCollectionTree",
531 "SyncCollectionAttributes",
534 "RecursiveMoveReplay",
535 "DeleteResourceCollection",
536 "InvalideCacheForCollection",
538 "SyncCollectionTreeDone",
// Writes a one-line description of a task to a text stream: the serial and the type
// name, followed by optional details for any task whose type is not Invalid.
//   d    - destination stream
//   task - the task to describe
542 QTextStream& Akonadi::operator<<( QTextStream& d,
const ResourceScheduler::Task& task )
// task.type is used directly as the index into s_taskTypes.
544 d << task.serial <<
" " << s_taskTypes[task.type] <<
" ";
545 if ( task.type != ResourceScheduler::Invalid ) {
// Collection id, only when the collection is valid.
546 if ( task.collection.isValid() )
547 d <<
"collection " << task.collection.id() <<
" ";
// Item id, only when it differs from -1.
548 if ( task.item.id() != -1 )
549 d <<
"item " << task.item.id() <<
" ";
// Method name and the argument's string form, only when a method name is set.
550 if ( !task.methodName.isEmpty() )
551 d << task.methodName <<
" " << task.argument.toString();
// QDebug overload for tasks. It sets up a QTextStream that writes into the string `s`.
// NOTE(review): presumably the task is formatted through the QTextStream overload and the
// resulting string is passed on to the debug stream — confirm.
556 QDebug Akonadi::operator<<( QDebug d,
const ResourceScheduler::Task& task )
559 QTextStream str( &s );
567 #include "moc_resourcescheduler_p.cpp"
Helper class for expanding inter-resource collection moves inside ResourceBase.
Represents a collection of PIM items.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
The task is scheduled after the last ChangeReplay task in the queue.
The task will be executed as soon as the current task has finished.
bool isValid() const
Returns whether the entity is valid.
The agent is currently doing nothing.