20#include "resourcescheduler_p.h"
22#include "dbusconnectionpool.h"
23#include "recursivemover_p.h"
26#include <klocalizedstring.h>
28#include <QtCore/QTimer>
29#include <QtDBus/QDBusInterface>
30#include <QtDBus/QDBusConnectionInterface>
31#include <boost/graph/graph_concepts.hpp>
35qint64 ResourceScheduler::Task::latestSerial = 0;
36static QDBusAbstractInterface *s_resourcetracker = 0;
40ResourceScheduler::ResourceScheduler( QObject *parent ) :
42 mCurrentTasksQueue( -1 ),
47void ResourceScheduler::scheduleFullSync()
51 TaskList& queue = queueForTaskType( t.type );
52 if ( queue.contains( t ) || mCurrentTask == t )
55 signalTaskToTracker( t,
"SyncAll" );
59void ResourceScheduler::scheduleCollectionTreeSync()
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
64 if ( queue.contains( t ) || mCurrentTask == t )
67 signalTaskToTracker( t,
"SyncCollectionTree" );
71void ResourceScheduler::scheduleSync(
const Collection & col)
74 t.type = SyncCollection;
76 TaskList& queue = queueForTaskType( t.type );
77 if ( queue.contains( t ) || mCurrentTask == t )
80 signalTaskToTracker( t,
"SyncCollection", QString::number( col.
id() ) );
84void ResourceScheduler::scheduleAttributesSync(
const Collection &collection )
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList& queue = queueForTaskType( t.type );
91 if ( queue.contains( t ) || mCurrentTask == t )
94 signalTaskToTracker( t,
"SyncCollectionAttributes", QString::number( collection.id() ) );
98void ResourceScheduler::scheduleItemFetch(
const Item & item,
const QSet<QByteArray> &parts,
const QDBusMessage & msg)
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
116 queue[ idx ].dbusMsgs << msg;
122 signalTaskToTracker( t,
"FetchItem", QString::number( item.id() ) );
126void ResourceScheduler::scheduleResourceCollectionDeletion()
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
131 if ( queue.contains( t ) || mCurrentTask == t )
134 signalTaskToTracker( t,
"DeleteResourceCollection" );
138void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection )
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
144 if ( queue.contains( t ) || mCurrentTask == t )
147 signalTaskToTracker( t,
"InvalideCacheForCollection", QString::number( collection.id() ) );
151void ResourceScheduler::scheduleChangeReplay()
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
157 if ( queue.contains( t ) )
160 signalTaskToTracker( t,
"ChangeReplay" );
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
169 t.argument = QVariant::fromValue( mover );
170 TaskList &queue = queueForTaskType( t.type );
172 if ( queue.contains( t ) || mCurrentTask == t )
176 signalTaskToTracker( t,
"RecursiveMoveReplay", QString::number( t.collection.id() ) );
180void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
187 signalTaskToTracker( t,
"SyncAllDone" );
191void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
198 signalTaskToTracker( t,
"SyncCollectionTreeDone" );
202void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver,
const char* methodName,
const QVariant &argument,
ResourceBase::SchedulePriority priority )
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
209 QueueType queueType = GenericTaskQueue;
211 queueType = AfterChangeReplayQueue;
213 queueType = PrependTaskQueue;
214 TaskList& queue = mTaskList[ queueType ];
216 if ( queue.contains( t ) )
228 signalTaskToTracker( t,
"Custom-" + t.methodName );
232void ResourceScheduler::taskDone()
235 emit status(
AgentBase::Idle, i18nc(
"@info:status Application ready for work",
"Ready" ) );
237 if ( s_resourcetracker ) {
238 QList<QVariant> argumentList;
239 argumentList << QString::number( mCurrentTask.serial )
241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
249void ResourceScheduler::deferTask()
251 if ( mCurrentTask.type == Invalid )
254 if ( s_resourcetracker ) {
255 QList<QVariant> argumentList;
256 argumentList << QString::number( mCurrentTask.serial )
258 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
268 signalTaskToTracker( t,
"DeferedTask" );
273bool ResourceScheduler::isEmpty()
275 for (
int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
282void ResourceScheduler::scheduleNext()
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
286 QTimer::singleShot( 0,
this, SLOT(executeNext()) );
289void ResourceScheduler::executeNext()
291 if ( mCurrentTask.type != Invalid || isEmpty() )
294 for (
int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
297 mCurrentTasksQueue = i;
302 if ( s_resourcetracker ) {
303 QList<QVariant> argumentList;
304 argumentList << QString::number( mCurrentTask.serial );
305 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobStarted" ), argumentList);
308 switch ( mCurrentTask.type ) {
310 emit executeFullSync();
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
316 emit executeCollectionSync( mCurrentTask.collection );
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
331 emit executeChangeReplay();
333 case RecursiveMoveReplay:
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
337 emit fullSyncComplete();
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
344 const QByteArray methodSig = mCurrentTask.methodName +
"(QVariant)";
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success =
false;
347 if ( hasSlotWithVariant ) {
348 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(),
"ResourceScheduler::executeNext",
"Valid argument was provided but the method wasn't found" );
352 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
355 kError() <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument" << mCurrentTask.argument;
359 kError() <<
"Unhandled task type" << mCurrentTask.type;
366ResourceScheduler::Task ResourceScheduler::currentTask()
const
371void ResourceScheduler::setOnline(
bool state)
373 if ( mOnline == state )
379 if ( mCurrentTask.type != Invalid ) {
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
387 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc(
"@info",
"Job canceled." ) );
390 it = itemFetchQueue.erase( it );
391 if ( s_resourcetracker ) {
392 QList<QVariant> argumentList;
393 argumentList << QString::number( mCurrentTask.serial )
394 << i18nc(
"@info",
"Job canceled." );
395 s_resourcetracker->asyncCallWithArgumentList( QLatin1String(
"jobEnded" ), argumentList );
404void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType,
const QString &debugString )
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
408 s_resourcetracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
409 QLatin1String(
"/resourcesJobtracker" ),
410 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
411 DBusConnectionPool::threadConnection(), 0 );
414 if ( s_resourcetracker ) {
415 QList<QVariant> argumentList;
416 argumentList << static_cast<AgentBase*>( parent() )->identifier()
417 << QString::number( task.serial )
419 << QString::fromLatin1( taskType )
422 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobCreated" ), argumentList);
428 if ( !collection.isValid() )
430 TaskList& queue = queueForTaskType( SyncCollection );
431 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
432 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
433 it = queue.erase( it );
434 kDebug() <<
" erasing";
440void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg )
442 Q_FOREACH(
const QDBusMessage &msg, dbusMsgs ) {
443 QDBusMessage reply( msg.createReply() );
444 const QString methodName = msg.member();
445 if (methodName == QLatin1String(
"requestItemDelivery")) {
446 reply << errorMsg.isEmpty();
447 }
else if (methodName == QLatin1String(
"requestItemDeliveryV2")) {
449 }
else if (methodName.isEmpty()) {
452 kFatal() <<
"Got unexpected member:" << methodName;
454 DBusConnectionPool::threadConnection().send( reply );
458ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
462 case RecursiveMoveReplay:
463 return ChangeReplayQueue;
465 case SyncCollectionAttributes:
466 return UserActionQueue;
468 return GenericTaskQueue;
472ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
474 const QueueType qt = queueTypeForTaskType( type );
475 return mTaskList[ qt ];
478void ResourceScheduler::dump()
480 kDebug() << dumpToString();
483QString ResourceScheduler::dumpToString()
const
486 QTextStream str( &ret );
487 str <<
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
488 str <<
" current task: " << mCurrentTask << endl;
489 for (
int i = 0; i < NQueueCount; ++i ) {
490 const TaskList& queue = mTaskList[i];
491 if (queue.isEmpty()) {
492 str <<
" queue " << i <<
" is empty" << endl;
494 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:" << endl;
495 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
496 str <<
" " << (*it) << endl;
503void ResourceScheduler::clear()
505 kDebug() <<
"Clearing ResourceScheduler queues:";
506 for (
int i = 0; i < NQueueCount; ++i ) {
507 TaskList& queue = mTaskList[i];
510 mCurrentTask = Task();
511 mCurrentTasksQueue = -1;
514void Akonadi::ResourceScheduler::cancelQueues()
516 for (
int i = 0; i < NQueueCount; ++i ) {
517 TaskList& queue = mTaskList[i];
518 if ( s_resourcetracker ) {
519 foreach (
const Task &t, queue ) {
520 QList<QVariant> argumentList;
521 argumentList << QString::number( t.serial ) << QString();
522 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
529static const char s_taskTypes[][27] = {
532 "SyncCollectionTree",
534 "SyncCollectionAttributes",
537 "RecursiveMoveReplay",
538 "DeleteResourceCollection",
539 "InvalideCacheForCollection",
541 "SyncCollectionTreeDone",
545QTextStream& Akonadi::operator<<( QTextStream& d,
const ResourceScheduler::Task& task )
547 d << task.serial <<
" " << s_taskTypes[task.type] <<
" ";
548 if ( task.type != ResourceScheduler::Invalid ) {
549 if ( task.collection.isValid() )
550 d <<
"collection " << task.collection.id() <<
" ";
551 if ( task.item.id() != -1 )
552 d <<
"item " << task.item.id() <<
" ";
553 if ( !task.methodName.isEmpty() )
554 d << task.methodName <<
" " << task.argument.toString();
559QDebug Akonadi::operator<<( QDebug d,
const ResourceScheduler::Task& task )
562 QTextStream str( &s );
570#include "moc_resourcescheduler_p.cpp"
@ Idle
The agent does currently nothing.
Represents a collection of PIM items.
Id id() const
Returns the unique identifier of the entity.
Helper class for expanding inter-resource collection moves inside ResourceBase.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
@ Prepend
The task will be executed as soon as the current task has finished.
@ AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
FreeBusyManager::Singleton.