From fad387b6b85cde6da1a50e2f48451fa32f2bf610 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Tue, 2 Aug 2016 17:14:44 +0200 Subject: [PATCH] Chunking-Ng: Resume --- src/libsync/networkjobs.cpp | 8 +- src/libsync/networkjobs.h | 2 + src/libsync/propagateupload.h | 12 ++- src/libsync/propagateuploadng.cpp | 122 +++++++++++++++++++++--------- 4 files changed, 102 insertions(+), 42 deletions(-) diff --git a/src/libsync/networkjobs.cpp b/src/libsync/networkjobs.cpp index 5fd3c83a6..3c167334e 100644 --- a/src/libsync/networkjobs.cpp +++ b/src/libsync/networkjobs.cpp @@ -270,6 +270,11 @@ LsColJob::LsColJob(AccountPtr account, const QString &path, QObject *parent) { } +LsColJob::LsColJob(AccountPtr account, const QUrl &url, QObject *parent) + : AbstractNetworkJob(account, QString(), parent), _url(url) +{ +} + void LsColJob::setProperties(QList properties) { _properties = properties; @@ -313,7 +318,8 @@ void LsColJob::start() QBuffer *buf = new QBuffer(this); buf->setData(xml); buf->open(QIODevice::ReadOnly); - QNetworkReply *reply = davRequest("PROPFIND", path(), req, buf); + QNetworkReply *reply = _url.isValid() ? davRequest("PROPFIND", _url, req, buf) + : davRequest("PROPFIND", path(), req, buf); buf->setParent(reply); setReply(reply); setupConnections(reply); diff --git a/src/libsync/networkjobs.h b/src/libsync/networkjobs.h index 94deb3af1..0a19707f4 100644 --- a/src/libsync/networkjobs.h +++ b/src/libsync/networkjobs.h @@ -62,6 +62,7 @@ class OWNCLOUDSYNC_EXPORT LsColJob : public AbstractNetworkJob { Q_OBJECT public: explicit LsColJob(AccountPtr account, const QString &path, QObject *parent = 0); + explicit LsColJob(AccountPtr account, const QUrl &url, QObject *parent = 0); void start() Q_DECL_OVERRIDE; QHash _sizes; @@ -87,6 +88,7 @@ private slots: private: QList _properties; + QUrl _url; // Used instead of path() if the url is specified in the constructor }; /** diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 4a06f2f13..83b1fea65 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -286,7 +286,8 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon { private: quint64 _sent; /// amount of data that was already sent uint _transferId; /// transfer id (part of the url) - int _currentChunk; + int _currentChunk; /// Id of the next chunk that will be sent + QMap _serverChunks; // Map chunk number with its size from the PROPFIND on resume quint64 chunkSize() const { return _propagator->chunkSize(); } /** @@ -300,10 +301,13 @@ public: PropagateUploadFileCommon(propagator,item) {} void doStartUpload() Q_DECL_OVERRIDE; - -private slots: - void slotMkColFinished(QNetworkReply::NetworkError); +private: + void startNewUpload(); void startNextChunk(); +private slots: + void slotPropfindFinished(); + void slotPropfindFinishedWithError(); + void slotMkColFinished(QNetworkReply::NetworkError); void slotPutFinished(); void slotMoveJobFinished(); void slotUploadProgress(qint64,qint64); diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index d2f89aba8..95b41fa20 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -50,14 +50,17 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) *----> doStartUpload() Check the db: is there an entry? - / \ - no yes - / \ - MKCOL PROPFIND (TODO) - | - slotMkColFinished() : - | : - +-----+---------------------+ + / \ + no yes + / \ + / PROPFIND + startNewUpload() <-+ +----------------------------\ + | | | \ + MKCOL + slotPropfindFinishedWithError() slotPropfindFinished() + | | + slotMkColFinished() | + | | + +-----+-------------------------------------------------------+ | +----> startNextChunk() ---finished? --+ ^ | | @@ -74,27 +77,87 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) void PropagateUploadFileNG::doStartUpload() { - _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); - -#if FUTURE - const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); - - if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { - _transferId = progressInfo._transferid; - // TODO: make a PROPFIND call to check what the size on the server is - //_startChunk = progressInfo._chunk; - //qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; - - } -#endif + _duration.start(); _propagator->_activeJobList.append(this); + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); + if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { + _transferId = progressInfo._transferid; + auto url = chunkUrl(); + auto job = new LsColJob(_propagator->account(), url, this); + _jobs.append(job); + job->setProperties(QList() << "resourcetype" << "getcontentlength"); + connect(job, SIGNAL(finishedWithoutError()), this, SLOT(slotPropfindFinished())); + connect(job, SIGNAL(finishedWithError(QNetworkReply*)), + this, SLOT(slotPropfindFinishedWithError())); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + //TODO: port to Qt4 + connect(job, &LsColJob::directoryListingIterated, + [this, url](const QString &name, const QMap &properties) mutable { + if (name == url.path()) { + return; // skip the info about the path itself + } + bool ok = false; + auto chunkId = name.midRef(name.lastIndexOf('/')+1).toUInt(&ok); + if (ok) { + this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); + } + }); + job->start(); + return; + } + + startNewUpload(); +} + +void PropagateUploadFileNG::slotPropfindFinished() +{ + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + _propagator->_activeJobList.removeOne(this); + + _currentChunk = 0; + _sent = 0; + while (_serverChunks.contains(_currentChunk)) { + _sent += _serverChunks[_currentChunk]; + ++_currentChunk; + } + startNextChunk(); +} + +void PropagateUploadFileNG::slotPropfindFinishedWithError() +{ + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + auto httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + auto status = classifyError(err, httpErrorCode, &_propagator->_anotherSyncNeeded); + if (status == SyncFileItem::FatalError) { + _propagator->_activeJobList.removeOne(this); + QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll()); + abortWithError(status, errorString); + return; + } + startNewUpload(); +} + + +void PropagateUploadFileNG::startNewUpload() +{ + Q_ASSERT(_propagator->_activeJobList.count(this) == 1); + _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); _sent = 0; _currentChunk = 0; - _duration.start(); emit progress(*_item, 0); + SyncJournalDb::UploadInfo pi; + pi._valid = true; + pi._transferid = _transferId; + pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); + _propagator->_journal->setUploadInfo(_item->_file, pi); + _propagator->_journal->commit("Upload info"); + auto job = new MkColJob(_propagator->account(), chunkUrl(), this); @@ -273,21 +336,6 @@ void PropagateUploadFileNG::slotPutFinished() _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); _item->_hasBlacklistEntry = false; } - - SyncJournalDb::UploadInfo pi; - pi._valid = true; - auto currentChunk = job->_chunk; - foreach (auto *job, _jobs) { - // Take the minimum finished one - if (auto putJob = qobject_cast(job)) { - currentChunk = qMin(currentChunk, putJob->_chunk - 1); - } - } - pi._chunk = currentChunk; // FIXME - pi._transferid = _transferId; - pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); - _propagator->_journal->setUploadInfo(_item->_file, pi); - _propagator->_journal->commit("Upload info"); } startNextChunk(); }