Merge pull request #4529 from owncloud/dynamic_parallelism_scaling

Propagator: Pump in more requests if we think current ones are quick
This commit is contained in:
Markus Goetz 2016-03-02 15:23:58 +01:00
commit 266508b691
11 changed files with 81 additions and 23 deletions

View file

@ -490,6 +490,10 @@ void PropfindJob::start()
qWarning() << "Propfind with no properties!";
}
QNetworkRequest req;
// Always have a higher priority than the propagator because we use this from the UI
// and really want this to be done first (no matter what internal scheduling QNAM uses).
// Also possibly useful for avoiding false timeouts.
req.setPriority(QNetworkRequest::HighPriority);
req.setRawHeader("Depth", "0");
QByteArray propStr;
foreach (const QByteArray &prop, properties) {

View file

@ -87,6 +87,16 @@ int OwncloudPropagator::maximumActiveJob()
return max;
}
int OwncloudPropagator::hardMaximumActiveJob()
{
int max = maximumActiveJob();
return max*2;
// FIXME: Wondering if we should hard-limit to 1 if maximumActiveJob() is 1
// to support our old use case of limiting concurrency (when "automatic" bandwidth
// limiting is set. But this causes https://github.com/owncloud/client/issues/4081
}
/** Updates, creates or removes a blacklist entry for the given item.
*
* Returns whether the file is in the blacklist now.
@ -518,10 +528,32 @@ QString OwncloudPropagator::getFilePath(const QString& tmp_file_name) const
void OwncloudPropagator::scheduleNextJob()
{
if (this->_activeJobs < maximumActiveJob()) {
// TODO: If we see that the automatic up-scaling has a bad impact we
// need to check how to avoid this.
// Down-scaling on slow networks? https://github.com/owncloud/client/issues/3382
// Making sure we do up/down at same time? https://github.com/owncloud/client/issues/1633
if (_activeJobList.count() < maximumActiveJob()) {
if (_rootJob->scheduleNextJob()) {
QTimer::singleShot(0, this, SLOT(scheduleNextJob()));
}
} else if (_activeJobList.count() < hardMaximumActiveJob()) {
int likelyFinishedQuicklyCount = 0;
// NOTE: Only counts the first 3 jobs! Then for each
// one that is likely finished quickly, we can launch another one.
// When a job finishes another one will "move up" to be one of the first 3 and then
// be counted too.
for (int i = 0; i < maximumActiveJob() && i < _activeJobList.count(); i++) {
if (_activeJobList.at(i)->isLikelyFinishedQuickly()) {
likelyFinishedQuicklyCount++;
}
}
if (_activeJobList.count() < maximumActiveJob() + likelyFinishedQuicklyCount) {
qDebug() << "Can pump in another request!";
if (_rootJob->scheduleNextJob()) {
QTimer::singleShot(0, this, SLOT(scheduleNextJob()));
}
}
}
}

View file

@ -88,6 +88,11 @@ public:
virtual JobParallelism parallelism() { return FullParallelism; }
/**
* For "small" jobs
*/
virtual bool isLikelyFinishedQuickly() { return false; }
/** The space that the running jobs need to complete but don't actually use yet.
*
* Note that this does *not* include the disk space that's already
@ -278,7 +283,6 @@ public:
, _journal(progressDb)
, _finishedEmited(false)
, _bandwidthManager(this)
, _activeJobs(0)
, _anotherSyncNeeded(false)
, _account(account)
{ }
@ -293,14 +297,15 @@ public:
QAtomicInt _abortRequested; // boolean set by the main thread to abort.
/* The number of currently active jobs */
int _activeJobs;
/* The list of currently active jobs */
QVector<PropagateItemJob*> _activeJobList;
/** We detected that another sync is required after this one */
bool _anotherSyncNeeded;
/* The maximum number of active jobs in parallel */
int maximumActiveJob();
int hardMaximumActiveJob();
bool isInSharedDirectory(const QString& file);
bool localFileNameClash(const QString& relfile);

View file

