21#include "resourcebase.h"
22#include "agentbase_p.h"
24#include "resourceadaptor.h"
25#include "collectiondeletejob.h"
26#include "collectionsync_p.h"
27#include "dbusconnectionpool.h"
29#include "kdepimlibs-version.h"
30#include "resourcescheduler_p.h"
31#include "tracerinterface.h"
32#include "xdgbasedirs_p.h"
34#include "changerecorder.h"
35#include "collectionfetchjob.h"
36#include "collectionfetchscope.h"
37#include "collectionmodifyjob.h"
38#include "invalidatecachejob_p.h"
39#include "itemfetchjob.h"
40#include "itemfetchscope.h"
41#include "itemmodifyjob.h"
42#include "itemmodifyjob_p.h"
44#include "resourceselectjob_p.h"
46#include "servermanager_p.h"
47#include "recursivemover_p.h"
49#include <kaboutdata.h>
50#include <kcmdlineargs.h>
52#include <klocalizedstring.h>
54#include <akonadi/tagmodifyjob.h>
56#include <QtCore/QDebug>
58#include <QtCore/QHash>
59#include <QtCore/QSettings>
60#include <QtCore/QTimer>
61#include <QApplication>
62#include <QtDBus/QtDBus>
69 Q_CLASSINFO(
"D-Bus Interface",
"org.kde.dfaure")
76 , mItemSyncFetchScope(0)
77 , mItemTransactionMode(
ItemSync::SingleTransaction)
79 , mCollectionSyncer(0)
80 , mHierarchicalRid(false)
81 , mUnemittedProgress(0)
82 , mAutomaticProgressReporting(true)
83 , mDisableAutomaticItemDeliveryDone(false)
84 , mItemSyncBatchSize(10)
85 , mCurrentCollectionFetchJob(0)
87 Internal::setClientType(Internal::Resource);
88 mStatusMessage = defaultReadyMessage();
89 mProgressEmissionCompressor.setInterval(1000);
90 mProgressEmissionCompressor.setSingleShot(
true);
92 mKeepLocalCollectionChanges <<
"ENTITYDISPLAY";
95 ~ResourceBasePrivate()
97 delete mItemSyncFetchScope;
105 if (!DBusConnectionPool::threadConnection().registerService(serviceId)) {
106 QString reason = DBusConnectionPool::threadConnection().lastError().message();
107 if (reason.isEmpty()) {
108 reason = QString::fromLatin1(
"this service is probably running already.");
110 kError() <<
"Unable to register service" << serviceId <<
"at D-Bus:" << reason;
112 if (QThread::currentThread() == QCoreApplication::instance()->thread()) {
113 QCoreApplication::instance()->exit(1);
117 AgentBasePrivate::delayedInit();
121 virtual void changeProcessed()
123 if (m_recursiveMover) {
124 m_recursiveMover->changeProcessed();
125 QTimer::singleShot(0, m_recursiveMover, SLOT(replayNext()));
130 if (!mChangeRecorder->
isEmpty()) {
131 scheduler->scheduleChangeReplay();
133 scheduler->taskDone();
136 void slotAbortRequested();
138 void slotDeliveryDone(KJob *job);
139 void slotCollectionSyncDone(KJob *job);
140 void slotLocalListDone(KJob *job);
141 void slotSynchronizeCollection(
const Collection &col);
142 void slotItemRetrievalCollectionFetchDone(KJob *job);
143 void slotCollectionListDone(KJob *job);
144 void slotSynchronizeCollectionAttributes(
const Collection &col);
145 void slotCollectionListForAttributesDone(KJob *job);
146 void slotCollectionAttributesSyncDone(KJob *job);
147 void slotAttributeRetrievalCollectionFetchDone(KJob *job);
149 void slotItemSyncDone(KJob *job);
151 void slotPercent(KJob *job,
unsigned long percent);
152 void slotDelayedEmitProgress();
153 void slotDeleteResourceCollection();
154 void slotDeleteResourceCollectionDone(KJob *job);
155 void slotCollectionDeletionDone(KJob *job);
159 void slotPrepareItemRetrieval(
const Akonadi::Item &item);
160 void slotPrepareItemRetrievalResult(KJob *job);
162 void changeCommittedResult(KJob *job);
165 void slotRecursiveMoveReplayResult(KJob *job);
167 void slotSessionReconnected()
174 void createItemSyncInstanceIfMissing()
177 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
178 "createItemSyncInstance",
"Calling items retrieval methods although no item retrieval is in progress");
180 mItemSyncer =
new ItemSync(q->currentCollection());
181 mItemSyncer->setTransactionMode(mItemTransactionMode);
182 mItemSyncer->setBatchSize(mItemSyncBatchSize);
183 mItemSyncer->setMergeMode(mItemMergeMode);
184 if (mItemSyncFetchScope) {
185 mItemSyncer->setFetchScope(*mItemSyncFetchScope);
187 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
188 mItemSyncer->setProperty(
"collection", QVariant::fromValue(q->currentCollection()));
189 connect(mItemSyncer, SIGNAL(percent(KJob*,ulong)), q, SLOT(slotPercent(KJob*,ulong)));
190 connect(mItemSyncer, SIGNAL(result(KJob*)), q, SLOT(slotItemSyncDone(KJob*)));
191 connect(mItemSyncer, SIGNAL(readyForNextBatch(
int)), q, SIGNAL(retrieveNextItemSyncBatch(
int)));
193 Q_ASSERT(mItemSyncer);
198 Q_SCRIPTABLE QString dumpToString()
const
202 QMetaObject::invokeMethod(
const_cast<ResourceBase *
>(q),
"dumpResourceToString", Qt::DirectConnection, Q_RETURN_ARG(QString, retVal));
203 return scheduler->dumpToString() + QLatin1Char(
'\n') + retVal;
206 Q_SCRIPTABLE
void dump()
211 Q_SCRIPTABLE
void clear()
224 if (collection.remoteId().isEmpty()) {
228 AgentBasePrivate::itemAdded(item, collection);
231 void itemChanged(
const Akonadi::Item &item,
const QSet< QByteArray > &partIdentifiers)
233 if (item.remoteId().isEmpty()) {
237 AgentBasePrivate::itemChanged(item, partIdentifiers);
240 void itemsFlagsChanged(
const Item::List &items,
const QSet< QByteArray > &addedFlags,
241 const QSet< QByteArray > &removedFlags)
243 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
248 Item::List validItems;
249 foreach (
const Akonadi::Item &item, items) {
250 if (!item.remoteId().isEmpty()) {
254 if (validItems.isEmpty()) {
259 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
262 void itemsTagsChanged(
const Item::List &items,
const QSet<Tag> &addedTags,
const QSet<Tag> &removedTags)
264 if (addedTags.isEmpty() && removedTags.isEmpty()) {
269 Item::List validItems;
270 foreach (
const Akonadi::Item &item, items) {
271 if (!item.remoteId().isEmpty()) {
275 if (validItems.isEmpty()) {
280 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
286 if (item.remoteId().isEmpty() || destination.
remoteId().isEmpty() || destination == source) {
290 AgentBasePrivate::itemMoved(item, source, destination);
293 void itemsMoved(
const Item::List &items,
const Collection &source,
const Collection &destination)
295 if (destination.
remoteId().isEmpty() || destination == source) {
300 Item::List validItems;
301 foreach (
const Akonadi::Item &item, items) {
302 if (!item.remoteId().isEmpty()) {
306 if (validItems.isEmpty()) {
311 AgentBasePrivate::itemsMoved(validItems, source, destination);
314 void itemRemoved(
const Akonadi::Item &item)
316 if (item.remoteId().isEmpty()) {
320 if (!item.parentCollection().isValid()) {
321 kWarning() <<
"Invalid parent collection for item" << item.id();
325 AgentBasePrivate::itemRemoved(item);
328 void itemsRemoved(
const Item::List &items)
330 Item::List validItems;
331 foreach (
const Akonadi::Item &item, items) {
332 if (!item.parentCollection().isValid()) {
333 kWarning() <<
"Invalid parent collection for item" << item.id();
336 if (!item.remoteId().isEmpty()) {
340 if (validItems.isEmpty()) {
345 AgentBasePrivate::itemsRemoved(validItems);
354 AgentBasePrivate::collectionAdded(collection, parent);
359 if (collection.remoteId().isEmpty()) {
363 AgentBasePrivate::collectionChanged(collection);
366 void collectionChanged(
const Akonadi::Collection &collection,
const QSet< QByteArray > &partIdentifiers)
368 if (collection.remoteId().isEmpty()) {
372 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
378 if (destination.
remoteId().isEmpty() || source == destination) {
386 AgentBasePrivate::collectionRemoved(collection);
388 scheduler->taskDone();
391 scheduler->scheduleMoveReplay(collection, mover);
397 if (collection.remoteId().isEmpty()) {
403 AgentBasePrivate::collectionMoved(collection, source, destination);
408 if (collection.remoteId().isEmpty()) {
412 AgentBasePrivate::collectionRemoved(collection);
417 if (!tag.isValid()) {
422 AgentBasePrivate::tagAdded(tag);
427 if (tag.remoteId().isEmpty()) {
432 AgentBasePrivate::tagChanged(tag);
437 if (tag.remoteId().isEmpty()) {
442 AgentBasePrivate::tagRemoved(tag);
449 ResourceScheduler *scheduler;
453 ItemSync::MergeMode mItemMergeMode;
455 bool mHierarchicalRid;
456 QTimer mProgressEmissionCompressor;
457 int mUnemittedProgress;
458 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
459 bool mAutomaticProgressReporting;
460 bool mDisableAutomaticItemDeliveryDone;
461 QPointer<RecursiveMover> m_recursiveMover;
462 int mItemSyncBatchSize;
463 QSet<QByteArray> mKeepLocalCollectionChanges;
464 KJob *mCurrentCollectionFetchJob;
468 :
AgentBase(new ResourceBasePrivate(this), id)
472 new Akonadi__ResourceAdaptor(
this);
474 d->scheduler =
new ResourceScheduler(
this);
476 d->mChangeRecorder->setChangeRecordingEnabled(
true);
477 d->mChangeRecorder->setCollectionMoveTranslationEnabled(
false);
478 connect(d->mChangeRecorder, SIGNAL(changesAdded()),
479 d->scheduler, SLOT(scheduleChangeReplay()));
481 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
482 d->mChangeRecorder->fetchCollection(
true);
484 connect(d->scheduler, SIGNAL(executeFullSync()),
486 connect(d->scheduler, SIGNAL(executeCollectionTreeSync()),
492 connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
493 SLOT(slotPrepareItemRetrieval(Akonadi::Item)));
494 connect(d->scheduler, SIGNAL(executeResourceCollectionDeletion()),
495 SLOT(slotDeleteResourceCollection()));
498 connect(d->scheduler, SIGNAL(
status(
int,QString)),
499 SIGNAL(
status(
int,QString)));
500 connect(d->scheduler, SIGNAL(executeChangeReplay()),
501 d->mChangeRecorder, SLOT(replayNext()));
502 connect(d->scheduler, SIGNAL(executeRecursiveMoveReplay(
RecursiveMover*)),
504 connect(d->scheduler, SIGNAL(fullSyncComplete()), SIGNAL(
synchronized()));
506 connect(d->mChangeRecorder, SIGNAL(nothingToReplay()), d->scheduler, SLOT(
taskDone()));
509 connect(
this, SIGNAL(
abortRequested()),
this, SLOT(slotAbortRequested()));
510 connect(
this, SIGNAL(
synchronized()), d->scheduler, SLOT(
taskDone()));
515 connect(&d->mProgressEmissionCompressor, SIGNAL(timeout()),
516 this, SLOT(slotDelayedEmitProgress()));
518 d->scheduler->setOnline(d->mOnline);
519 if (!d->mChangeRecorder->isEmpty()) {
520 d->scheduler->scheduleChangeReplay();
525 connect(d->mChangeRecorder->session(), SIGNAL(reconnected()), SLOT(slotSessionReconnected()));
534 d_func()->scheduler->scheduleFullSync();
547QString ResourceBase::parseArguments(
int argc,
char **argv)
551 kDebug() <<
"Not enough arguments passed...";
555 for (
int i = 1; i < argc - 1; ++i) {
556 if (QLatin1String(argv[i]) == QLatin1String(
"--identifier")) {
562 kDebug() <<
"Identifier argument missing";
566 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
568 const QByteArray catalog = fi.baseName().toLatin1();
571 ki18nc(
"@title application name",
"Akonadi Resource"), KDEPIMLIBS_VERSION,
572 ki18nc(
"@title application description",
"Akonadi Resource"));
574 KCmdLineOptions options;
575 options.add(
"identifier <argument>",
576 ki18nc(
"@label commandline option",
"Resource identifier"));
577 KCmdLineArgs::addCmdLineOptions(options);
584 QApplication::setQuitOnLastWindowClosed(
false);
585 KGlobal::locale()->insertCatalog(QLatin1String(
"libakonadi"));
586 int rv = kapp->exec();
591void ResourceBasePrivate::slotAbortRequested()
595 scheduler->cancelQueues();
596 QMetaObject::invokeMethod(q,
"abortActivity");
602 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
603 if (!item.isValid()) {
604 d->scheduler->currentTask().sendDBusReplies(i18nc(
"@info",
"Invalid item retrieved"));
605 d->scheduler->taskDone();
610 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
611 foreach (
const QByteArray &part, requestedParts) {
612 if (!item.loadedPayloadParts().contains(part)) {
613 kWarning() <<
"Item does not provide part" << part;
618 job->d_func()->setSilent(
true );
621 connect(job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)));
624void ResourceBasePrivate::slotDeliveryDone(KJob *job)
627 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
629 emit q->error(i18nc(
"@info",
"Error while creating item: %1", job->errorString()));
631 scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : QString());
632 scheduler->taskDone();
638 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
639 if (!collection.isValid()) {
641 d->scheduler->taskDone();
646 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionAttributesSyncDone(KJob*)));
649void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
652 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
654 emit q->error(i18nc(
"@info",
"Error while updating collection: %1", job->errorString()));
656 emit q->attributesSynchronized(scheduler->currentTask().collection.id());
657 scheduler->taskDone();
660void ResourceBasePrivate::slotDeleteResourceCollection()
666 connect(job, SIGNAL(result(KJob*)), q, SLOT(slotDeleteResourceCollectionDone(KJob*)));
669void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
673 emit q->error(job->errorString());
674 scheduler->taskDone();
680 connect(job, SIGNAL(result(KJob*)), q, SLOT(slotCollectionDeletionDone(KJob*)));
683 scheduler->taskDone();
688void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
692 emit q->error(job->errorString());
695 scheduler->taskDone();
702 connect(job, SIGNAL(result(KJob*)), scheduler, SLOT(taskDone()));
713 connect(transaction, SIGNAL(finished(KJob*)),
714 this, SLOT(changeCommittedResult(KJob*)));
717 Q_FOREACH (
const Item &item, items) {
719 job->d_func()->setClean();
729 connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
732void ResourceBasePrivate::changeCommittedResult(KJob *job)
735 kWarning() << job->errorText();
739 if (qobject_cast<CollectionModifyJob *>(job)) {
741 emit q->error(i18nc(
"@info",
"Updating local collection failed: %1.", job->errorText()));
743 mChangeRecorder->d_ptr->invalidateCache(
static_cast<CollectionModifyJob *
>(job)->collection());
754 connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
757bool ResourceBase::requestItemDelivery(qint64 uid,
const QString &remoteId,
758 const QString &mimeType,
const QStringList &parts)
760 return requestItemDeliveryV2(uid, remoteId, mimeType, parts).isEmpty();
763QString ResourceBase::requestItemDeliveryV2(qint64 uid,
const QString &remoteId,
const QString &mimeType,
const QStringList &_parts)
767 const QString errorMsg = i18nc(
"@info",
"Cannot fetch item in offline mode.");
768 emit
error(errorMsg);
772 setDelayedReply(
true);
775 item.setMimeType(mimeType);
776 item.setRemoteId(remoteId);
778 QSet<QByteArray> parts;
779 Q_FOREACH (
const QString &str, _parts) {
780 parts.insert(str.toLatin1());
783 d->scheduler->scheduleItemFetch(item, parts, message());
792 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
793 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
794 "ResourceBase::collectionsRetrieved()",
795 "Calling collectionsRetrieved() although no collection retrieval is in progress");
796 if (!d->mCollectionSyncer) {
798 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
799 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
800 connect(d->mCollectionSyncer, SIGNAL(
percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
801 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
803 d->mCollectionSyncer->setRemoteCollections(collections);
810 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
811 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
812 "ResourceBase::collectionsRetrievedIncremental()",
813 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
814 if (!d->mCollectionSyncer) {
816 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
817 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
818 connect(d->mCollectionSyncer, SIGNAL(
percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
819 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
821 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
827 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
828 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
829 "ResourceBase::setCollectionStreamingEnabled()",
830 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
831 if (!d->mCollectionSyncer) {
833 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
834 connect(d->mCollectionSyncer, SIGNAL(
percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)));
835 connect(d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)));
837 d->mCollectionSyncer->setStreamingEnabled(enable);
843 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
844 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
845 "ResourceBase::collectionsRetrievalDone()",
846 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
848 if (d->mCollectionSyncer) {
849 d->mCollectionSyncer->retrievalDone();
853 d->scheduler->taskDone();
860 d->mKeepLocalCollectionChanges = parts;
863void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
866 mCollectionSyncer = 0;
869 emit q->error(job->errorString());
872 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
874 list->
setFetchScope(q->changeRecorder()->collectionFetchScope());
877 q->connect(list, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)));
879 }
else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
880 scheduler->scheduleCollectionTreeSyncCompletion();
883 scheduler->taskDone();
886void ResourceBasePrivate::slotLocalListDone(KJob *job)
890 emit q->error(job->errorString());
894 scheduler->scheduleSync(col);
896 scheduler->scheduleFullSyncCompletion();
898 scheduler->taskDone();
901void ResourceBasePrivate::slotSynchronizeCollection(
const Collection &col)
904 currentCollection = col;
911 if (!contentTypes.isEmpty() || col.
isVirtual()) {
912 if (mAutomaticProgressReporting) {
917 fetchJob->
setFetchScope(q->changeRecorder()->collectionFetchScope());
918 connect(fetchJob, SIGNAL(result(KJob*)), q, SLOT(slotItemRetrievalCollectionFetchDone(KJob*)));
919 mCurrentCollectionFetchJob = fetchJob;
923 scheduler->taskDone();
926void ResourceBasePrivate::slotItemRetrievalCollectionFetchDone(KJob *job)
929 mCurrentCollectionFetchJob = 0;
931 kWarning() <<
"Failed to retrieve collection for sync: " << job->errorString();
932 q->cancelTask(i18n(
"Failed to retrieve collection for sync."));
942 return d->mItemSyncBatchSize;
948 d->mItemSyncBatchSize = batchSize;
951void ResourceBasePrivate::slotSynchronizeCollectionAttributes(
const Collection &col)
955 fetchJob->
setFetchScope(q->changeRecorder()->collectionFetchScope());
956 connect(fetchJob, SIGNAL(result(KJob*)), q, SLOT(slotAttributeRetrievalCollectionFetchDone(KJob*)));
957 Q_ASSERT(!mCurrentCollectionFetchJob);
958 mCurrentCollectionFetchJob = fetchJob;
961void ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone(KJob *job)
963 mCurrentCollectionFetchJob = 0;
966 kWarning() <<
"Failed to retrieve collection for attribute sync: " << job->errorString();
967 q->cancelTask(i18n(
"Failed to retrieve collection for attribute sync."));
974void ResourceBasePrivate::slotPrepareItemRetrieval(
const Akonadi::Item &item)
982 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
983 foreach (
const QByteArray &attribute, attributes) {
987 q->connect(fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemRetrievalResult(KJob*)));
990void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
993 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
994 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
995 "Preparing item retrieval although no item retrieval is in progress");
997 q->cancelTask(job->errorText());
1000 ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
1001 if (fetch->
items().count() != 1) {
1002 q->cancelTask(i18n(
"The requested item no longer exists"));
1005 const Item item = fetch->
items().first();
1006 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1007 if (!q->retrieveItem(item, parts)) {
1012void ResourceBasePrivate::slotRecursiveMoveReplay(
RecursiveMover *mover)
1016 Q_ASSERT(!m_recursiveMover);
1017 m_recursiveMover = mover;
1018 connect(mover, SIGNAL(result(KJob*)), q, SLOT(slotRecursiveMoveReplayResult(KJob*)));
1022void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
1025 m_recursiveMover = 0;
1039 if (d->mItemSyncer) {
1040 d->mItemSyncer->deliveryDone();
1043 d->scheduler->taskDone();
1050 d->scheduler->scheduleResourceCollectionDeletion();
1056 d->scheduler->scheduleCacheInvalidation(collection);
1062 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
1063 "ResourceBase::currentCollection()",
1064 "Trying to access current collection although no item retrieval is in progress");
1065 return d->currentCollection;
1071 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
1072 "ResourceBase::currentItem()",
1073 "Trying to access current item although no item retrieval is in progress");
1074 return d->scheduler->currentTask().item;
1079 d_func()->scheduler->scheduleCollectionTreeSync();
1085 if (d->mCurrentCollectionFetchJob) {
1086 d->mCurrentCollectionFetchJob->kill();
1087 d->mCurrentCollectionFetchJob = 0;
1089 switch (d->scheduler->currentTask().type) {
1090 case ResourceScheduler::FetchItem:
1093 case ResourceScheduler::ChangeReplay:
1094 d->changeProcessed();
1096 case ResourceScheduler::SyncCollectionTree:
1097 case ResourceScheduler::SyncAll:
1098 if (d->mCollectionSyncer) {
1099 d->mCollectionSyncer->rollback();
1101 d->scheduler->taskDone();
1104 case ResourceScheduler::SyncCollection:
1105 if (d->mItemSyncer) {
1106 d->mItemSyncer->rollback();
1108 d->scheduler->taskDone();
1112 d->scheduler->taskDone();
1126 d->scheduler->deferTask();
1131 d_func()->scheduler->setOnline(state);
1145 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionListDone(KJob*)));
1148void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1150 if (!job->error()) {
1152 Q_FOREACH (
const Collection &collection, list) {
1156 scheduler->scheduleAttributesSync(collection);
1157 scheduler->scheduleSync(collection);
1161 kWarning() <<
"Failed to fetch collection for collection sync: " << job->errorString();
1170 connect(job, SIGNAL(result(KJob*)), SLOT(slotCollectionListForAttributesDone(KJob*)));
1173void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1175 if (!job->error()) {
1177 if (!list.isEmpty()) {
1179 scheduler->scheduleAttributesSync(col);
1190 if (d->mItemSyncer) {
1191 d->mItemSyncer->setTotalItems(amount);
1198 if (d->mItemSyncer) {
1199 d->mItemSyncer->setDisableAutomaticDeliveryDone(disable);
1201 d->mDisableAutomaticItemDeliveryDone = disable;
1207 d->createItemSyncInstanceIfMissing();
1208 if (d->mItemSyncer) {
1209 d->mItemSyncer->setStreamingEnabled(enable);
1216 d->createItemSyncInstanceIfMissing();
1217 if (d->mItemSyncer) {
1218 d->mItemSyncer->setFullSyncItems(items);
1223 const Item::List &removedItems)
1226 d->createItemSyncInstanceIfMissing();
1227 if (d->mItemSyncer) {
1228 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1232void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1237 emit q->error(job->errorString());
1239 scheduler->taskDone();
1242void ResourceBasePrivate::slotDelayedEmitProgress()
1245 if (mAutomaticProgressReporting) {
1246 emit q->percent(mUnemittedProgress);
1248 Q_FOREACH (
const QVariantMap &statusMap, mUnemittedAdvancedStatus) {
1249 emit q->advancedStatus(statusMap);
1252 mUnemittedProgress = 0;
1253 mUnemittedAdvancedStatus.clear();
1256void ResourceBasePrivate::slotPercent(KJob *job,
unsigned long percent)
1258 mUnemittedProgress = percent;
1261 if (collection.isValid()) {
1262 QVariantMap statusMap;
1263 statusMap.insert(QLatin1String(
"key"), QString::fromLatin1(
"collectionSyncProgress"));
1264 statusMap.insert(QLatin1String(
"collectionId"), collection.id());
1265 statusMap.insert(QLatin1String(
"percent"),
static_cast<unsigned int>(percent));
1267 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1270 if (percent == 100) {
1271 mProgressEmissionCompressor.stop();
1272 slotDelayedEmitProgress();
1273 }
else if (!mProgressEmissionCompressor.isActive()) {
1274 mProgressEmissionCompressor.start();
1281 d->mHierarchicalRid = enable;
1287 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1293 d->scheduler->taskDone();
1308 d->mItemTransactionMode = mode;
1314 if (!d->mItemSyncFetchScope) {
1317 *(d->mItemSyncFetchScope) = fetchScope;
1323 d->mItemMergeMode = mode;
1329 d->mAutomaticProgressReporting = enabled;
1335 return d->dumpNotificationListToString();
1341 return d->dumpToString();
1347 return d->dumpMemoryInfo();
1353 return d->dumpMemoryInfoToString();
1356#include "resourcebase.moc"
1357#include "moc_resourcebase.cpp"
The base class for all Akonadi agents and resources.
virtual int status() const
This method returns the current status code of the agent.
bool isOnline() const
Returns whether the agent is currently online.
void setAgentName(const QString &name)
This method is used to set the name of the agent.
ChangeRecorder * changeRecorder() const
Returns the Akonadi::ChangeRecorder object used for monitoring.
QString agentName() const
Returns the name of the agent.
@ Running
The agent is working on something.
void percent(int progress)
This signal should be emitted whenever the progress of an action in the agent (e.g.
void abortRequested()
Emitted when another application has remotely asked the agent to abort its current operation.
QString identifier() const
Returns the instance identifier of this agent.
void agentNameChanged(const QString &name)
This signal is emitted whenever the name of the agent has changed.
void error(const QString &message)
This signal shall be used to report errors.
bool isEmpty() const
Returns whether there are recorded changes.
void changeProcessed()
Removes the previously emitted change from the records.
Job that deletes a collection in the Akonadi storage.
Job that fetches collections from the Akonadi storage.
void setFetchScope(const CollectionFetchScope &fetchScope)
Sets the collection fetch scope.
CollectionFetchScope & fetchScope()
Returns the collection fetch scope.
@ Recursive
List all sub-collections.
@ FirstLevel
Only list direct sub-collections of the base collection.
@ Base
Only fetch the base collection.
Collection::List collections() const
Returns the list of fetched collections.
@ Sync
Only retrieve collections for synchronization, taking the local preference and the enabled state into account.
void setResource(const QString &resource)
Sets a resource filter, that is only collections owned by the specified resource are retrieved.
void setListFilter(ListFilter)
Sets a filter for the collections to be listed.
Job that modifies a collection in the Akonadi storage.
Represents a collection of PIM items.
QString resource() const
Returns the identifier of the resource owning the collection.
QStringList contentMimeTypes() const
Returns a list of possible content mimetypes, e.g.
static QString mimeType()
Returns the mimetype used for collections.
QString displayName() const
Returns the display name (EntityDisplayAttribute::displayName()) if set, and Collection::name() other...
@ ListSync
Listing for synchronization.
static Collection root()
Returns the root collection.
static QString virtualMimeType()
Returns the mimetype used for virtual collections.
QList< Collection > List
Describes a list of collections.
bool isVirtual() const
Returns whether the collection is virtual, for example a search collection.
QString remoteId() const
Returns the remote id of the entity.
Helper job to invalidate item cache for an entire collection.
Job that fetches items from the Akonadi storage.
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Item::List items() const
Returns the fetched items.
Specifies which parts of an item should be fetched from the Akonadi storage.
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache.
void fetchAttribute(const QByteArray &type, bool fetch=true)
Sets whether the attribute of the given type should be fetched.
void setAncestorRetrieval(AncestorRetrieval ancestorDepth)
Sets how many levels of ancestor collections should be included in the retrieval.
Job that modifies an existing item in the Akonadi storage.
void setIgnorePayload(bool ignore)
Sets whether the payload of the modified item shall be omitted from transmission to the Akonadi stora...
void disableRevisionCheck()
Disables the check of the revision number.
void setUpdateGid(bool update)
Sets whether the GID shall be updated either from the gid parameter or by extracting it from the payl...
Syncs between items known to a client (usually a resource) and the Akonadi storage.
TransactionMode
Transaction mode used by ItemSync.
@ UserCanceled
The user canceled this job.
CollectionFetchScope & collectionFetchScope()
Returns the collection fetch scope.
Helper class for expanding inter-resource collection moves inside ResourceBase.
void setCollection(const Akonadi::Collection &collection, const Akonadi::Collection &parentCollection)
Set the collection that is actually moved.
The base class for all Akonadi resources.
void setItemTransactionMode(ItemSync::TransactionMode mode)
Set transaction mode for item syncing.
QString dumpNotificationListToString() const
Dump the contents of the current ChangeReplay.
void dumpMemoryInfo() const
Dumps memory usage information to stdout.
void taskDone()
Indicate that the current task is finished.
void invalidateCache(const Collection &collection)
Call this method to invalidate all cached content in collection.
void collectionAttributesRetrieved(const Collection &collection)
Call this method from retrieveCollectionAttributes() once the result is available.
int itemSyncBatchSize() const
Returns the batch size used during the item sync.
void setItemSyncBatchSize(int batchSize)
Set the batch size used during the item sync.
void doSetOnline(bool online)
Inherited from AgentBase.
void setDisableAutomaticItemDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
static int init(int argc, char **argv)
Use this method in the main function of your resource application to initialize your resource subclas...
void setItemSynchronizationFetchScope(const ItemFetchScope &fetchScope)
Set the fetch scope applied for item synchronization.
void synchronizeCollection(qint64 id)
This method is called whenever the collection with the given id shall be synchronized.
virtual void retrieveCollections()=0
Retrieve the collection tree from the remote server and supply it via collectionsRetrieved() or colle...
QString dumpSchedulerToString() const
Dump the state of the scheduler.
QString dumpMemoryInfoToString() const
Returns a string with memory usage information.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
void setKeepLocalCollectionChanges(const QSet< QByteArray > &parts)
Allows keeping locally changed collection parts during the collection sync.
void attributesSynchronized(qlonglong collectionId)
Emitted when a collection attributes synchronization has been completed.
void itemsRetrievalDone()
Call this method to indicate you finished synchronizing the current collection.
void setTotalItems(int amount)
Call this method when you want to use the itemsRetrieved() method in streaming mode and indicate the ...
void setAutomaticProgressReporting(bool enabled)
Enable or disable automatic progress reporting.
void collectionsRetrieved(const Collection::List &collections)
Call this to supply the full folder tree retrieved from the remote server.
void deferTask()
Stops the execution of the current task and continues with the next one.
void setItemStreamingEnabled(bool enable)
Enable item streaming.
QString name() const
Returns the name of the resource.
void collectionsRetrievalDone()
Call this method to indicate you finished synchronizing the collection tree.
void clearCache()
Call this method to remove all items and collections of the resource from the server cache.
void scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority=Append)
Schedules a custom task in the internal scheduler.
void collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
Call this to supply incrementally retrieved collections from the remote server.
void itemRetrieved(const Item &item)
Call this method from retrieveItem() once the result is available.
void synchronize()
This method is called whenever the resource should start synchronizing all data.
void changeCommitted(const Item &item)
Resets the dirty flag of the given item and updates the remote id.
void itemsRetrieved(const Item::List &items)
Call this method to supply the full collection listing from the remote server.
Collection currentCollection() const
Returns the collection that is currently synchronized.
void nameChanged(const QString &name)
This signal is emitted whenever the name of the resource has changed.
void cancelTask()
Stops the execution of the current task and continues with the next one.
void itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
Call this method to supply incrementally retrieved items from the remote server.
void setItemMergingMode(ItemSync::MergeMode mode)
Set merge mode for item syncing.
void setHierarchicalRemoteIdentifiersEnabled(bool enable)
Indicate the use of hierarchical remote identifiers.
ResourceBase(const QString &id)
Creates a base resource.
void retrieveCollectionAttributes(const Akonadi::Collection &collection)
Retrieve the attributes of a single collection from the backend.
void collectionTreeSynchronized()
Emitted when a collection tree synchronization has been completed.
void setName(const QString &name)
This method is used to set the name of the resource.
void synchronizeCollectionTree()
Refetches the Collections.
void setCollectionStreamingEnabled(bool enable)
Enable collection streaming, that is collections don't have to be delivered at once as result of a re...
void changesCommitted(const Item::List &items)
Resets the dirty flag of all given items and updates remote ids.
void abortActivity()
Abort any activity in progress in the backend.
void synchronizeCollectionAttributes(qint64 id)
This method is called whenever the collection with the given id shall have its attributes synchronize...
Item currentItem() const
Returns the item that is currently retrieved.
~ResourceBase()
Destroys the base resource.
Job that selects a resource context for remote identifier based operations.
static QString agentServiceName(ServiceAgentType agentType, const QString &identifier)
Returns the namespaced D-Bus service name for an agent of type agentType with agent identifier identi...
static QString addNamespace(const QString &string)
Adds the multi-instance namespace to string if required (with '_' as separator).
Job that modifies a tag in the Akonadi storage.
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
FreeBusyManager::Singleton.