Create PropagateItemJobs only before starting them

This commit is contained in:
Jocelyn Turcotte 2017-02-13 23:16:20 +01:00
parent ff2d98596f
commit 16ad3d5c88
2 changed files with 51 additions and 19 deletions

View file

@ -381,22 +381,22 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
} }
} else { } else {
PropagateDirectory* currentDirJob = directories.top().second; PropagateDirectory* currentDirJob = directories.top().second;
currentDirJob->append(dir); currentDirJob->appendJob(dir);
} }
directories.push(qMakePair(item->destination() + "/" , dir)); directories.push(qMakePair(item->destination() + "/" , dir));
} else if (PropagateItemJob* current = createJob(item)) { } else {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) { if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution // will delete directories, so defer execution
directoriesToRemove.prepend(current); directoriesToRemove.prepend(createJob(item));
removedDirectory = item->_file + "/"; removedDirectory = item->_file + "/";
} else { } else {
directories.top().second->append(current); directories.top().second->appendTask(item);
} }
} }
} }
foreach(PropagatorJob* it, directoriesToRemove) { foreach(PropagatorJob* it, directoriesToRemove) {
_rootJob->append(it); _rootJob->appendJob(it);
} }
connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItemPtr &)), connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItemPtr &)),
@ -604,12 +604,6 @@ bool PropagatorCompositeJob::scheduleNextJob()
// Start the composite job // Start the composite job
if (_state == NotYetStarted) { if (_state == NotYetStarted) {
_state = Running; _state = Running;
if (_jobsToDo.isEmpty()) {
_state = Finished;
emit finished(SyncFileItem::Success);
return true;
}
} }
// Ask all the running composite jobs if they have something new to schedule. // Ask all the running composite jobs if they have something new to schedule.
@ -635,6 +629,26 @@ bool PropagatorCompositeJob::scheduleNextJob()
_runningJobs.append(nextJob); _runningJobs.append(nextJob);
return possiblyRunNextJob(nextJob); return possiblyRunNextJob(nextJob);
} }
while (!_tasksToDo.isEmpty()) {
SyncFileItemPtr nextTask = _tasksToDo.first();
_tasksToDo.remove(0);
PropagatorJob *job = propagator()->createJob(nextTask);
if (!job) {
qWarning() << "Useless task found for file" << nextTask->destination() << "instruction" << nextTask->_instruction;
continue;
}
_runningJobs.append(job);
return possiblyRunNextJob(job);
}
// If neither us or our children had stuff left to do we could hang. Make sure
// we mark this job as finished so that the propagator can schedule a new one.
if (_jobsToDo.isEmpty() && _tasksToDo.isEmpty() && _runningJobs.isEmpty()) {
// Our parent jobs are already iterating over their running jobs, post to the event loop
// to avoid removing ourself from that list while they iterate.
QMetaObject::invokeMethod(this, "finalize", Qt::QueuedConnection);
}
return false; return false;
} }
@ -658,15 +672,24 @@ void PropagatorCompositeJob::slotSubJobFinished(SyncFileItem::Status status)
_hasError = status; _hasError = status;
} }
// Check if we finished processing all the jobs. if (_jobsToDo.isEmpty() && _tasksToDo.isEmpty() && _runningJobs.isEmpty()) {
if (_jobsToDo.isEmpty() && _runningJobs.isEmpty()) { finalize();
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
} else { } else {
emit ready(); emit ready();
} }
} }
void PropagatorCompositeJob::finalize()
{
// The propagator will do parallel scheduling and this could be posted
// multiple times on the event loop, ignore the duplicate calls.
if (_state == Finished)
return;
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
}
qint64 PropagatorCompositeJob::committedDiskSpace() const qint64 PropagatorCompositeJob::committedDiskSpace() const
{ {
qint64 needed = 0; qint64 needed = 0;

View file

@ -178,6 +178,7 @@ class PropagatorCompositeJob : public PropagatorJob {
Q_OBJECT Q_OBJECT
public: public:
QVector<PropagatorJob *> _jobsToDo; QVector<PropagatorJob *> _jobsToDo;
SyncFileItemVector _tasksToDo;
QVector<PropagatorJob *> _runningJobs; QVector<PropagatorJob *> _runningJobs;
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
@ -191,8 +192,11 @@ public:
qDeleteAll(_runningJobs); qDeleteAll(_runningJobs);
} }
void append(PropagatorJob *subJob) { void appendJob(PropagatorJob *job) {
_jobsToDo.append(subJob); _jobsToDo.append(job);
}
void appendTask(const SyncFileItemPtr &item) {
_tasksToDo.append(item);
} }
virtual bool scheduleNextJob() Q_DECL_OVERRIDE; virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
@ -216,6 +220,7 @@ private slots:
} }
void slotSubJobFinished(SyncFileItem::Status status); void slotSubJobFinished(SyncFileItem::Status status);
void finalize();
}; };
/** /**
@ -233,8 +238,12 @@ public:
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem)); explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem));
void append(PropagatorJob *subJob) { void appendJob(PropagatorJob *job) {
_subJobs.append(subJob); _subJobs.appendJob(job);
}
void appendTask(const SyncFileItemPtr &item) {
_subJobs.appendTask(item);
} }
virtual bool scheduleNextJob() Q_DECL_OVERRIDE; virtual bool scheduleNextJob() Q_DECL_OVERRIDE;