From 66595a359715f7380af539aa98a5567fb120e961 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 15 Sep 2014 17:55:55 +0200 Subject: [PATCH] Parallel chunk uploads This does not include progress indicator --- src/mirall/owncloudpropagator.cpp | 6 +- src/mirall/owncloudpropagator.h | 3 + src/mirall/propagator_qnam.cpp | 95 +++++++++++++++++++++++-------- src/mirall/propagator_qnam.h | 12 ++-- 4 files changed, 86 insertions(+), 30 deletions(-) diff --git a/src/mirall/owncloudpropagator.cpp b/src/mirall/owncloudpropagator.cpp index f5c0fb026..b06677497 100644 --- a/src/mirall/owncloudpropagator.cpp +++ b/src/mirall/owncloudpropagator.cpp @@ -34,7 +34,8 @@ namespace Mirall { /* The maximum number of active job in parallel */ -static int maximumActiveJob() { +int OwncloudPropagator::maximumActiveJob() +{ static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt(); if (!max) { max = 3; //default @@ -42,6 +43,7 @@ static int maximumActiveJob() { return max; } + void PropagateItemJob::done(SyncFileItem::Status status, const QString &errorString) { if (_item._isRestoration) { @@ -415,7 +417,7 @@ void PropagateDirectory::slotSubJobReady() return; // Ignore the case when the _fistJob is ready and not yet finished if (_runningNow && _current >= 0 && _current < _subJobs.count()) { // there is a job running and the current one is not ready yet, we can't start new job - if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= maximumActiveJob()) + if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= _propagator->maximumActiveJob()) return; } diff --git a/src/mirall/owncloudpropagator.h b/src/mirall/owncloudpropagator.h index be84ec38f..1d3d96df5 100644 --- a/src/mirall/owncloudpropagator.h +++ b/src/mirall/owncloudpropagator.h @@ -221,6 +221,9 @@ public: /* The number of currently active jobs */ int _activeJobs; + /* The maximum number of active job in parallel */ + int maximumActiveJob(); + bool isInSharedDirectory(const QString& file); bool localFileNameClash(const QString& relfile); diff --git a/src/mirall/propagator_qnam.cpp b/src/mirall/propagator_qnam.cpp index 6d2e3895a..6b525324f 100644 --- a/src/mirall/propagator_qnam.cpp +++ b/src/mirall/propagator_qnam.cpp @@ -181,9 +181,7 @@ void PropagateUploadFileQNAM::start() _currentChunk = 0; _duration.start(); - _propagator->_activeJobs++; emit progress(_item, 0); - emitReady(); this->startNextChunk(); } @@ -197,7 +195,6 @@ public: ChunkDevice(QIODevice *file, qint64 start, qint64 size) : QIODevice(file), _file(file), _read(0), _size(size), _start(start) { _file = QPointer(file); - _file.data()->seek(start); } virtual qint64 writeData(const char* , qint64 ) { @@ -211,10 +208,12 @@ public: close(); return -1; } + _file.data()->seek(_start + _read); maxlen = qMin(maxlen, chunkSize() - _read); if (maxlen == 0) return 0; qint64 ret = _file.data()->read(data, maxlen); + if (ret < 0) return -1; _read += ret; @@ -259,6 +258,15 @@ void PropagateUploadFileQNAM::startNextChunk() if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) return; + if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) { + // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that + // https://github.com/owncloud/core/issues/11106 + // We return now and when the _jobs will be finished we will proceed the last chunk + qWarning() << "WTF" << _currentChunk << _chunkCount << _startChunk; + return; + } + + qWarning() << "Go Go Go " << _jobs.count() << _currentChunk << _chunkCount << _startChunk; /* * // If the source file has changed during upload, it is detected and the @@ -311,11 +319,31 @@ void PropagateUploadFileQNAM::startNextChunk() } if( isOpen ) { - _job = new PUTFileJob(AccountManager::instance()->account(), _propagator->_remoteFolder + path, device, headers); - _job->setTimeout(_propagator->httpTimeout() * 1000); - connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); - connect(_job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); - _job->start(); + PUTFileJob* job = new PUTFileJob(AccountManager::instance()->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk); + _jobs.append(job); + job->setTimeout(_propagator->httpTimeout() * 1000); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); + _propagator->_activeJobs++; + _currentChunk++; + + bool parallelChunkUpload = qgetenv("OWNCLOUD_PARALLEL_CHUNK").toInt() > 0; + if (_currentChunk + _startChunk >= _chunkCount - 1) { + // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that + // https://github.com/owncloud/core/issues/11106 + parallelChunkUpload = false; + } + + if (parallelChunkUpload && (_propagator->_activeJobs < _propagator->maximumActiveJob()) + && _currentChunk < _chunkCount ) { + startNextChunk(); + } + if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) { + emitReady(); + } + } else { qDebug() << "ERR: Could not open upload file: " << device->errorString(); done( SyncFileItem::NormalError, device->errorString() ); @@ -328,6 +356,7 @@ void PropagateUploadFileQNAM::slotPutFinished() { PUTFileJob *job = qobject_cast(sender()); Q_ASSERT(job); + slotJobDestroyed(job); // remove it from the _jobs list qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS" << job->reply()->error() @@ -335,10 +364,16 @@ void PropagateUploadFileQNAM::slotPutFinished() << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); + _propagator->_activeJobs--; + + if (_finished) { + // We have send the finished signal already. We don't need to handle any remaining jobs + return; + } + QNetworkReply::NetworkError err = job->reply()->error(); if (err != QNetworkReply::NoError) { _item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - _propagator->_activeJobs--; if(checkForProblemsWithShared(_item._httpErrorCode, tr("The file was edited locally but is part of a read only share. " "It is restored and your edit is in the conflict file."))) { @@ -366,9 +401,9 @@ void PropagateUploadFileQNAM::slotPutFinished() _item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); // The server needs some time to process the request and provide with a poll URL if (_item._httpErrorCode == 202) { + _finished = true; QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll")); if (path.isEmpty()) { - _propagator->_activeJobs--; done(SyncFileItem::NormalError, tr("Poll URL missing")); return; } @@ -381,14 +416,14 @@ void PropagateUploadFileQNAM::slotPutFinished() if (!finished) { QFileInfo fi(_propagator->_localDir + _item._file); if( !fi.exists() ) { - _propagator->_activeJobs--; + _finished = true; done(SyncFileItem::SoftError, tr("The local file was removed during sync.")); return; } if (Utility::qDateTimeToTime_t(fi.lastModified()) != _item._modtime) { qDebug() << "The local file has changed during upload:" << _item._modtime << "!=" << Utility::qDateTimeToTime_t(fi.lastModified()) << fi.lastModified(); - _propagator->_activeJobs--; + _finished = true; done(SyncFileItem::SoftError, tr("Local file changed during sync.")); // FIXME: the legacy code was retrying for a few seconds. // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW @@ -396,16 +431,23 @@ void PropagateUploadFileQNAM::slotPutFinished() } // Proceed to next chunk. - _currentChunk++; if (_currentChunk >= _chunkCount) { - _propagator->_activeJobs--; + if (!_jobs.empty()) { + // just wait for the other job to finish. + return; + } + _finished = true; done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag were present)")); return; } SyncJournalDb::UploadInfo pi; pi._valid = true; - pi._chunk = (_currentChunk + _startChunk) % _chunkCount; // next chunk to start with + auto currentChunk = _chunkCount; + foreach (auto *job, _jobs) { + currentChunk = qMin(currentChunk, job->_chunk); + } + pi._chunk = (currentChunk + _startChunk) % _chunkCount; // next chunk to start with pi._transferid = _transferId; pi._modtime = Utility::qDateTimeFromTime_t(_item._modtime); _propagator->_journal->setUploadInfo(_item._file, pi); @@ -415,7 +457,7 @@ void PropagateUploadFileQNAM::slotPutFinished() } // the following code only happens after all chunks were uploaded. - // + _finished = true; // the file id should only be empty for new files up- or downloaded QByteArray fid = job->reply()->rawHeader("OC-FileID"); if( !fid.isEmpty() ) { @@ -447,8 +489,6 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem ©) _item._etag = copy._etag; _item._fileId = copy._fileId; - _propagator->_activeJobs--; - _item._requestDuration = _duration.elapsed(); _propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, _propagator->_localDir + _item._file)); @@ -461,10 +501,10 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem ©) void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64) { - int progressChunk = _currentChunk + _startChunk; + int progressChunk = _currentChunk + _startChunk - 1; if (progressChunk >= _chunkCount) - progressChunk = _currentChunk; - emit progress(_item, sent + _currentChunk * chunkSize()); + progressChunk = _currentChunk - 1; + emit progress(_item, sent + progressChunk * chunkSize()); } void PropagateUploadFileQNAM::startPollJob(const QString& path) @@ -494,11 +534,18 @@ void PropagateUploadFileQNAM::slotPollFinished() finalize(job->_item); } +void PropagateUploadFileQNAM::slotJobDestroyed(QObject* job) +{ + _jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end()); +} + void PropagateUploadFileQNAM::abort() { - if (_job && _job->reply()) { - qDebug() << Q_FUNC_INFO << this->_item._file; - _job->reply()->abort(); + foreach(auto *job, _jobs) { + if (job->reply()) { + qDebug() << Q_FUNC_INFO << job << this->_item._file; + job->reply()->abort(); + } } } diff --git a/src/mirall/propagator_qnam.h b/src/mirall/propagator_qnam.h index 3ad15c3de..ffc54dd35 100644 --- a/src/mirall/propagator_qnam.h +++ b/src/mirall/propagator_qnam.h @@ -58,8 +58,10 @@ class PUTFileJob : public AbstractNetworkJob { public: // Takes ownership of the device explicit PUTFileJob(Account* account, const QString& path, QIODevice *device, - const QMap &headers, QObject* parent = 0) - : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers) {} + const QMap &headers, int chunk, QObject* parent = 0) + : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _chunk(chunk) {} + + int _chunk; virtual void start(); @@ -107,16 +109,17 @@ signals: class PropagateUploadFileQNAM : public PropagateItemJob { Q_OBJECT - QPointer _job; QFile *_file; int _startChunk; int _currentChunk; int _chunkCount; int _transferId; QElapsedTimer _duration; + QVector _jobs; + bool _finished; public: PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item) - : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0) {} + : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false) {} void start(); private slots: void slotPutFinished(); @@ -125,6 +128,7 @@ private slots: void abort(); void startNextChunk(); void finalize(const SyncFileItem&); + void slotJobDestroyed(QObject *job); private: void startPollJob(const QString& path); };