New discovery algorithm: Parallel PROPFIND

This commit is contained in:
Olivier Goffart 2018-10-12 14:44:33 +02:00 committed by Kevin Ottens
parent c8eff3da2d
commit afed46afcc
No known key found for this signature in database
GPG key ID: 074BBBCB8DECC9E2
8 changed files with 53 additions and 31 deletions

View file

@ -750,6 +750,9 @@ void Folder::setSyncOptions()
opt._maxChunkSize = cfgFile.maxChunkSize();
}
int maxParallel = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
opt._parallelNetworkJobs = maxParallel ? maxParallel : _accountState->account()->isHttp2Supported() ? 20 : 6;
// Previously min/max chunk size values didn't exist, so users might
// have setups where the chunk size exceeds the new min/max default
// values. To cope with this, adjust min/max to always include the

View file

@ -38,7 +38,11 @@ void ProcessDirectoryJob::start()
serverJob = new DiscoverySingleDirectoryJob(_discoveryData->_account,
_discoveryData->_remoteFolder + _currentFolder._server, this);
connect(serverJob, &DiscoverySingleDirectoryJob::etag, this, &ProcessDirectoryJob::etag);
_discoveryData->_currentlyActiveJobs++;
_pendingAsyncJobs++;
connect(serverJob, &DiscoverySingleDirectoryJob::finished, this, [this, serverJob](const auto &results) {
_discoveryData->_currentlyActiveJobs--;
_pendingAsyncJobs--;
if (results) {
_serverEntries = *results;
_hasServerEntries = true;
@ -252,8 +256,7 @@ void ProcessDirectoryJob::process()
}
processFile(std::move(path), localEntry, serverEntry, record);
}
progress();
QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
}
bool ProcessDirectoryJob::handleExcluded(const QString &path, bool isDirectory, bool isHidden, bool isSymlink)
@ -444,7 +447,7 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
if (!result) {
processFileAnalyzeLocalInfo(item, path, localEntry, serverEntry, dbEntry, _queryServer);
}
progress();
QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
});
return;
}
@ -567,12 +570,12 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
auto job = new RequestEtagJob(_discoveryData->_account, originalPath, this);
connect(job, &RequestEtagJob::finishedWithResult, this, [=](const Result<QString> &etag) mutable {
_pendingAsyncJobs--;
QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
if (etag.errorCode() != 404 ||
// Somehow another item claimed this original path, consider as if it existed
_discoveryData->_renamedItems.contains(originalPath)) {
// If the file exist or if there is another error, consider it is a new file.
postProcessServerNew();
progress();
return;
}
@ -588,7 +591,6 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
postProcessRename(path);
processFileFinalize(item, path, item->isDirectory(), item->_instruction == CSYNC_INSTRUCTION_RENAME ? NormalQuery : ParentDontExist, _queryServer);
progress();
});
job->start();
done = true; // Ideally, if the origin still exist on the server, we should continue searching... but that'd be difficult
@ -939,7 +941,7 @@ void ProcessDirectoryJob::processFileAnalyzeLocalInfo(
}
processFileFinalize(item, path, item->isDirectory(), NormalQuery, recurseQueryServer);
_pendingAsyncJobs--;
progress();
QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
});
job->start();
return;
@ -1173,23 +1175,13 @@ void ProcessDirectoryJob::subJobFinished()
int count = _runningJobs.removeAll(job);
ASSERT(count == 1);
job->deleteLater();
progress();
QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
}
void ProcessDirectoryJob::progress()
int ProcessDirectoryJob::progress(int nbJobs)
{
int maxRunning = 3; // FIXME
if (_pendingAsyncJobs + _runningJobs.size() > maxRunning)
return;
if (!_queuedJobs.empty()) {
auto f = _queuedJobs.front();
_queuedJobs.pop_front();
_runningJobs.push_back(f);
f->start();
return;
}
if (_runningJobs.empty() && _pendingAsyncJobs == 0) {
if (_queuedJobs.empty() && _runningJobs.empty() && _pendingAsyncJobs == 0) {
_pendingAsyncJobs = -1; // We're finished, we don't want to emit finished again
if (_dirItem) {
if (_childModified && _dirItem->_instruction == CSYNC_INSTRUCTION_REMOVE) {
// re-create directory that has modified contents
@ -1209,6 +1201,22 @@ void ProcessDirectoryJob::progress()
}
emit finished();
}
int started = 0;
foreach (auto *rj, _runningJobs) {
started += rj->progress(nbJobs - started);
if (started >= nbJobs)
return started;
}
while (started < nbJobs && !_queuedJobs.empty()) {
auto f = _queuedJobs.front();
_queuedJobs.pop_front();
_runningJobs.push_back(f);
f->start();
started++;
}
return started;
}
void ProcessDirectoryJob::dbError()

View file

@ -46,6 +46,8 @@ public:
{
}
void start();
/** Start up to nbJobs, return the number of jobs started */
int progress(int nbJobs);
SyncFileItemPtr _dirItem;
@ -83,7 +85,6 @@ private:
bool checkPermissions(const SyncFileItemPtr &item);
void processBlacklisted(const PathTuple &, const LocalInfo &, const SyncJournalFileRecord &dbEntry);
void subJobFinished();
void progress();
/** An DB operation failed */
void dbError();

View file

@ -148,6 +148,7 @@ QString DiscoveryPhase::adjustRenamedPath(const QString &original) const
void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
{
connect(job, &ProcessDirectoryJob::finished, this, [this, job] {
_currentRootJob = nullptr;
if (job->_dirItem)
emit itemDiscovered(job->_dirItem);
job->deleteLater();
@ -158,9 +159,18 @@ void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
emit finished();
}
});
_currentRootJob = job;
job->start();
}
// Ask the root discovery job to start more of its queued sub-jobs, up to
// the configured parallel-network-job limit. This is invoked (typically via
// QTimer::singleShot) whenever a running discovery job finishes or unblocks.
void DiscoveryPhase::scheduleMoreJobs()
{
// Clamp to at least 1 so a non-positive configuration still makes progress.
auto limit = qMax(1, _syncOptions._parallelNetworkJobs);
if (_currentRootJob && _currentlyActiveJobs < limit) {
// progress(n) starts up to n queued jobs and returns how many it started.
_currentRootJob->progress(limit - _currentlyActiveJobs);
}
}
DiscoverySingleDirectoryJob::DiscoverySingleDirectoryJob(const AccountPtr &account, const QString &path, QObject *parent)
: QObject(parent)
, _subPath(path)

View file

@ -122,6 +122,9 @@ public:
class DiscoveryPhase : public QObject
{
Q_OBJECT
ProcessDirectoryJob *_currentRootJob = nullptr;
public:
QString _localDir; // absolute path to the local directory. ends with '/'
QString _remoteFolder; // remote folder, ends with '/'
@ -132,6 +135,7 @@ public:
QStringList _selectiveSyncWhiteList;
ExcludedFiles *_excludes;
QString _invalidFilenamePattern; // FIXME: maybe move in ExcludedFiles
int _currentlyActiveJobs = 0;
bool _ignoreHiddenFiles = false;
std::function<bool(const QString &)> _shouldDiscoverLocaly;
@ -151,6 +155,7 @@ public:
QByteArray _dataFingerprint;
void scheduleMoreJobs();
signals:
void fatalError(const QString &errorString);
void itemDiscovered(const SyncFileItemPtr &item);

View file

@ -84,7 +84,7 @@ int OwncloudPropagator::maximumActiveTransferJob()
// disable parallelism when there is a network limit.
return 1;
}
return qMin(3, qCeil(hardMaximumActiveJob() / 2.));
return qMin(3, qCeil(_syncOptions._parallelNetworkJobs / 2.));
}
/* The maximum number of active jobs in parallel */
@ -92,12 +92,7 @@ int OwncloudPropagator::hardMaximumActiveJob()
{
if (!_syncOptions._parallelNetworkJobs)
return 1;
static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
if (max)
return max;
if (_account->isHttp2Supported())
return 20;
return 6; // (Qt cannot do more anyway)
return _syncOptions._parallelNetworkJobs;
}
PropagateItemJob::~PropagateItemJob()

View file

@ -61,8 +61,8 @@ struct SyncOptions
*/
std::chrono::milliseconds _targetChunkUploadDuration = std::chrono::minutes(1);
/** Whether parallel network jobs are allowed. */
bool _parallelNetworkJobs = true;
/** The maximum number of active jobs in parallel */
int _parallelNetworkJobs = 6;
};

View file

@ -433,7 +433,7 @@ private slots:
// Disable parallel uploads
SyncOptions syncOptions;
syncOptions._parallelNetworkJobs = false;
syncOptions._parallelNetworkJobs = 0;
fakeFolder.syncEngine().setSyncOptions(syncOptions);
// Produce an error based on upload size