Merge pull request #3785 from nextcloud/feature/nextGenerationPropagator

Feature/next generation propagator
This commit is contained in:
Matthieu Gallien 2021-10-26 16:06:10 +02:00 committed by GitHub
commit 2a108b86c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 277 additions and 60 deletions

View file

@ -48,6 +48,7 @@ namespace OCC {
Q_LOGGING_CATEGORY(lcPropagator, "nextcloud.sync.propagator", QtInfoMsg)
Q_LOGGING_CATEGORY(lcDirectory, "nextcloud.sync.propagator.directory", QtInfoMsg)
Q_LOGGING_CATEGORY(lcRootDirectory, "nextcloud.sync.propagator.root.directory", QtInfoMsg)
Q_LOGGING_CATEGORY(lcCleanupPolls, "nextcloud.sync.propagator.cleanuppolls", QtInfoMsg)
qint64 criticalFreeSpaceLimit()
@ -359,15 +360,13 @@ PropagateItemJob *OwncloudPropagator::createJob(const SyncFileItemPtr &item)
job->setDeleteExistingFolder(deleteExisting);
return job;
} else {
PropagateUploadFileCommon *job = nullptr;
if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
// Item is above _initialChunkSize, thus will be classified as to be chunked
job = new PropagateUploadFileNG(this, item);
if (deleteExisting || !isDelayedUploadItem(item)) {
auto job = createUploadJob(item, deleteExisting);
return job.release();
} else {
job = new PropagateUploadFileV1(this, item);
pushDelayedUploadTask(item);
return nullptr;
}
job->setDeleteExisting(deleteExisting);
return job;
}
case CSYNC_INSTRUCTION_RENAME:
if (item->_direction == SyncFileItem::Up) {
@ -384,6 +383,33 @@ PropagateItemJob *OwncloudPropagator::createJob(const SyncFileItemPtr &item)
return nullptr;
}
std::unique_ptr<PropagateUploadFileCommon> OwncloudPropagator::createUploadJob(SyncFileItemPtr item, bool deleteExisting)
{
auto job = std::unique_ptr<PropagateUploadFileCommon>{};
if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
// Item is above _initialChunkSize, thus will be classified as to be chunked
job = std::make_unique<PropagateUploadFileNG>(this, item);
} else {
job = std::make_unique<PropagateUploadFileV1>(this, item);
}
job->setDeleteExisting(deleteExisting);
return job;
}
void OwncloudPropagator::pushDelayedUploadTask(SyncFileItemPtr item)
{
_delayedTasks.push_back(item);
}
void OwncloudPropagator::resetDelayedUploadTasks()
{
_scheduleDelayedTasks = false;
_delayedTasks.clear();
}
qint64 OwncloudPropagator::smallFileSize()
{
const qint64 smallFileSize = 100 * 1024; //default to 1 MB. Not dynamic right now.
@ -419,6 +445,7 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
items.end());
}
resetDelayedUploadTasks();
_rootJob.reset(new PropagateRootDirectory(this));
QStack<QPair<QString /* directory name */, PropagateDirectory * /* job */>> directories;
directories.push(qMakePair(QString(), _rootJob.data()));
@ -474,56 +501,17 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
}
if (item->isDirectory()) {
auto *dir = new PropagateDirectory(this, item);
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE
&& item->_direction == SyncFileItem::Up) {
// Skip all potential uploads to the new folder.
// Processing them now leads to problems with permissions:
// checkForPermissions() has already run and used the permissions
// of the file we're about to delete to decide whether uploading
// to the new dir is ok...
foreach (const SyncFileItemPtr &item2, items) {
if (item2->destination().startsWith(item->destination() + "/")) {
item2->_instruction = CSYNC_INSTRUCTION_NONE;
_anotherSyncNeeded = true;
}
}
}
if (item->_instruction == CSYNC_INSTRUCTION_REMOVE) {
// We do the removal of directories at the end, because there might be moves from
// these directories that will happen later.
directoriesToRemove.prepend(dir);
removedDirectory = item->_file + "/";
// We should not update the etag of parent directories of the removed directory
// since it would be done before the actual remove (issue #1845)
// NOTE: Currently this means that we don't update those etag at all in this sync,
// but it should not be a problem, they will be updated in the next sync.
for (int i = 0; i < directories.size(); ++i) {
if (directories[i].second->_item->_instruction == CSYNC_INSTRUCTION_UPDATE_METADATA)
directories[i].second->_item->_instruction = CSYNC_INSTRUCTION_NONE;
}
} else {
PropagateDirectory *currentDirJob = directories.top().second;
currentDirJob->appendJob(dir);
}
directories.push(qMakePair(item->destination() + "/", dir));
startDirectoryPropagation(item,
directories,
directoriesToRemove,
removedDirectory,
items);
} else {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
directoriesToRemove.prepend(createJob(item));
removedDirectory = item->_file + "/";
} else {
directories.top().second->appendTask(item);
}
if (item->_instruction == CSYNC_INSTRUCTION_CONFLICT) {
// This might be a file or a directory on the local side. If it's a
// directory we want to skip processing items inside it.
maybeConflictDirectory = item->_file + "/";
}
startFilePropagation(item,
directories,
directoriesToRemove,
removedDirectory,
maybeConflictDirectory);
}
}
@ -537,6 +525,75 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
scheduleNextJob();
}
void OwncloudPropagator::startDirectoryPropagation(const SyncFileItemPtr &item,
QStack<QPair<QString, PropagateDirectory *>> &directories,
QVector<PropagatorJob *> &directoriesToRemove,
QString &removedDirectory,
const SyncFileItemVector &items)
{
auto directoryPropagationJob = std::make_unique<PropagateDirectory>(this, item);
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE
&& item->_direction == SyncFileItem::Up) {
// Skip all potential uploads to the new folder.
// Processing them now leads to problems with permissions:
// checkForPermissions() has already run and used the permissions
// of the file we're about to delete to decide whether uploading
// to the new dir is ok...
foreach (const SyncFileItemPtr &dirItem, items) {
if (dirItem->destination().startsWith(item->destination() + "/")) {
dirItem->_instruction = CSYNC_INSTRUCTION_NONE;
_anotherSyncNeeded = true;
}
}
}
if (item->_instruction == CSYNC_INSTRUCTION_REMOVE) {
// We do the removal of directories at the end, because there might be moves from
// these directories that will happen later.
directoriesToRemove.prepend(directoryPropagationJob.get());
removedDirectory = item->_file + "/";
// We should not update the etag of parent directories of the removed directory
// since it would be done before the actual remove (issue #1845)
// NOTE: Currently this means that we don't update those etag at all in this sync,
// but it should not be a problem, they will be updated in the next sync.
for (int i = 0; i < directories.size(); ++i) {
if (directories[i].second->_item->_instruction == CSYNC_INSTRUCTION_UPDATE_METADATA) {
directories[i].second->_item->_instruction = CSYNC_INSTRUCTION_NONE;
}
}
} else {
const auto currentDirJob = directories.top().second;
currentDirJob->appendJob(directoryPropagationJob.get());
}
directories.push(qMakePair(item->destination() + "/", directoryPropagationJob.release()));
}
void OwncloudPropagator::startFilePropagation(const SyncFileItemPtr &item,
QStack<QPair<QString, PropagateDirectory *> > &directories,
QVector<PropagatorJob *> &directoriesToRemove,
QString &removedDirectory,
QString &maybeConflictDirectory)
{
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
auto job = createJob(item);
if (job) {
directoriesToRemove.prepend(job);
}
removedDirectory = item->_file + "/";
} else {
directories.top().second->appendTask(item);
}
if (item->_instruction == CSYNC_INSTRUCTION_CONFLICT) {
// This might be a file or a directory on the local side. If it's a
// directory we want to skip processing items inside it.
maybeConflictDirectory = item->_file + "/";
}
}
const SyncOptions &OwncloudPropagator::syncOptions() const
{
return _syncOptions;
@ -802,6 +859,21 @@ Result<Vfs::ConvertToPlaceholderResult, QString> OwncloudPropagator::staticUpdat
return Vfs::ConvertToPlaceholderResult::Ok;
}
bool OwncloudPropagator::isDelayedUploadItem(const SyncFileItemPtr &item) const
{
return !_scheduleDelayedTasks && !item->_isEncrypted;
}
void OwncloudPropagator::setScheduleDelayedTasks(bool active)
{
_scheduleDelayedTasks = active;
}
void OwncloudPropagator::clearDelayedTasks()
{
_delayedTasks.clear();
}
// ================================================================================
PropagatorJob::PropagatorJob(OwncloudPropagator *propagator)
@ -1012,6 +1084,7 @@ void PropagateDirectory::slotFirstJobFinished(SyncFileItem::Status status)
// Synchronously abort
abort(AbortType::Synchronous);
_state = Finished;
qCInfo(lcPropagator) << "PropagateDirectory::slotFirstJobFinished" << "emit finished" << status;
emit finished(status);
}
return;
@ -1054,6 +1127,7 @@ void PropagateDirectory::slotSubJobsFinished(SyncFileItem::Status status)
}
}
_state = Finished;
qCInfo(lcPropagator) << "PropagateDirectory::slotSubJobsFinished" << "emit finished" << status;
emit finished(status);
}
@ -1106,21 +1180,37 @@ qint64 PropagateRootDirectory::committedDiskSpace() const
bool PropagateRootDirectory::scheduleSelfOrChild()
{
if (_state == Finished)
return false;
qCInfo(lcRootDirectory()) << "scheduleSelfOrChild" << _state << "pending uploads" << propagator()->delayedTasks().size() << "subjobs state" << _subJobs._state;
if (PropagateDirectory::scheduleSelfOrChild())
if (_state == Finished) {
return false;
}
if (PropagateDirectory::scheduleSelfOrChild() && propagator()->delayedTasks().empty()) {
return true;
}
// Important: Finish _subJobs before scheduling any deletes.
if (_subJobs._state != Finished)
if (_subJobs._state != Finished) {
return false;
}
if (!propagator()->delayedTasks().empty()) {
return scheduleDelayedJobs();
}
return _dirDeletionJobs.scheduleSelfOrChild();
}
void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
{
qCInfo(lcRootDirectory()) << status << "slotSubJobsFinished" << _state << "pending uploads" << propagator()->delayedTasks().size() << "subjobs state" << _subJobs._state;
if (!propagator()->delayedTasks().empty()) {
scheduleDelayedJobs();
return;
}
if (status != SyncFileItem::Success
&& status != SyncFileItem::Restoration
&& status != SyncFileItem::Conflict) {
@ -1128,6 +1218,7 @@ void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
// Synchronously abort
abort(AbortType::Synchronous);
_state = Finished;
qCInfo(lcPropagator) << "PropagateRootDirectory::slotSubJobsFinished" << "emit finished" << status;
emit finished(status);
}
return;
@ -1139,9 +1230,21 @@ void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
void PropagateRootDirectory::slotDirDeletionJobsFinished(SyncFileItem::Status status)
{
_state = Finished;
qCInfo(lcPropagator) << "PropagateRootDirectory::slotDirDeletionJobsFinished" << "emit finished" << status;
emit finished(status);
}
bool PropagateRootDirectory::scheduleDelayedJobs()
{
qCInfo(lcPropagator) << "PropagateRootDirectory::scheduleDelayedJobs";
propagator()->setScheduleDelayedTasks(true);
auto bulkPropagatorJob = std::make_unique<BulkPropagatorJob>(propagator(), propagator()->delayedTasks());
propagator()->clearDelayedTasks();
_subJobs.appendJob(bulkPropagatorJob.release());
_subJobs._state = Running;
return _subJobs.scheduleSelfOrChild();
}
// ================================================================================
CleanupPollsJob::~CleanupPollsJob() = default;
@ -1200,4 +1303,14 @@ QString OwncloudPropagator::remotePath() const
{
return _remoteFolder;
}
BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const QVector<SyncFileItemPtr> &items)
: PropagatorCompositeJob(propagator)
, _items(items)
{
for(const auto &oneItemJob : _items) {
appendTask(oneItemJob);
}
_items.clear();
}
}