@ -310,7 +310,8 @@ void PropagateDownloadFileQNAM::start()
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
qDebug() << Q_FUNC_INFO << _item->_file << _propagator->_activeJobs;
qDebug() << Q_FUNC_INFO << _item->_file << _propagator->_activeJobList.count();
_stopwatch.start();
if (_deleteExisting) {
deleteExistingFolder();
@ -414,7 +415,7 @@ void PropagateDownloadFileQNAM::start()
_job->setBandwidthManager(&_propagator->_bandwidthManager);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotGetFinished()));
connect(_job, SIGNAL(downloadProgress(qint64,qint64)), this, SLOT(slotDownloadProgress(qint64,qint64)));
_propagator->_activeJobs ++;
_propagator->_activeJobList.append(this);
_job->start();
}
@ -434,7 +435,7 @@ void PropagateDownloadFileQNAM::setDeleteExistingFolder(bool enabled)
const char owncloudCustomSoftErrorStringC[] = "owncloud-custom-soft-error-string";
void PropagateDownloadFileQNAM::slotGetFinished()
{
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
GETFileJob *job = qobject_cast<GETFileJob *>(sender());
Q_ASSERT(job);
@ -741,6 +742,11 @@ void PropagateDownloadFileQNAM::downloadFinished(const QByteArray& transportChec
if(_item->_file == QLatin1String(".sys.admin#recall#") || _item->_file.endsWith("/.sys.admin#recall#")) {
handleRecallFile(fn);
}
qint64 duration = _stopwatch.elapsed();
if (isLikelyFinishedQuickly() && duration > 5*1000) {
qDebug() << "WARNING: Unexpectedly slow connection, took" << duration << "msec for" << _item->_size - _resumeStart << "bytes for" << _item->_file;
}
}
void PropagateDownloadFileQNAM::slotDownloadProgress(qint64 received, qint64)

View file

@ -114,6 +114,9 @@ public:
void start() Q_DECL_OVERRIDE;
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
/**
* Whether an existing folder with the same name may be deleted before
* the download.
@ -141,6 +144,8 @@ private:
QPointer<GETFileJob> _job;
QFile _tmpFile;
bool _deleteExisting;
QElapsedTimer _stopwatch;
};
}

View file

@ -64,7 +64,7 @@ void PropagateRemoteDelete::start()
_propagator->_remoteFolder + _item->_file,
this);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotDeleteJobFinished()));
_propagator->_activeJobs ++;
_propagator->_activeJobList.append(this);
_job->start();
}
@ -76,7 +76,7 @@ void PropagateRemoteDelete::abort()
void PropagateRemoteDelete::slotDeleteJobFinished()
{
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
Q_ASSERT(_job);

View file

@ -49,6 +49,9 @@ public:
: PropagateItemJob(propagator, item) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return !_item->_isDirectory; }
private slots:
void slotDeleteJobFinished();

View file

@ -28,7 +28,7 @@ void PropagateRemoteMkdir::start()
qDebug() << Q_FUNC_INFO << _item->_file;
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
if (!_deleteExisting) {
return slotStartMkcolJob();
@ -68,7 +68,7 @@ void PropagateRemoteMkdir::setDeleteExisting(bool enabled)
void PropagateRemoteMkdir::slotMkcolJobFinished()
{
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
Q_ASSERT(_job);
@ -109,7 +109,7 @@ void PropagateRemoteMkdir::slotMkcolJobFinished()
// So we must get the file id using a PROPFIND
// This is required so that we can detect moves even if the folder is renamed on the server
// while files are still uploading
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
auto propfindJob = new PropfindJob(_job->account(), _job->path(), this);
propfindJob->setProperties(QList<QByteArray>() << "getetag" << "http://owncloud.org/ns:id");
QObject::connect(propfindJob, SIGNAL(result(QVariantMap)), this, SLOT(propfindResult(QVariantMap)));
@ -123,7 +123,7 @@ void PropagateRemoteMkdir::slotMkcolJobFinished()
void PropagateRemoteMkdir::propfindResult(const QVariantMap &result)
{
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
if (result.contains("getetag")) {
_item->_etag = result["getetag"].toByteArray();
}
@ -136,7 +136,7 @@ void PropagateRemoteMkdir::propfindResult(const QVariantMap &result)
void PropagateRemoteMkdir::propfindError()
{
// ignore the PROPFIND error
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
done(SyncFileItem::Success);
}

View file

@ -33,6 +33,9 @@ public:
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
// Creating a directory should be fast.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return true; }
/**
* Whether an existing entity with the same name may be deleted before
* creating the directory.

View file

@ -97,7 +97,7 @@ void PropagateRemoteMove::start()
_propagator->_remoteDir + _item->_renameTarget,
this);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotMoveJobFinished()));
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
_job->start();
}
@ -110,7 +110,7 @@ void PropagateRemoteMove::abort()
void PropagateRemoteMove::slotMoveJobFinished()
{
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
Q_ASSERT(_job);

View file

@ -188,7 +188,7 @@ void PropagateUploadFileQNAM::start()
return;
}
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
if (!_deleteExisting) {
return slotComputeContentChecksum();
@ -209,7 +209,7 @@ void PropagateUploadFileQNAM::slotComputeContentChecksum()
return;
}
_propagator->_activeJobs--; // from start
_propagator->_activeJobList.removeOne(this);
const QString filePath = _propagator->getFilePath(_item->_file);
@ -565,7 +565,7 @@ void PropagateUploadFileQNAM::startNextChunk()
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
_currentChunk++;
bool parallelChunkUpload = true;
@ -587,7 +587,7 @@ void PropagateUploadFileQNAM::startNextChunk()
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobs < _propagator->maximumActiveJob())
if (parallelChunkUpload && (_propagator->_activeJobList.count() < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
@ -608,7 +608,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
if (_finished) {
// We have sent the finished signal already. We don't need to handle any remaining jobs
@ -834,7 +834,7 @@ void PropagateUploadFileQNAM::startPollJob(const QString& path)
info._modtime = _item->_modtime;
_propagator->_journal->setPollInfo(info);
_propagator->_journal->commit("add poll info");
_propagator->_activeJobs++;
_propagator->_activeJobList.append(this);
job->start();
}
@ -843,7 +843,7 @@ void PropagateUploadFileQNAM::slotPollFinished()
PollJob *job = qobject_cast<PollJob *>(sender());
Q_ASSERT(job);
_propagator->_activeJobs--;
_propagator->_activeJobList.removeOne(this);
if (job->_item->_status != SyncFileItem::Success) {
_finished = true;