Parallel chunk uploads

This does not include progress indicator
This commit is contained in:
Olivier Goffart 2014-09-15 17:55:55 +02:00
parent 4960890d70
commit 66595a3597
4 changed files with 86 additions and 30 deletions

View file

@ -34,7 +34,8 @@
namespace Mirall { namespace Mirall {
/* The maximum number of active job in parallel */ /* The maximum number of active job in parallel */
static int maximumActiveJob() { int OwncloudPropagator::maximumActiveJob()
{
static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt(); static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
if (!max) { if (!max) {
max = 3; //default max = 3; //default
@ -42,6 +43,7 @@ static int maximumActiveJob() {
return max; return max;
} }
void PropagateItemJob::done(SyncFileItem::Status status, const QString &errorString) void PropagateItemJob::done(SyncFileItem::Status status, const QString &errorString)
{ {
if (_item._isRestoration) { if (_item._isRestoration) {
@ -415,7 +417,7 @@ void PropagateDirectory::slotSubJobReady()
return; // Ignore the case when the _fistJob is ready and not yet finished return; // Ignore the case when the _fistJob is ready and not yet finished
if (_runningNow && _current >= 0 && _current < _subJobs.count()) { if (_runningNow && _current >= 0 && _current < _subJobs.count()) {
// there is a job running and the current one is not ready yet, we can't start new job // there is a job running and the current one is not ready yet, we can't start new job
if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= maximumActiveJob()) if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= _propagator->maximumActiveJob())
return; return;
} }

View file

@ -221,6 +221,9 @@ public:
/* The number of currently active jobs */ /* The number of currently active jobs */
int _activeJobs; int _activeJobs;
/* The maximum number of active job in parallel */
int maximumActiveJob();
bool isInSharedDirectory(const QString& file); bool isInSharedDirectory(const QString& file);
bool localFileNameClash(const QString& relfile); bool localFileNameClash(const QString& relfile);

View file

@ -181,9 +181,7 @@ void PropagateUploadFileQNAM::start()
_currentChunk = 0; _currentChunk = 0;
_duration.start(); _duration.start();
_propagator->_activeJobs++;
emit progress(_item, 0); emit progress(_item, 0);
emitReady();
this->startNextChunk(); this->startNextChunk();
} }
@ -197,7 +195,6 @@ public:
ChunkDevice(QIODevice *file, qint64 start, qint64 size) ChunkDevice(QIODevice *file, qint64 start, qint64 size)
: QIODevice(file), _file(file), _read(0), _size(size), _start(start) { : QIODevice(file), _file(file), _read(0), _size(size), _start(start) {
_file = QPointer<QIODevice>(file); _file = QPointer<QIODevice>(file);
_file.data()->seek(start);
} }
virtual qint64 writeData(const char* , qint64 ) { virtual qint64 writeData(const char* , qint64 ) {
@ -211,10 +208,12 @@ public:
close(); close();
return -1; return -1;
} }
_file.data()->seek(_start + _read);
maxlen = qMin(maxlen, chunkSize() - _read); maxlen = qMin(maxlen, chunkSize() - _read);
if (maxlen == 0) if (maxlen == 0)
return 0; return 0;
qint64 ret = _file.data()->read(data, maxlen); qint64 ret = _file.data()->read(data, maxlen);
if (ret < 0) if (ret < 0)
return -1; return -1;
_read += ret; _read += ret;
@ -259,6 +258,15 @@ void PropagateUploadFileQNAM::startNextChunk()
if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return; return;
if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
// We return now and when the _jobs will be finished we will proceed the last chunk
qWarning() << "WTF" << _currentChunk << _chunkCount << _startChunk;
return;
}
qWarning() << "Go Go Go " << _jobs.count() << _currentChunk << _chunkCount << _startChunk;
/* /*
* // If the source file has changed during upload, it is detected and the * // If the source file has changed during upload, it is detected and the
@ -311,11 +319,31 @@ void PropagateUploadFileQNAM::startNextChunk()
} }
if( isOpen ) { if( isOpen ) {
_job = new PUTFileJob(AccountManager::instance()->account(), _propagator->_remoteFolder + path, device, headers); PUTFileJob* job = new PUTFileJob(AccountManager::instance()->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_job->setTimeout(_propagator->httpTimeout() * 1000); _jobs.append(job);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); job->setTimeout(_propagator->httpTimeout() * 1000);
connect(_job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
_job->start(); connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobs++;
_currentChunk++;
bool parallelChunkUpload = qgetenv("OWNCLOUD_PARALLEL_CHUNK").toInt() > 0;
if (_currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobs < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emitReady();
}
} else { } else {
qDebug() << "ERR: Could not open upload file: " << device->errorString(); qDebug() << "ERR: Could not open upload file: " << device->errorString();
done( SyncFileItem::NormalError, device->errorString() ); done( SyncFileItem::NormalError, device->errorString() );
@ -328,6 +356,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
{ {
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender()); PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job); Q_ASSERT(job);
slotJobDestroyed(job); // remove it from the _jobs list
qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS" qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS"
<< job->reply()->error() << job->reply()->error()
@ -335,10 +364,16 @@ void PropagateUploadFileQNAM::slotPutFinished()
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
_propagator->_activeJobs--;
if (_finished) {
// We have send the finished signal already. We don't need to handle any remaining jobs
return;
}
QNetworkReply::NetworkError err = job->reply()->error(); QNetworkReply::NetworkError err = job->reply()->error();
if (err != QNetworkReply::NoError) { if (err != QNetworkReply::NoError) {
_item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); _item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
_propagator->_activeJobs--;
if(checkForProblemsWithShared(_item._httpErrorCode, if(checkForProblemsWithShared(_item._httpErrorCode,
tr("The file was edited locally but is part of a read only share. " tr("The file was edited locally but is part of a read only share. "
"It is restored and your edit is in the conflict file."))) { "It is restored and your edit is in the conflict file."))) {
@ -366,9 +401,9 @@ void PropagateUploadFileQNAM::slotPutFinished()
_item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); _item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
// The server needs some time to process the request and provide with a poll URL // The server needs some time to process the request and provide with a poll URL
if (_item._httpErrorCode == 202) { if (_item._httpErrorCode == 202) {
_finished = true;
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll")); QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll"));
if (path.isEmpty()) { if (path.isEmpty()) {
_propagator->_activeJobs--;
done(SyncFileItem::NormalError, tr("Poll URL missing")); done(SyncFileItem::NormalError, tr("Poll URL missing"));
return; return;
} }
@ -381,14 +416,14 @@ void PropagateUploadFileQNAM::slotPutFinished()
if (!finished) { if (!finished) {
QFileInfo fi(_propagator->_localDir + _item._file); QFileInfo fi(_propagator->_localDir + _item._file);
if( !fi.exists() ) { if( !fi.exists() ) {
_propagator->_activeJobs--; _finished = true;
done(SyncFileItem::SoftError, tr("The local file was removed during sync.")); done(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return; return;
} }
if (Utility::qDateTimeToTime_t(fi.lastModified()) != _item._modtime) { if (Utility::qDateTimeToTime_t(fi.lastModified()) != _item._modtime) {
qDebug() << "The local file has changed during upload:" << _item._modtime << "!=" << Utility::qDateTimeToTime_t(fi.lastModified()) << fi.lastModified(); qDebug() << "The local file has changed during upload:" << _item._modtime << "!=" << Utility::qDateTimeToTime_t(fi.lastModified()) << fi.lastModified();
_propagator->_activeJobs--; _finished = true;
done(SyncFileItem::SoftError, tr("Local file changed during sync.")); done(SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds. // FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
@ -396,16 +431,23 @@ void PropagateUploadFileQNAM::slotPutFinished()
} }
// Proceed to next chunk. // Proceed to next chunk.
_currentChunk++;
if (_currentChunk >= _chunkCount) { if (_currentChunk >= _chunkCount) {
_propagator->_activeJobs--; if (!_jobs.empty()) {
// just wait for the other job to finish.
return;
}
_finished = true;
done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag were present)")); done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag were present)"));
return; return;
} }
SyncJournalDb::UploadInfo pi; SyncJournalDb::UploadInfo pi;
pi._valid = true; pi._valid = true;
pi._chunk = (_currentChunk + _startChunk) % _chunkCount; // next chunk to start with auto currentChunk = _chunkCount;
foreach (auto *job, _jobs) {
currentChunk = qMin(currentChunk, job->_chunk);
}
pi._chunk = (currentChunk + _startChunk) % _chunkCount; // next chunk to start with
pi._transferid = _transferId; pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item._modtime); pi._modtime = Utility::qDateTimeFromTime_t(_item._modtime);
_propagator->_journal->setUploadInfo(_item._file, pi); _propagator->_journal->setUploadInfo(_item._file, pi);
@ -415,7 +457,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
} }
// the following code only happens after all chunks were uploaded. // the following code only happens after all chunks were uploaded.
// _finished = true;
// the file id should only be empty for new files up- or downloaded // the file id should only be empty for new files up- or downloaded
QByteArray fid = job->reply()->rawHeader("OC-FileID"); QByteArray fid = job->reply()->rawHeader("OC-FileID");
if( !fid.isEmpty() ) { if( !fid.isEmpty() ) {
@ -447,8 +489,6 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem &copy)
_item._etag = copy._etag; _item._etag = copy._etag;
_item._fileId = copy._fileId; _item._fileId = copy._fileId;
_propagator->_activeJobs--;
_item._requestDuration = _duration.elapsed(); _item._requestDuration = _duration.elapsed();
_propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, _propagator->_localDir + _item._file)); _propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, _propagator->_localDir + _item._file));
@ -461,10 +501,10 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem &copy)
void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64) void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64)
{ {
int progressChunk = _currentChunk + _startChunk; int progressChunk = _currentChunk + _startChunk - 1;
if (progressChunk >= _chunkCount) if (progressChunk >= _chunkCount)
progressChunk = _currentChunk; progressChunk = _currentChunk - 1;
emit progress(_item, sent + _currentChunk * chunkSize()); emit progress(_item, sent + progressChunk * chunkSize());
} }
void PropagateUploadFileQNAM::startPollJob(const QString& path) void PropagateUploadFileQNAM::startPollJob(const QString& path)
@ -494,11 +534,18 @@ void PropagateUploadFileQNAM::slotPollFinished()
finalize(job->_item); finalize(job->_item);
} }
void PropagateUploadFileQNAM::slotJobDestroyed(QObject* job)
{
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end());
}
void PropagateUploadFileQNAM::abort() void PropagateUploadFileQNAM::abort()
{ {
if (_job && _job->reply()) { foreach(auto *job, _jobs) {
qDebug() << Q_FUNC_INFO << this->_item._file; if (job->reply()) {
_job->reply()->abort(); qDebug() << Q_FUNC_INFO << job << this->_item._file;
job->reply()->abort();
}
} }
} }

View file

@ -58,8 +58,10 @@ class PUTFileJob : public AbstractNetworkJob {
public: public:
// Takes ownership of the device // Takes ownership of the device
explicit PUTFileJob(Account* account, const QString& path, QIODevice *device, explicit PUTFileJob(Account* account, const QString& path, QIODevice *device,
const QMap<QByteArray, QByteArray> &headers, QObject* parent = 0) const QMap<QByteArray, QByteArray> &headers, int chunk, QObject* parent = 0)
: AbstractNetworkJob(account, path, parent), _device(device), _headers(headers) {} : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _chunk(chunk) {}
int _chunk;
virtual void start(); virtual void start();
@ -107,16 +109,17 @@ signals:
class PropagateUploadFileQNAM : public PropagateItemJob { class PropagateUploadFileQNAM : public PropagateItemJob {
Q_OBJECT Q_OBJECT
QPointer<PUTFileJob> _job;
QFile *_file; QFile *_file;
int _startChunk; int _startChunk;
int _currentChunk; int _currentChunk;
int _chunkCount; int _chunkCount;
int _transferId; int _transferId;
QElapsedTimer _duration; QElapsedTimer _duration;
QVector<PUTFileJob*> _jobs;
bool _finished;
public: public:
PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item) PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0) {} : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false) {}
void start(); void start();
private slots: private slots:
void slotPutFinished(); void slotPutFinished();
@ -125,6 +128,7 @@ private slots:
void abort(); void abort();
void startNextChunk(); void startNextChunk();
void finalize(const SyncFileItem&); void finalize(const SyncFileItem&);
void slotJobDestroyed(QObject *job);
private: private:
void startPollJob(const QString& path); void startPollJob(const QString& path);
}; };