View file

@ -70,6 +70,8 @@ public:
Asynchronous
};
Q_ENUM(AbortType)
enum JobState {
NotYetStarted,
Running,
@ -77,6 +79,8 @@ public:
};
JobState _state;
Q_ENUM(JobState)
enum JobParallelism {
/** Jobs can be run in parallel to this job */
@ -88,6 +92,8 @@ public:
WaitForFinished,
};
Q_ENUM(JobParallelism)
virtual JobParallelism parallelism() { return FullParallelism; }
/**
@ -368,6 +374,23 @@ public:
private slots:
void slotSubJobsFinished(SyncFileItem::Status status) override;
void slotDirDeletionJobsFinished(SyncFileItem::Status status);
private:
bool scheduleDelayedJobs();
};
class BulkPropagatorJob : public PropagatorCompositeJob
{
Q_OBJECT
public:
explicit BulkPropagatorJob(OwncloudPropagator *propagator,
const QVector<SyncFileItemPtr> &items);
private:
QVector<SyncFileItemPtr> _items;
};
/**
@ -397,6 +420,8 @@ public:
}
};
class PropagateUploadFileCommon;
class OWNCLOUDSYNC_EXPORT OwncloudPropagator : public QObject
{
Q_OBJECT
@ -423,6 +448,18 @@ public:
void start(SyncFileItemVector &&_syncedItems);
void startDirectoryPropagation(const SyncFileItemPtr &item,
QStack<QPair<QString, PropagateDirectory*>> &directories,
QVector<PropagatorJob *> &directoriesToRemove,
QString &removedDirectory,
const SyncFileItemVector &items);
void startFilePropagation(const SyncFileItemPtr &item,
QStack<QPair<QString, PropagateDirectory*>> &directories,
QVector<PropagatorJob *> &directoriesToRemove,
QString &removedDirectory,
QString &maybeConflictDirectory);
const SyncOptions &syncOptions() const;
void setSyncOptions(const SyncOptions &syncOptions);
@ -572,6 +609,17 @@ public:
static Result<Vfs::ConvertToPlaceholderResult, QString> staticUpdateMetadata(const SyncFileItem &item, const QString localDir,
Vfs *vfs, SyncJournalDb * const journal);
Q_REQUIRED_RESULT bool isDelayedUploadItem(const SyncFileItemPtr &item) const;
Q_REQUIRED_RESULT const QVector<SyncFileItemPtr>& delayedTasks() const
{
return _delayedTasks;
}
void setScheduleDelayedTasks(bool active);
void clearDelayedTasks();
private slots:
void abortTimeout()
@ -611,6 +659,13 @@ signals:
void insufficientRemoteStorage();
private:
std::unique_ptr<PropagateUploadFileCommon> createUploadJob(SyncFileItemPtr item,
bool deleteExisting);
void pushDelayedUploadTask(SyncFileItemPtr item);
void resetDelayedUploadTasks();
AccountPtr _account;
QScopedPointer<PropagateRootDirectory> _rootJob;
SyncOptions _syncOptions;
@ -618,6 +673,9 @@ private:
const QString _localDir; // absolute path to the local directory. ends with '/'
const QString _remoteFolder; // remote folder, ends with '/'
QVector<SyncFileItemPtr> _delayedTasks;
bool _scheduleDelayedTasks = false;
};

View file

@ -1059,6 +1059,19 @@ OCC::SyncFileItemPtr ItemCompletedSpy::findItem(const QString &path) const
return OCC::SyncFileItemPtr::create();
}
OCC::SyncFileItemPtr ItemCompletedSpy::findItemWithExpectedRank(const QString &path, int rank) const
{
Q_ASSERT(size() > rank);
Q_ASSERT(!(*this)[rank].isEmpty());
auto item = (*this)[rank][0].value<OCC::SyncFileItemPtr>();
if (item->destination() == path) {
return item;
} else {
return OCC::SyncFileItemPtr::create();
}
}
FakeReply::FakeReply(QObject *parent)
: QNetworkReply(parent)
{

View file

@ -516,6 +516,8 @@ struct ItemCompletedSpy : QSignalSpy {
{}
OCC::SyncFileItemPtr findItem(const QString &path) const;
OCC::SyncFileItemPtr findItemWithExpectedRank(const QString &path, int rank) const;
};
// QTest::toString overloads

View file

@ -33,6 +33,14 @@ bool itemDidCompleteSuccessfully(const ItemCompletedSpy &spy, const QString &pat
return false;
}
bool itemDidCompleteSuccessfullyWithExpectedRank(const ItemCompletedSpy &spy, const QString &path, int rank)
{
if (auto item = spy.findItemWithExpectedRank(path, rank)) {
return item->_status == SyncFileItem::Success;
}
return false;
}
class TestSyncEngine : public QObject
{
Q_OBJECT
@ -82,6 +90,29 @@ private slots:
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
}
void testDirUploadWithDelayedAlgorithm() {
FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()};
ItemCompletedSpy completeSpy(fakeFolder);
fakeFolder.localModifier().mkdir("Y");
fakeFolder.localModifier().insert("Y/d0");
fakeFolder.localModifier().mkdir("Z");
fakeFolder.localModifier().insert("Z/d0");
fakeFolder.localModifier().insert("A/a0");
fakeFolder.localModifier().insert("B/b0");
fakeFolder.localModifier().insert("r0");
fakeFolder.localModifier().insert("r1");
fakeFolder.syncOnce();
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y", 0));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z", 1));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y/d0", 2));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z/d0", 3));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "A/a0", 4));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "B/b0", 5));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r0", 6));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r1", 7));
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
}
void testLocalDelete() {
FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()};
ItemCompletedSpy completeSpy(fakeFolder);