From 4f3f642da6c44cf2eb1bc2210904e99d92e0a76a Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 20 May 2016 10:26:02 +0200 Subject: [PATCH 01/19] Upload: refactor the upload in two classes so the new chuning can be implemented --- src/libsync/CMakeLists.txt | 1 + src/libsync/owncloudpropagator.cpp | 2 +- src/libsync/propagateupload.cpp | 471 +++++------------------------ src/libsync/propagateupload.h | 124 +++++--- src/libsync/propagateuploadv1.cpp | 376 +++++++++++++++++++++++ 5 files changed, 534 insertions(+), 440 deletions(-) create mode 100644 src/libsync/propagateuploadv1.cpp diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index 450f73f2a..d68ec323c 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -51,6 +51,7 @@ set(libsync_SRCS propagatorjobs.cpp propagatedownload.cpp propagateupload.cpp + propagateuploadv1.cpp propagateremotedelete.cpp propagateremotemove.cpp propagateremotemkdir.cpp diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index d73f63c76..39bced518 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -270,7 +270,7 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) { job->setDeleteExistingFolder(deleteExisting); return job; } else { - auto job = new PropagateUploadFile(this, item); + auto job = new PropagateUploadFileV1(this, item); // FIXME, depending on server version, use the other chunking algorithm job->setDeleteExisting(deleteExisting); return job; } diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index 2783ac9a5..485937f66 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -184,7 +184,13 @@ bool PollJob::finished() return true; } -void PropagateUploadFile::start() +void PropagateUploadFileCommon::setDeleteExisting(bool enabled) +{ + _deleteExisting = enabled; +} + + +void PropagateUploadFileCommon::start() { if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) { return; @@ -205,7 +211,7 @@ void PropagateUploadFile::start() job->start(); } -void PropagateUploadFile::slotComputeContentChecksum() +void PropagateUploadFileCommon::slotComputeContentChecksum() { if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) { return; @@ -239,12 +245,7 @@ void PropagateUploadFile::slotComputeContentChecksum() computeChecksum->start(filePath); } -void PropagateUploadFile::setDeleteExisting(bool enabled) -{ - _deleteExisting = enabled; -} - -void PropagateUploadFile::slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum) +void PropagateUploadFileCommon::slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum) { _item->_contentChecksum = contentChecksum; _item->_contentChecksumType = contentChecksumType; @@ -276,7 +277,7 @@ void PropagateUploadFile::slotComputeTransmissionChecksum(const QByteArray& cont computeChecksum->start(filePath); } -void PropagateUploadFile::slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum) +void PropagateUploadFileCommon::slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum) { // Remove ourselfs from the list of active job, before any posible call to done() // When we start chunks, we will add it again, once for every chunks. @@ -322,23 +323,7 @@ void PropagateUploadFile::slotStartUpload(const QByteArray& transmissionChecksum return; } - _chunkCount = std::ceil(fileSize/double(chunkSize())); - _startChunk = 0; - _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16); - - const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); - - if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { - _startChunk = progressInfo._chunk; - _transferId = progressInfo._transferid; - qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; - } - - _currentChunk = 0; - _duration.start(); - - emit progress(*_item, 0); - this->startNextChunk(); + doStartUpload(); } UploadDevice::UploadDevice(BandwidthManager *bwm) @@ -476,24 +461,64 @@ void UploadDevice::setChoked(bool b) { } } -void PropagateUploadFile::startNextChunk() +void PropagateUploadFileCommon::startPollJob(const QString& path) { - if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) - return; + PollJob* job = new PollJob(_propagator->account(), path, _item, + _propagator->_journal, _propagator->_localDir, this); + connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished())); + SyncJournalDb::PollInfo info; + info._file = _item->_file; + info._url = path; + info._modtime = _item->_modtime; + _propagator->_journal->setPollInfo(info); + _propagator->_journal->commit("add poll info"); + _propagator->_activeJobList.append(this); + job->start(); +} - if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) { - // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that - // https://github.com/owncloud/core/issues/11106 - // We return now and when the _jobs are finished we will proceed with the last chunk - // NOTE: Some other parts of the code such as slotUploadProgress also assume that the last chunk - // is sent last. +void PropagateUploadFileCommon::slotPollFinished() +{ + PollJob *job = qobject_cast(sender()); + Q_ASSERT(job); + + _propagator->_activeJobList.removeOne(this); + + if (job->_item->_status != SyncFileItem::Success) { + _finished = true; + done(job->_item->_status, job->_item->_errorString); return; } - quint64 fileSize = _item->_size; + + finalize(); +} + +void PropagateUploadFileCommon::slotJobDestroyed(QObject* job) +{ + _jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end()); +} + +void PropagateUploadFileCommon::abort() +{ + foreach(auto *job, _jobs) { + if (job->reply()) { + qDebug() << Q_FUNC_INFO << job << this->_item->_file; + job->reply()->abort(); + } + } +} + +// This function is used whenever there is an error occuring and jobs might be in progress +void PropagateUploadFileCommon::abortWithError(SyncFileItem::Status status, const QString &error) +{ + _finished = true; + abort(); + done(status, error); +} + +QMap PropagateUploadFileCommon::headers() +{ QMap headers; - headers["OC-Total-Length"] = QByteArray::number(fileSize); headers["OC-Async"] = "1"; - headers["OC-Chunk-Size"]= QByteArray::number(quint64(chunkSize())); headers["Content-Type"] = "application/octet-stream"; headers["X-OC-Mtime"] = QByteArray::number(qint64(_item->_modtime)); @@ -509,291 +534,20 @@ void PropagateUploadFile::startNextChunk() } if (!_item->_etag.isEmpty() && _item->_etag != "empty_etag" - && _item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match - && _item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE - && !_deleteExisting - ) { + && _item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match + && _item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE + && !_deleteExisting + ) { // We add quotes because the owncloud server always adds quotes around the etag, and // csync_owncloud.c's owncloud_file_id always strips the quotes. headers["If-Match"] = '"' + _item->_etag + '"'; } - - QString path = _item->_file; - - UploadDevice *device = new UploadDevice(&_propagator->_bandwidthManager); - qint64 chunkStart = 0; - qint64 currentChunkSize = fileSize; - bool isFinalChunk = false; - if (_chunkCount > 1) { - int sendingChunk = (_currentChunk + _startChunk) % _chunkCount; - // XOR with chunk size to make sure everything goes well if chunk size changes between runs - uint transid = _transferId ^ chunkSize(); - qDebug() << "Upload chunk" << sendingChunk << "of" << _chunkCount << "transferid(remote)=" << transid; - path += QString("-chunking-%1-%2-%3").arg(transid).arg(_chunkCount).arg(sendingChunk); - - headers["OC-Chunked"] = "1"; - - chunkStart = chunkSize() * quint64(sendingChunk); - currentChunkSize = chunkSize(); - if (sendingChunk == _chunkCount - 1) { // last chunk - currentChunkSize = (fileSize % chunkSize()); - if( currentChunkSize == 0 ) { // if the last chunk pretends to be 0, its actually the full chunk size. - currentChunkSize = chunkSize(); - } - isFinalChunk = true; - } - } else { - // if there's only one chunk, it's the final one - isFinalChunk = true; - } - - if (isFinalChunk && !_transmissionChecksumType.isEmpty()) { - headers[checkSumHeaderC] = makeChecksumHeader( - _transmissionChecksumType, _transmissionChecksum); - } - - const QString fileName = _propagator->getFilePath(_item->_file); - if (! device->prepareAndOpen(fileName, chunkStart, currentChunkSize)) { - qDebug() << "ERR: Could not prepare upload device: " << device->errorString(); - - // If the file is currently locked, we want to retry the sync - // when it becomes available again. - if (FileSystem::isFileLocked(fileName)) { - emit _propagator->seenLockedFile(fileName); - } - - // Soft error because this is likely caused by the user modifying his files while syncing - abortWithError( SyncFileItem::SoftError, device->errorString() ); - delete device; - return; - } - - // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing - PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk); - _jobs.append(job); - connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); - connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); - connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64))); - connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); - job->start(); - _propagator->_activeJobList.append(this); - _currentChunk++; - - bool parallelChunkUpload = true; - QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK"); - if (!env.isEmpty()) { - parallelChunkUpload = env != "false" && env != "0"; - } else { - int versionNum = _propagator->account()->serverVersionInt(); - if (versionNum < 0x080003) { - // Disable parallel chunk upload severs older than 8.0.3 to avoid too many - // internal sever errors (#2743, #2938) - parallelChunkUpload = false; - } - } - - if (_currentChunk + _startChunk >= _chunkCount - 1) { - // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that - // https://github.com/owncloud/core/issues/11106 - parallelChunkUpload = false; - } - - if (parallelChunkUpload && (_propagator->_activeJobList.count() < _propagator->maximumActiveJob()) - && _currentChunk < _chunkCount ) { - startNextChunk(); - } - if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) { - emit ready(); - } + return headers; } -void PropagateUploadFile::slotPutFinished() +void PropagateUploadFileCommon::finalize() { - PUTFileJob *job = qobject_cast(sender()); - Q_ASSERT(job); - slotJobDestroyed(job); // remove it from the _jobs list - - qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS" - << job->reply()->error() - << (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString()) - << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) - << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); - - _propagator->_activeJobList.removeOne(this); - - if (_finished) { - // We have sent the finished signal already. We don't need to handle any remaining jobs - return; - } - - QNetworkReply::NetworkError err = job->reply()->error(); - -#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) - if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { - // Abort the job and try again later. - // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. - qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; - _propagator->_anotherSyncNeeded = true; - abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2.")); - return; - } -#endif - - if (err != QNetworkReply::NoError) { - _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - if(checkForProblemsWithShared(_item->_httpErrorCode, - tr("The file was edited locally but is part of a read only share. " - "It is restored and your edit is in the conflict file."))) { - return; - } - QByteArray replyContent = job->reply()->readAll(); - qDebug() << replyContent; // display the XML error in the debug - QString errorString = errorMessage(job->errorString(), replyContent); - - if (job->reply()->hasRawHeader("OC-ErrorString")) { - errorString = job->reply()->rawHeader("OC-ErrorString"); - } - - if (_item->_httpErrorCode == 412) { - // Precondition Failed: Maybe the bad etag is in the database, we need to clear the - // parent folder etag so we won't read from DB next sync. - _propagator->_journal->avoidReadFromDbOnNextSync(_item->_file); - _propagator->_anotherSyncNeeded = true; - } - - SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, - &_propagator->_anotherSyncNeeded); - abortWithError(status, errorString); - return; - } - - _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - // The server needs some time to process the request and provide us with a poll URL - if (_item->_httpErrorCode == 202) { - _finished = true; - QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll")); - if (path.isEmpty()) { - done(SyncFileItem::NormalError, tr("Poll URL missing")); - return; - } - startPollJob(path); - return; - } - - // Check the file again post upload. - // Two cases must be considered separately: If the upload is finished, - // the file is on the server and has a changed ETag. In that case, - // the etag has to be properly updated in the client journal, and because - // of that we can bail out here with an error. But we can reschedule a - // sync ASAP. - // But if the upload is ongoing, because not all chunks were uploaded - // yet, the upload can be stopped and an error can be displayed, because - // the server hasn't registered the new file yet. - QByteArray etag = getEtagFromReply(job->reply()); - bool finished = etag.length() > 0; - - // Check if the file still exists - const QString fullFilePath(_propagator->getFilePath(_item->_file)); - if( !FileSystem::fileExists(fullFilePath) ) { - if (!finished) { - abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync.")); - return; - } else { - _propagator->_anotherSyncNeeded = true; - } - } - - // Check whether the file changed since discovery. - if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) { - _propagator->_anotherSyncNeeded = true; - if( !finished ) { - abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); - // FIXME: the legacy code was retrying for a few seconds. - // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW - return; - } - } - - if (!finished) { - // Proceed to next chunk. - if (_currentChunk >= _chunkCount) { - if (!_jobs.empty()) { - // just wait for the other job to finish. - return; - } - _finished = true; - done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag was present)")); - return; - } - - // Deletes an existing blacklist entry on successful chunk upload - if (_item->_hasBlacklistEntry) { - _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); - _item->_hasBlacklistEntry = false; - } - - SyncJournalDb::UploadInfo pi; - pi._valid = true; - auto currentChunk = job->_chunk; - foreach (auto *job, _jobs) { - // Take the minimum finished one - if (auto putJob = qobject_cast(job)) { - currentChunk = qMin(currentChunk, putJob->_chunk - 1); - } - } - pi._chunk = (currentChunk + _startChunk + 1) % _chunkCount ; // next chunk to start with - pi._transferid = _transferId; - pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); - _propagator->_journal->setUploadInfo(_item->_file, pi); - _propagator->_journal->commit("Upload info"); - startNextChunk(); - return; - } - - // the following code only happens after all chunks were uploaded. - _finished = true; - // the file id should only be empty for new files up- or downloaded - QByteArray fid = job->reply()->rawHeader("OC-FileID"); - if( !fid.isEmpty() ) { - if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) { - qDebug() << "WARN: File ID changed!" << _item->_fileId << fid; - } - _item->_fileId = fid; - } - - _item->_etag = etag; - - _item->_responseTimeStamp = job->responseTimestamp(); - - if (job->reply()->rawHeader("X-OC-MTime") != "accepted") { - // X-OC-MTime is supported since owncloud 5.0. But not when chunking. - // Normally Owncloud 6 always puts X-OC-MTime - qWarning() << "Server does not support X-OC-MTime" << job->reply()->rawHeader("X-OC-MTime"); - // Well, the mtime was not set - done(SyncFileItem::SoftError, "Server does not support X-OC-MTime"); - } - - // performance logging - _item->_requestDuration = _stopWatch.stop(); - qDebug() << "*==* duration UPLOAD" << _item->_size - << _stopWatch.durationOfLap(QLatin1String("ContentChecksum")) - << _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum")) - << _item->_requestDuration; - // The job might stay alive for the whole sync, release this tiny bit of memory. - _stopWatch.reset(); - - finalize(*_item); -} - -void PropagateUploadFile::finalize(const SyncFileItem ©) -{ - // Normally, copy == _item, but when it comes from the UpdateMTimeAndETagJob, we need to do - // some updates - _item->_etag = copy._etag; - _item->_fileId = copy._fileId; - _item->_requestDuration = _duration.elapsed(); - _finished = true; if (!_propagator->_journal->setFileRecord(SyncJournalFileRecord(*_item, _propagator->getFilePath(_item->_file)))) { @@ -807,92 +561,5 @@ void PropagateUploadFile::finalize(const SyncFileItem ©) done(SyncFileItem::Success); } -void PropagateUploadFile::slotUploadProgress(qint64 sent, qint64 total) -{ - // Completion is signaled with sent=0, total=0; avoid accidentally - // resetting progress due to the sent being zero by ignoring it. - // finishedSignal() is bound to be emitted soon anyway. - // See https://bugreports.qt.io/browse/QTBUG-44782. - if (sent == 0 && total == 0) { - return; - } - - int progressChunk = _currentChunk + _startChunk - 1; - if (progressChunk >= _chunkCount) - progressChunk = _currentChunk - 1; - - // amount is the number of bytes already sent by all the other chunks that were sent - // not including this one. - // FIXME: this assumes all chunks have the same size, which is true only if the last chunk - // has not been finished (which should not happen because the last chunk is sent sequentially) - quint64 amount = progressChunk * chunkSize(); - - sender()->setProperty("byteWritten", sent); - if (_jobs.count() > 1) { - amount -= (_jobs.count() -1) * chunkSize(); - foreach (QObject *j, _jobs) { - amount += j->property("byteWritten").toULongLong(); - } - } else { - // sender() is the only current job, no need to look at the byteWritten properties - amount += sent; - } - emit progress(*_item, amount); -} - -void PropagateUploadFile::startPollJob(const QString& path) -{ - PollJob* job = new PollJob(_propagator->account(), path, _item, - _propagator->_journal, _propagator->_localDir, this); - connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished())); - SyncJournalDb::PollInfo info; - info._file = _item->_file; - info._url = path; - info._modtime = _item->_modtime; - _propagator->_journal->setPollInfo(info); - _propagator->_journal->commit("add poll info"); - _propagator->_activeJobList.append(this); - job->start(); -} - -void PropagateUploadFile::slotPollFinished() -{ - PollJob *job = qobject_cast(sender()); - Q_ASSERT(job); - - _propagator->_activeJobList.removeOne(this); - - if (job->_item->_status != SyncFileItem::Success) { - _finished = true; - done(job->_item->_status, job->_item->_errorString); - return; - } - - finalize(*job->_item); -} - -void PropagateUploadFile::slotJobDestroyed(QObject* job) -{ - _jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end()); -} - -void PropagateUploadFile::abort() -{ - foreach(auto *job, _jobs) { - if (job->reply()) { - qDebug() << Q_FUNC_INFO << job << this->_item->_file; - job->reply()->abort(); - } - } -} - -// This function is used whenever there is an error occuring and jobs might be in progress -void PropagateUploadFile::abortWithError(SyncFileItem::Status status, const QString &error) -{ - _finished = true; - abort(); - done(status, error); -} - } diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 2be0212e9..ea9f546b8 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -155,10 +155,87 @@ signals: }; /** - * @brief The PropagateUploadFile class + * @brief The PropagateUploadFileCommon class is the code common between all chunking algorithms * @ingroup libsync + * + * State Machine: + * + * +---> start() --> (delete job) -------+ + * | | + * +--> slotComputeContentChecksum() <---+ + * | + * v + * slotComputeTransmissionChecksum() + * | + * v + * slotStartUpload() -> doStartUpload() + * . + * . + * v + * finalize() or abortWithError() or startPollJob() */ -class PropagateUploadFile : public PropagateItemJob { +class PropagateUploadFileCommon : public PropagateItemJob { + Q_OBJECT + +protected: + QElapsedTimer _duration; + QVector _jobs; /// network jobs that are currently in transit + bool _finished; /// Tells that all the jobs have been finished + bool _deleteExisting; + + // measure the performance of checksum calc and upload + Utility::StopWatch _stopWatch; + + QByteArray _transmissionChecksum; + QByteArray _transmissionChecksumType; + + +public: + PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item) + : PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {} + + /** + * Whether an existing entity with the same name may be deleted before + * the upload. + * + * Default: false. + */ + void setDeleteExisting(bool enabled); + + void start() Q_DECL_OVERRIDE; +private slots: + void slotComputeContentChecksum(); + // Content checksum computed, compute the transmission checksum + void slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum); + // transmission checksum computed, prepare the upload + void slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum); +public: + virtual void doStartUpload() = 0; + + void startPollJob(const QString& path); + void finalize(); + void abortWithError(SyncFileItem::Status status, const QString &error); + +public slots: + void abort() Q_DECL_OVERRIDE; + void slotJobDestroyed(QObject *job); + +private slots: + void slotPollFinished(); + +protected: + // Bases headers that need to be sent with every chunk + QMap headers(); + +}; + +/** + * @ingroup libsync + * + * Propagation job, impementing the old chunking agorithm + * + */ +class PropagateUploadFileV1 : public PropagateUploadFileCommon { Q_OBJECT private: @@ -176,48 +253,21 @@ private: int _currentChunk; int _chunkCount; /// Total number of chunks for this file int _transferId; /// transfer id (part of the url) - QElapsedTimer _duration; - QVector _jobs; /// network jobs that are currently in transit - bool _finished; // Tells that all the jobs have been finished - - // measure the performance of checksum calc and upload - Utility::StopWatch _stopWatch; - - QByteArray _transmissionChecksum; - QByteArray _transmissionChecksumType; - - bool _deleteExisting; quint64 chunkSize() const { return _propagator->chunkSize(); } -public: - PropagateUploadFile(OwncloudPropagator* propagator,const SyncFileItemPtr& item) - : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false), _deleteExisting(false) {} - void start() Q_DECL_OVERRIDE; - /** - * Whether an existing entity with the same name may be deleted before - * the upload. - * - * Default: false. - */ - void setDeleteExisting(bool enabled); + +public: + PropagateUploadFileV1(OwncloudPropagator* propagator,const SyncFileItemPtr& item) : + PropagateUploadFileCommon(propagator,item) {} + + void doStartUpload() Q_DECL_OVERRIDE; private slots: - void slotPutFinished(); - void slotPollFinished(); - void slotUploadProgress(qint64,qint64); - void abort() Q_DECL_OVERRIDE; void startNextChunk(); - void finalize(const SyncFileItem&); - void slotJobDestroyed(QObject *job); - void slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum); - void slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum); - void slotComputeContentChecksum(); - -private: - void startPollJob(const QString& path); - void abortWithError(SyncFileItem::Status status, const QString &error); + void slotPutFinished(); + void slotUploadProgress(qint64,qint64); }; } diff --git a/src/libsync/propagateuploadv1.cpp b/src/libsync/propagateuploadv1.cpp new file mode 100644 index 000000000..c126164b3 --- /dev/null +++ b/src/libsync/propagateuploadv1.cpp @@ -0,0 +1,376 @@ +/* + * Copyright (C) by Olivier Goffart + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "config.h" +#include "propagateupload.h" +#include "owncloudpropagator_p.h" +#include "networkjobs.h" +#include "account.h" +#include "syncjournaldb.h" +#include "syncjournalfilerecord.h" +#include "utility.h" +#include "filesystem.h" +#include "propagatorjobs.h" +#include "checksums.h" +#include "syncengine.h" +#include "propagateremotedelete.h" + +#include +#include +#include +#include +#include +#include + +namespace OCC { +void PropagateUploadFileV1::doStartUpload() +{ + _chunkCount = std::ceil(_item->_size / double(chunkSize())); + _startChunk = 0; + _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16); + + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); + + if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { + _startChunk = progressInfo._chunk; + _transferId = progressInfo._transferid; + qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; + } + + _currentChunk = 0; + _duration.start(); + + emit progress(*_item, 0); + startNextChunk(); +} + +void PropagateUploadFileV1::startNextChunk() +{ + if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) + return; + + if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) { + // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that + // https://github.com/owncloud/core/issues/11106 + // We return now and when the _jobs are finished we will proceed with the last chunk + // NOTE: Some other parts of the code such as slotUploadProgress also assume that the last chunk + // is sent last. + return; + } + quint64 fileSize = _item->_size; + auto headers = PropagateUploadFileCommon::headers(); + headers["OC-Total-Length"] = QByteArray::number(fileSize); + headers["OC-Chunk-Size"]= QByteArray::number(quint64(chunkSize())); + + QString path = _item->_file; + + UploadDevice *device = new UploadDevice(&_propagator->_bandwidthManager); + qint64 chunkStart = 0; + qint64 currentChunkSize = fileSize; + bool isFinalChunk = false; + if (_chunkCount > 1) { + int sendingChunk = (_currentChunk + _startChunk) % _chunkCount; + // XOR with chunk size to make sure everything goes well if chunk size changes between runs + uint transid = _transferId ^ chunkSize(); + qDebug() << "Upload chunk" << sendingChunk << "of" << _chunkCount << "transferid(remote)=" << transid; + path += QString("-chunking-%1-%2-%3").arg(transid).arg(_chunkCount).arg(sendingChunk); + + headers["OC-Chunked"] = "1"; + + chunkStart = chunkSize() * quint64(sendingChunk); + currentChunkSize = chunkSize(); + if (sendingChunk == _chunkCount - 1) { // last chunk + currentChunkSize = (fileSize % chunkSize()); + if( currentChunkSize == 0 ) { // if the last chunk pretends to be 0, its actually the full chunk size. + currentChunkSize = chunkSize(); + } + isFinalChunk = true; + } + } else { + // if there's only one chunk, it's the final one + isFinalChunk = true; + } + + if (isFinalChunk && !_transmissionChecksumType.isEmpty()) { + headers[checkSumHeaderC] = makeChecksumHeader( + _transmissionChecksumType, _transmissionChecksum); + } + + const QString fileName = _propagator->getFilePath(_item->_file); + if (! device->prepareAndOpen(fileName, chunkStart, currentChunkSize)) { + qDebug() << "ERR: Could not prepare upload device: " << device->errorString(); + + // If the file is currently locked, we want to retry the sync + // when it becomes available again. + if (FileSystem::isFileLocked(fileName)) { + emit _propagator->seenLockedFile(fileName); + } + // Soft error because this is likely caused by the user modifying his files while syncing + abortWithError( SyncFileItem::SoftError, device->errorString() ); + delete device; + return; + } + + // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing + PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk); + _jobs.append(job); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); + _propagator->_activeJobList.append(this); + _currentChunk++; + + bool parallelChunkUpload = true; + QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK"); + if (!env.isEmpty()) { + parallelChunkUpload = env != "false" && env != "0"; + } else { + int versionNum = _propagator->account()->serverVersionInt(); + if (versionNum < 0x080003) { + // Disable parallel chunk upload severs older than 8.0.3 to avoid too many + // internal sever errors (#2743, #2938) + parallelChunkUpload = false; + } + } + + if (_currentChunk + _startChunk >= _chunkCount - 1) { + // Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that + // https://github.com/owncloud/core/issues/11106 + parallelChunkUpload = false; + } + + if (parallelChunkUpload && (_propagator->_activeJobList.count() < _propagator->maximumActiveJob()) + && _currentChunk < _chunkCount ) { + startNextChunk(); + } + if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) { + emit ready(); + } +} + +void PropagateUploadFileV1::slotPutFinished() +{ + PUTFileJob *job = qobject_cast(sender()); + Q_ASSERT(job); + slotJobDestroyed(job); // remove it from the _jobs list + + qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS" + << job->reply()->error() + << (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString()) + << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) + << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); + + _propagator->_activeJobList.removeOne(this); + + if (_finished) { + // We have sent the finished signal already. We don't need to handle any remaining jobs + return; + } + + QNetworkReply::NetworkError err = job->reply()->error(); + +#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) + if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { + // Abort the job and try again later. + // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. + qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; + _propagator->_anotherSyncNeeded = true; + abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2.")); + return; + } +#endif + + if (err != QNetworkReply::NoError) { + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + if(checkForProblemsWithShared(_item->_httpErrorCode, + tr("The file was edited locally but is part of a read only share. " + "It is restored and your edit is in the conflict file."))) { + return; + } + QByteArray replyContent = job->reply()->readAll(); + qDebug() << replyContent; // display the XML error in the debug + QString errorString = errorMessage(job->errorString(), replyContent); + + if (job->reply()->hasRawHeader("OC-ErrorString")) { + errorString = job->reply()->rawHeader("OC-ErrorString"); + } + + if (_item->_httpErrorCode == 412) { + // Precondition Failed: Maybe the bad etag is in the database, we need to clear the + // parent folder etag so we won't read from DB next sync. + _propagator->_journal->avoidReadFromDbOnNextSync(_item->_file); + _propagator->_anotherSyncNeeded = true; + } + + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + abortWithError(status, errorString); + return; + } + + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + // The server needs some time to process the request and provide us with a poll URL + if (_item->_httpErrorCode == 202) { + _finished = true; + QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll")); + if (path.isEmpty()) { + done(SyncFileItem::NormalError, tr("Poll URL missing")); + return; + } + startPollJob(path); + return; + } + + // Check the file again post upload. + // Two cases must be considered separately: If the upload is finished, + // the file is on the server and has a changed ETag. In that case, + // the etag has to be properly updated in the client journal, and because + // of that we can bail out here with an error. But we can reschedule a + // sync ASAP. + // But if the upload is ongoing, because not all chunks were uploaded + // yet, the upload can be stopped and an error can be displayed, because + // the server hasn't registered the new file yet. + QByteArray etag = getEtagFromReply(job->reply()); + bool finished = etag.length() > 0; + + // Check if the file still exists + const QString fullFilePath(_propagator->getFilePath(_item->_file)); + if( !FileSystem::fileExists(fullFilePath) ) { + if (!finished) { + abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync.")); + return; + } else { + _propagator->_anotherSyncNeeded = true; + } + } + + // Check whether the file changed since discovery. + if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) { + _propagator->_anotherSyncNeeded = true; + if( !finished ) { + abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); + // FIXME: the legacy code was retrying for a few seconds. + // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW + return; + } + } + + if (!finished) { + // Proceed to next chunk. + if (_currentChunk >= _chunkCount) { + if (!_jobs.empty()) { + // just wait for the other job to finish. + return; + } + _finished = true; + done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag was present)")); + return; + } + + // Deletes an existing blacklist entry on successful chunk upload + if (_item->_hasBlacklistEntry) { + _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); + _item->_hasBlacklistEntry = false; + } + + SyncJournalDb::UploadInfo pi; + pi._valid = true; + auto currentChunk = job->_chunk; + foreach (auto *job, _jobs) { + // Take the minimum finished one + if (auto putJob = qobject_cast(job)) { + currentChunk = qMin(currentChunk, putJob->_chunk - 1); + } + } + pi._chunk = (currentChunk + _startChunk + 1) % _chunkCount ; // next chunk to start with + pi._transferid = _transferId; + pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); + _propagator->_journal->setUploadInfo(_item->_file, pi); + _propagator->_journal->commit("Upload info"); + startNextChunk(); + return; + } + + // the following code only happens after all chunks were uploaded. + _finished = true; + // the file id should only be empty for new files up- or downloaded + QByteArray fid = job->reply()->rawHeader("OC-FileID"); + if( !fid.isEmpty() ) { + if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) { + qDebug() << "WARN: File ID changed!" << _item->_fileId << fid; + } + _item->_fileId = fid; + } + + _item->_etag = etag; + + _item->_responseTimeStamp = job->responseTimestamp(); + + if (job->reply()->rawHeader("X-OC-MTime") != "accepted") { + // X-OC-MTime is supported since owncloud 5.0. But not when chunking. + // Normally Owncloud 6 always puts X-OC-MTime + qWarning() << "Server does not support X-OC-MTime" << job->reply()->rawHeader("X-OC-MTime"); + // Well, the mtime was not set + done(SyncFileItem::SoftError, "Server does not support X-OC-MTime"); + } + + // performance logging + _item->_requestDuration = _stopWatch.stop(); + qDebug() << "*==* duration UPLOAD" << _item->_size + << _stopWatch.durationOfLap(QLatin1String("ContentChecksum")) + << _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum")) + << _item->_requestDuration; + // The job might stay alive for the whole sync, release this tiny bit of memory. + _stopWatch.reset(); + + finalize(); +} + + +void PropagateUploadFileV1::slotUploadProgress(qint64 sent, qint64 total) +{ + // Completion is signaled with sent=0, total=0; avoid accidentally + // resetting progress due to the sent being zero by ignoring it. + // finishedSignal() is bound to be emitted soon anyway. + // See https://bugreports.qt.io/browse/QTBUG-44782. + if (sent == 0 && total == 0) { + return; + } + + int progressChunk = _currentChunk + _startChunk - 1; + if (progressChunk >= _chunkCount) + progressChunk = _currentChunk - 1; + + // amount is the number of bytes already sent by all the other chunks that were sent + // not including this one. + // FIXME: this assumes all chunks have the same size, which is true only if the last chunk + // has not been finished (which should not happen because the last chunk is sent sequentially) + quint64 amount = progressChunk * chunkSize(); + + sender()->setProperty("byteWritten", sent); + if (_jobs.count() > 1) { + amount -= (_jobs.count() -1) * chunkSize(); + foreach (QObject *j, _jobs) { + amount += j->property("byteWritten").toULongLong(); + } + } else { + // sender() is the only current job, no need to look at the byteWritten properties + amount += sent; + } + emit progress(*_item, amount); +} + +} From a1558100b8345e07026bfc4298fe64895df3aeea Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Tue, 2 Aug 2016 13:48:56 +0200 Subject: [PATCH 02/19] WIP: new chunking algorithm Current limitations of this WiP - No resuming implemented yet - No parallel chunks - Hackish way to get the webdav paths --- src/libsync/CMakeLists.txt | 1 + src/libsync/account.cpp | 7 +- src/libsync/account.h | 3 + src/libsync/networkjobs.cpp | 8 +- src/libsync/networkjobs.h | 2 + src/libsync/propagateremotemove.cpp | 10 +- src/libsync/propagateremotemove.h | 4 + src/libsync/propagateupload.cpp | 3 +- src/libsync/propagateupload.h | 40 ++++ src/libsync/propagateuploadng.cpp | 351 ++++++++++++++++++++++++++++ 10 files changed, 424 insertions(+), 5 deletions(-) create mode 100644 src/libsync/propagateuploadng.cpp diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index d68ec323c..bae54bf37 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -52,6 +52,7 @@ set(libsync_SRCS propagatedownload.cpp propagateupload.cpp propagateuploadv1.cpp + propagateuploadng.cpp propagateremotedelete.cpp propagateremotemove.cpp propagateremotemkdir.cpp diff --git a/src/libsync/account.cpp b/src/libsync/account.cpp index 510dfd3aa..07ab0970f 100644 --- a/src/libsync/account.cpp +++ b/src/libsync/account.cpp @@ -76,11 +76,14 @@ AccountPtr Account::sharedFromThis() return _sharedThis.toStrongRef(); } +QString Account::user() const +{ + return _credentials->user(); +} QString Account::displayName() const { - auto user = _credentials->user(); - QString dn = QString("%1@%2").arg(user, _url.host()); + QString dn = QString("%1@%2").arg(user(), _url.host()); int port = url().port(); if (port > 0 && port != 80 && port != 443) { dn.append(QLatin1Char(':')); diff --git a/src/libsync/account.h b/src/libsync/account.h index c4852e3ae..797cd1422 100644 --- a/src/libsync/account.h +++ b/src/libsync/account.h @@ -75,6 +75,9 @@ public: void setSharedThis(AccountPtr sharedThis); AccountPtr sharedFromThis(); + /// The user that can be used in dav url + QString user() const; + /// The name of the account as shown in the toolbar QString displayName() const; diff --git a/src/libsync/networkjobs.cpp b/src/libsync/networkjobs.cpp index ab5598468..5fd3c83a6 100644 --- a/src/libsync/networkjobs.cpp +++ b/src/libsync/networkjobs.cpp @@ -106,6 +106,11 @@ MkColJob::MkColJob(AccountPtr account, const QString &path, QObject *parent) { } +MkColJob::MkColJob(AccountPtr account, const QUrl &url, QObject *parent) + : AbstractNetworkJob(account, QString(), parent), _url(url) +{ +} + void MkColJob::start() { // add 'Content-Length: 0' header (see https://github.com/owncloud/client/issues/3256) @@ -113,7 +118,8 @@ void MkColJob::start() req.setRawHeader("Content-Length", "0"); // assumes ownership - QNetworkReply *reply = davRequest("MKCOL", path(), req); + QNetworkReply *reply = _url.isValid() ? davRequest("MKCOL", _url, req) + : davRequest("MKCOL", path(), req); setReply(reply); setupConnections(reply); AbstractNetworkJob::start(); diff --git a/src/libsync/networkjobs.h b/src/libsync/networkjobs.h index c2cc978c4..94deb3af1 100644 --- a/src/libsync/networkjobs.h +++ b/src/libsync/networkjobs.h @@ -170,8 +170,10 @@ private: */ class OWNCLOUDSYNC_EXPORT MkColJob : public AbstractNetworkJob { Q_OBJECT + QUrl _url; // Only used if the constructor taking a url is taken. public: explicit MkColJob(AccountPtr account, const QString &path, QObject *parent = 0); + explicit MkColJob(AccountPtr account, const QUrl &url, QObject *parent = 0); void start() Q_DECL_OVERRIDE; signals: diff --git a/src/libsync/propagateremotemove.cpp b/src/libsync/propagateremotemove.cpp index 5a75e761e..4bc8e3a94 100644 --- a/src/libsync/propagateremotemove.cpp +++ b/src/libsync/propagateremotemove.cpp @@ -27,12 +27,20 @@ MoveJob::MoveJob(AccountPtr account, const QString& path, : AbstractNetworkJob(account, path, parent), _destination(destination) { } +MoveJob::MoveJob(AccountPtr account, const QUrl& url, const QString &destination, + QMap extraHeaders, QObject* parent) + : AbstractNetworkJob(account, QString(), parent), _destination(destination), _url(url) + , _extraHeaders(extraHeaders) +{ } void MoveJob::start() { QNetworkRequest req; req.setRawHeader("Destination", QUrl::toPercentEncoding(_destination, "/")); - setReply(davRequest("MOVE", path(), req)); + for(auto it = _extraHeaders.constBegin(); it != _extraHeaders.constEnd(); ++it) { + req.setRawHeader(it.key(), it.value()); + } + setReply(_url.isValid() ? davRequest("MOVE", _url, req) : davRequest("MOVE", path(), req)); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { diff --git a/src/libsync/propagateremotemove.h b/src/libsync/propagateremotemove.h index 3ce54f4a8..4146d4b17 100644 --- a/src/libsync/propagateremotemove.h +++ b/src/libsync/propagateremotemove.h @@ -25,8 +25,12 @@ namespace OCC { class MoveJob : public AbstractNetworkJob { Q_OBJECT const QString _destination; + const QUrl _url; // Only used (instead of path) when the constructor taking an URL is used + QMap _extraHeaders; public: explicit MoveJob(AccountPtr account, const QString& path, const QString &destination, QObject* parent = 0); + explicit MoveJob(AccountPtr account, const QUrl& url, const QString &destination, + QMap _extraHeaders, QObject* parent = 0); void start() Q_DECL_OVERRIDE; bool finished() Q_DECL_OVERRIDE; diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index 485937f66..388752ad9 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -72,7 +72,8 @@ void PUTFileJob::start() { req.setRawHeader(it.key(), it.value()); } - setReply(davRequest("PUT", path(), req, _device.data())); + setReply(_url.isValid() ? davRequest("PUT", _url, req, _device.data()) + : davRequest("PUT", path(), req, _device.data())); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index ea9f546b8..4a06f2f13 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -89,12 +89,17 @@ private: QScopedPointer _device; QMap _headers; QString _errorString; + QUrl _url; public: // Takes ownership of the device explicit PUTFileJob(AccountPtr account, const QString& path, QIODevice *device, const QMap &headers, int chunk, QObject* parent = 0) : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _chunk(chunk) {} + explicit PUTFileJob(AccountPtr account, const QUrl& url, QIODevice *device, + const QMap &headers, int chunk, QObject* parent = 0) + : AbstractNetworkJob(account, QString(), parent), _device(device), _headers(headers) + , _url(url), _chunk(chunk) {} ~PUTFileJob(); int _chunk; @@ -270,5 +275,40 @@ private slots: void slotUploadProgress(qint64,qint64); }; +/** + * @ingroup libsync + * + * Propagation job, impementing the new chunking agorithm + * + */ +class PropagateUploadFileNG : public PropagateUploadFileCommon { + Q_OBJECT +private: + quint64 _sent; /// amount of data that was already sent + uint _transferId; /// transfer id (part of the url) + int _currentChunk; + + quint64 chunkSize() const { return _propagator->chunkSize(); } + /** + * Return the URL of a chunk. + * If chunk == -1, returns the URL of the parent folder containing the chunks + */ + QUrl chunkUrl(int chunk = -1); + +public: + PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item) : + PropagateUploadFileCommon(propagator,item) {} + + void doStartUpload() Q_DECL_OVERRIDE; + +private slots: + void slotMkColFinished(QNetworkReply::NetworkError); + void startNextChunk(); + void slotPutFinished(); + void slotMoveJobFinished(); + void slotUploadProgress(qint64,qint64); +}; + + } diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp new file mode 100644 index 000000000..d2f89aba8 --- /dev/null +++ b/src/libsync/propagateuploadng.cpp @@ -0,0 +1,351 @@ +/* + * Copyright (C) by Olivier Goffart + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "config.h" +#include "propagateupload.h" +#include "owncloudpropagator_p.h" +#include "networkjobs.h" +#include "account.h" +#include "syncjournaldb.h" +#include "syncjournalfilerecord.h" +#include "utility.h" +#include "filesystem.h" +#include "propagatorjobs.h" +#include "syncengine.h" +#include "propagateremotemove.h" + +#include +#include +#include +#include +#include + +namespace OCC { + +QUrl PropagateUploadFileNG::chunkUrl(int chunk) +{ + // FIXME! we should not use the user from the credential, we should have it in the account + QString path = QLatin1String("remote.php/dav/uploads/") + + _propagator->account()->user() + + QLatin1Char('/') + QString::number(_transferId); + if (chunk >= 0) { + path += QLatin1Char('/') + QString::number(chunk); + } + return Account::concatUrlPath(_propagator->account()->url(), path); +} + +/* + State machine: + + *----> doStartUpload() + Check the db: is there an entry? + / \ + no yes + / \ + MKCOL PROPFIND (TODO) + | + slotMkColFinished() : + | : + +-----+---------------------+ + | + +----> startNextChunk() ---finished? --+ + ^ | | + +---------------+ | + | + +----------------------------------------+ + | + +-> MOVE ------> moveJobFinished() ---> finalize() + + + */ + + + +void PropagateUploadFileNG::doStartUpload() +{ + _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); + +#if FUTURE + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); + + if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { + _transferId = progressInfo._transferid; + // TODO: make a PROPFIND call to check what the size on the server is + //_startChunk = progressInfo._chunk; + //qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; + + } +#endif + _propagator->_activeJobList.append(this); + + _sent = 0; + _currentChunk = 0; + _duration.start(); + + emit progress(*_item, 0); + + auto job = new MkColJob(_propagator->account(), + chunkUrl(), + this); + + connect(job, SIGNAL(finished(QNetworkReply::NetworkError)), + this, SLOT(slotMkColFinished(QNetworkReply::NetworkError))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); +} + +void PropagateUploadFileNG::slotMkColFinished(QNetworkReply::NetworkError) +{ + _propagator->_activeJobList.removeOne(this); + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + + if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) { + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll()); + if (job->reply()->hasRawHeader("OC-ErrorString")) { + errorString = job->reply()->rawHeader("OC-ErrorString"); + } + abortWithError(status, errorString); + return; + } + startNextChunk(); +} + +void PropagateUploadFileNG::startNextChunk() +{ + if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) + return; + + quint64 fileSize = _item->_size; + + quint64 currentChunkSize = qMin(chunkSize(), fileSize - _sent); + + if (currentChunkSize <= 0) { + Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore + _finished = true; + // Finish with a MOVE + QString destination = _propagator->_remoteDir + _item->_file; + auto headers = PropagateUploadFileCommon::headers(); + auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), + destination, headers, this); + _jobs.append(job); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotMoveJobFinished())); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + _propagator->_activeJobList.append(this); + job->start(); + return; + } + + auto device = new UploadDevice(&_propagator->_bandwidthManager); + const QString fileName = _propagator->getFilePath(_item->_file); + + if (! device->prepareAndOpen(fileName, _sent, currentChunkSize)) { + qDebug() << "ERR: Could not prepare upload device: " << device->errorString(); + + // If the file is currently locked, we want to retry the sync + // when it becomes available again. + if (FileSystem::isFileLocked(fileName)) { + emit _propagator->seenLockedFile(fileName); + } + // Soft error because this is likely caused by the user modifying his files while syncing + abortWithError( SyncFileItem::SoftError, device->errorString() ); + return; + } + + _sent += currentChunkSize; + QUrl url = chunkUrl(_currentChunk); + + // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing + PUTFileJob* job = new PUTFileJob(_propagator->account(), url, device, {}, _currentChunk); + _jobs.append(job); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), + this, SLOT(slotUploadProgress(qint64,qint64))); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), + device, SLOT(slotJobUploadProgress(qint64,qint64))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); + _propagator->_activeJobList.append(this); + _currentChunk++; + + // FIXME! parallel chunk? + +} + +void PropagateUploadFileNG::slotPutFinished() +{ + PUTFileJob *job = qobject_cast(sender()); + Q_ASSERT(job); + slotJobDestroyed(job); // remove it from the _jobs list + + qDebug() << job->reply()->request().url() << "FINISHED WITH STATUS" + << job->reply()->error() + << (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString()) + << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) + << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); + + _propagator->_activeJobList.removeOne(this); + + if (_finished) { + // We have sent the finished signal already. We don't need to handle any remaining jobs + return; + } + + QNetworkReply::NetworkError err = job->reply()->error(); + +#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) + if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { + // Abort the job and try again later. + // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. + qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; + _propagator->_anotherSyncNeeded = true; + abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2.")); + return; + } +#endif + + if (err != QNetworkReply::NoError) { + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + QByteArray replyContent = job->reply()->readAll(); + qDebug() << replyContent; // display the XML error in the debug + QString errorString = errorMessage(job->errorString(), replyContent); + + if (job->reply()->hasRawHeader("OC-ErrorString")) { + errorString = job->reply()->rawHeader("OC-ErrorString"); + } + + // FIXME! can tth peneunking? + if (_item->_httpErrorCode == 412) { + // Precondition Failed: Maybe the bad etag is in the database, we need to clear the + // parent folder etag so we won't read from DB next sync. + _propagator->_journal->avoidReadFromDbOnNextSync(_item->_file); + _propagator->_anotherSyncNeeded = true; + } + + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + abortWithError(status, errorString); + return; + } + + bool finished = _sent >= _item->_size; + + // Check if the file still exists + const QString fullFilePath(_propagator->getFilePath(_item->_file)); + if( !FileSystem::fileExists(fullFilePath) ) { + if (!finished) { + abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync.")); + return; + } else { + _propagator->_anotherSyncNeeded = true; + } + } + + // Check whether the file changed since discovery. + if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) { + _propagator->_anotherSyncNeeded = true; + if( !finished ) { + abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); + // FIXME: the legacy code was retrying for a few seconds. + // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW + return; + } + } + + if (!finished) { + // Deletes an existing blacklist entry on successful chunk upload + if (_item->_hasBlacklistEntry) { + _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); + _item->_hasBlacklistEntry = false; + } + + SyncJournalDb::UploadInfo pi; + pi._valid = true; + auto currentChunk = job->_chunk; + foreach (auto *job, _jobs) { + // Take the minimum finished one + if (auto putJob = qobject_cast(job)) { + currentChunk = qMin(currentChunk, putJob->_chunk - 1); + } + } + pi._chunk = currentChunk; // FIXME + pi._transferid = _transferId; + pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); + _propagator->_journal->setUploadInfo(_item->_file, pi); + _propagator->_journal->commit("Upload info"); + } + startNextChunk(); +} + +void PropagateUploadFileNG::slotMoveJobFinished() +{ + _propagator->_activeJobList.removeOne(this); + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + + if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) { + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + QString errorString = errorMessage(job->errorString(), job->reply()->readAll()); + abortWithError(status, errorString); + return; + } + + QByteArray fid = job->reply()->rawHeader("OC-FileID"); + if(fid.isEmpty()) { + qWarning() << "Server did not return a OC-FileID" << _item->_file; + } else { + // the old file id should only be empty for new files uploaded + if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) { + qDebug() << "WARN: File ID changed!" << _item->_fileId << fid; + } + _item->_fileId = fid; + } + + _item->_etag = getEtagFromReply(job->reply());; + if (_item->_etag.isEmpty()) { + qWarning() << "Server did not return an ETAG" << _item->_file; + } + _item->_responseTimeStamp = job->responseTimestamp(); + + // performance logging + _item->_requestDuration = _stopWatch.stop(); + qDebug() << "*==* duration UPLOAD" << _item->_size + << _stopWatch.durationOfLap(QLatin1String("ContentChecksum")) + << _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum")) + << _item->_requestDuration; + // The job might stay alive for the whole sync, release this tiny bit of memory. + _stopWatch.reset(); + finalize(); +} + +void PropagateUploadFileNG::slotUploadProgress(qint64 sent, qint64 total) +{ + // Completion is signaled with sent=0, total=0; avoid accidentally + // resetting progress due to the sent being zero by ignoring it. + // finishedSignal() is bound to be emitted soon anyway. + // See https://bugreports.qt.io/browse/QTBUG-44782. + if (sent == 0 && total == 0) { + return; + } + emit progress(*_item, _sent + sent - total); +} + +} From fad387b6b85cde6da1a50e2f48451fa32f2bf610 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Tue, 2 Aug 2016 17:14:44 +0200 Subject: [PATCH 03/19] Chunking-Ng: Resume --- src/libsync/networkjobs.cpp | 8 +- src/libsync/networkjobs.h | 2 + src/libsync/propagateupload.h | 12 ++- src/libsync/propagateuploadng.cpp | 122 +++++++++++++++++++++--------- 4 files changed, 102 insertions(+), 42 deletions(-) diff --git a/src/libsync/networkjobs.cpp b/src/libsync/networkjobs.cpp index 5fd3c83a6..3c167334e 100644 --- a/src/libsync/networkjobs.cpp +++ b/src/libsync/networkjobs.cpp @@ -270,6 +270,11 @@ LsColJob::LsColJob(AccountPtr account, const QString &path, QObject *parent) { } +LsColJob::LsColJob(AccountPtr account, const QUrl &url, QObject *parent) + : AbstractNetworkJob(account, QString(), parent), _url(url) +{ +} + void LsColJob::setProperties(QList properties) { _properties = properties; @@ -313,7 +318,8 @@ void LsColJob::start() QBuffer *buf = new QBuffer(this); buf->setData(xml); buf->open(QIODevice::ReadOnly); - QNetworkReply *reply = davRequest("PROPFIND", path(), req, buf); + QNetworkReply *reply = _url.isValid() ? davRequest("PROPFIND", _url, req, buf) + : davRequest("PROPFIND", path(), req, buf); buf->setParent(reply); setReply(reply); setupConnections(reply); diff --git a/src/libsync/networkjobs.h b/src/libsync/networkjobs.h index 94deb3af1..0a19707f4 100644 --- a/src/libsync/networkjobs.h +++ b/src/libsync/networkjobs.h @@ -62,6 +62,7 @@ class OWNCLOUDSYNC_EXPORT LsColJob : public AbstractNetworkJob { Q_OBJECT public: explicit LsColJob(AccountPtr account, const QString &path, QObject *parent = 0); + explicit LsColJob(AccountPtr account, const QUrl &url, QObject *parent = 0); void start() Q_DECL_OVERRIDE; QHash _sizes; @@ -87,6 +88,7 @@ private slots: private: QList _properties; + QUrl _url; // Used instead of path() if the url is specified in the constructor }; /** diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 4a06f2f13..83b1fea65 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -286,7 +286,8 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon { private: quint64 _sent; /// amount of data that was already sent uint _transferId; /// transfer id (part of the url) - int _currentChunk; + int _currentChunk; /// Id of the next chunk that will be sent + QMap _serverChunks; // Map chunk number with its size from the PROPFIND on resume quint64 chunkSize() const { return _propagator->chunkSize(); } /** @@ -300,10 +301,13 @@ public: PropagateUploadFileCommon(propagator,item) {} void doStartUpload() Q_DECL_OVERRIDE; - -private slots: - void slotMkColFinished(QNetworkReply::NetworkError); +private: + void startNewUpload(); void startNextChunk(); +private slots: + void slotPropfindFinished(); + void slotPropfindFinishedWithError(); + void slotMkColFinished(QNetworkReply::NetworkError); void slotPutFinished(); void slotMoveJobFinished(); void slotUploadProgress(qint64,qint64); diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index d2f89aba8..95b41fa20 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -50,14 +50,17 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) *----> doStartUpload() Check the db: is there an entry? - / \ - no yes - / \ - MKCOL PROPFIND (TODO) - | - slotMkColFinished() : - | : - +-----+---------------------+ + / \ + no yes + / \ + / PROPFIND + startNewUpload() <-+ +----------------------------\ + | | | \ + MKCOL + slotPropfindFinishedWithError() slotPropfindFinished() + | | + slotMkColFinished() | + | | + +-----+-------------------------------------------------------+ | +----> startNextChunk() ---finished? --+ ^ | | @@ -74,27 +77,87 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) void PropagateUploadFileNG::doStartUpload() { - _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); - -#if FUTURE - const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); - - if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { - _transferId = progressInfo._transferid; - // TODO: make a PROPFIND call to check what the size on the server is - //_startChunk = progressInfo._chunk; - //qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; - - } -#endif + _duration.start(); _propagator->_activeJobList.append(this); + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); + if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { + _transferId = progressInfo._transferid; + auto url = chunkUrl(); + auto job = new LsColJob(_propagator->account(), url, this); + _jobs.append(job); + job->setProperties(QList() << "resourcetype" << "getcontentlength"); + connect(job, SIGNAL(finishedWithoutError()), this, SLOT(slotPropfindFinished())); + connect(job, SIGNAL(finishedWithError(QNetworkReply*)), + this, SLOT(slotPropfindFinishedWithError())); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + //TODO: port to Qt4 + connect(job, &LsColJob::directoryListingIterated, + [this, url](const QString &name, const QMap &properties) mutable { + if (name == url.path()) { + return; // skip the info about the path itself + } + bool ok = false; + auto chunkId = name.midRef(name.lastIndexOf('/')+1).toUInt(&ok); + if (ok) { + this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); + } + }); + job->start(); + return; + } + + startNewUpload(); +} + +void PropagateUploadFileNG::slotPropfindFinished() +{ + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + _propagator->_activeJobList.removeOne(this); + + _currentChunk = 0; + _sent = 0; + while (_serverChunks.contains(_currentChunk)) { + _sent += _serverChunks[_currentChunk]; + ++_currentChunk; + } + startNextChunk(); +} + +void PropagateUploadFileNG::slotPropfindFinishedWithError() +{ + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + auto httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + auto status = classifyError(err, httpErrorCode, &_propagator->_anotherSyncNeeded); + if (status == SyncFileItem::FatalError) { + _propagator->_activeJobList.removeOne(this); + QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll()); + abortWithError(status, errorString); + return; + } + startNewUpload(); +} + + +void PropagateUploadFileNG::startNewUpload() +{ + Q_ASSERT(_propagator->_activeJobList.count(this) == 1); + _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); _sent = 0; _currentChunk = 0; - _duration.start(); emit progress(*_item, 0); + SyncJournalDb::UploadInfo pi; + pi._valid = true; + pi._transferid = _transferId; + pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); + _propagator->_journal->setUploadInfo(_item->_file, pi); + _propagator->_journal->commit("Upload info"); + auto job = new MkColJob(_propagator->account(), chunkUrl(), this); @@ -273,21 +336,6 @@ void PropagateUploadFileNG::slotPutFinished() _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); _item->_hasBlacklistEntry = false; } - - SyncJournalDb::UploadInfo pi; - pi._valid = true; - auto currentChunk = job->_chunk; - foreach (auto *job, _jobs) { - // Take the minimum finished one - if (auto putJob = qobject_cast(job)) { - currentChunk = qMin(currentChunk, putJob->_chunk - 1); - } - } - pi._chunk = currentChunk; // FIXME - pi._transferid = _transferId; - pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); - _propagator->_journal->setUploadInfo(_item->_file, pi); - _propagator->_journal->commit("Upload info"); } startNextChunk(); } From 79abb8b4e3a838b9cd97f340aee15272f7a78114 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 3 Aug 2016 17:15:23 +0200 Subject: [PATCH 04/19] ChunkingNg: enable depending on an environment variable --- src/libsync/owncloudpropagator.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index 39bced518..5969a2ee4 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -270,7 +270,13 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) { job->setDeleteExistingFolder(deleteExisting); return job; } else { - auto job = new PropagateUploadFileV1(this, item); // FIXME, depending on server version, use the other chunking algorithm + static const bool isNg = !qgetenv("OWNCLOUD_CHUNK_NG").isEmpty(); // FIXME! use server version + PropagateUploadFileCommon *job = 0; + if (isNg && item->_size > chunkSize()) { + job = new PropagateUploadFileNG(this, item); + } else { + job = new PropagateUploadFileV1(this, item); + } job->setDeleteExisting(deleteExisting); return job; } From 818b5854ce614f6ae1c43c99adb443ae5c61e147 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 3 Aug 2016 17:43:03 +0200 Subject: [PATCH 05/19] Chunking-NG: Qt4 compile --- src/libsync/owncloudpropagator.h | 3 ++- src/libsync/propagateupload.h | 1 + src/libsync/propagateuploadng.cpp | 30 ++++++++++++++++-------------- src/libsync/propagateuploadv1.cpp | 3 +-- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/libsync/owncloudpropagator.h b/src/libsync/owncloudpropagator.h index b571e64a6..1b4246c3b 100644 --- a/src/libsync/owncloudpropagator.h +++ b/src/libsync/owncloudpropagator.h @@ -379,10 +379,11 @@ private: #if QT_VERSION < QT_VERSION_CHECK(5, 0, 0) // access to signals which are protected in Qt4 friend class PropagateDownloadFile; - friend class PropagateUploadFile; friend class PropagateLocalMkdir; friend class PropagateLocalRename; friend class PropagateRemoteMove; + friend class PropagateUploadFileV1; + friend class PropagateUploadFileNG; #endif }; diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 83b1fea65..1cde85196 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -307,6 +307,7 @@ private: private slots: void slotPropfindFinished(); void slotPropfindFinishedWithError(); + void slotPropfindIterate(const QString &name, const QMap &properties); void slotMkColFinished(QNetworkReply::NetworkError); void slotPutFinished(); void slotMoveJobFinished(); diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 95b41fa20..889156135 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -73,8 +73,6 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) */ - - void PropagateUploadFileNG::doStartUpload() { _duration.start(); @@ -92,17 +90,8 @@ void PropagateUploadFileNG::doStartUpload() this, SLOT(slotPropfindFinishedWithError())); connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); //TODO: port to Qt4 - connect(job, &LsColJob::directoryListingIterated, - [this, url](const QString &name, const QMap &properties) mutable { - if (name == url.path()) { - return; // skip the info about the path itself - } - bool ok = false; - auto chunkId = name.midRef(name.lastIndexOf('/')+1).toUInt(&ok); - if (ok) { - this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); - } - }); + connect(job, SIGNAL(directoryListingIterated(QString,QMap)), + this, SLOT(slotPropfindIterate(QString,QMap))); job->start(); return; } @@ -110,6 +99,18 @@ void PropagateUploadFileNG::doStartUpload() startNewUpload(); } +void PropagateUploadFileNG::slotPropfindIterate(const QString &name, const QMap &properties) +{ + if (name == chunkUrl().path()) { + return; // skip the info about the path itself + } + bool ok = false; + auto chunkId = name.mid(name.lastIndexOf('/')+1).toUInt(&ok); + if (ok) { + this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); + } +} + void PropagateUploadFileNG::slotPropfindFinished() { auto job = qobject_cast(sender()); @@ -122,6 +123,7 @@ void PropagateUploadFileNG::slotPropfindFinished() _sent += _serverChunks[_currentChunk]; ++_currentChunk; } + qDebug() << "Resuming "<< _item->_file << " from chunk " << _currentChunk << "; sent ="<< _sent; startNextChunk(); } @@ -272,7 +274,7 @@ void PropagateUploadFileNG::slotPutFinished() QNetworkReply::NetworkError err = job->reply()->error(); #if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) - if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { + if (err == QNetworkReply::OperationCanceledError && job->reply()->property("owncloud-should-soft-cancel").isValid()) { // Abort the job and try again later. // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; diff --git a/src/libsync/propagateuploadv1.cpp b/src/libsync/propagateuploadv1.cpp index c126164b3..dd075fa77 100644 --- a/src/libsync/propagateuploadv1.cpp +++ b/src/libsync/propagateuploadv1.cpp @@ -183,8 +183,7 @@ void PropagateUploadFileV1::slotPutFinished() QNetworkReply::NetworkError err = job->reply()->error(); #if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) - if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { - // Abort the job and try again later. + if (err == QNetworkReply::OperationCanceledError && job->reply()->property("owncloud-should-soft-cancel").isValid()) { // Abort the job and try again later. // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; _propagator->_anotherSyncNeeded = true; From c41f6ed76bfaae61fbf38ea5309f6ae6ab70bf5e Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 31 Aug 2016 12:12:34 +0200 Subject: [PATCH 06/19] Chunking-NG: use new dav URL for the move --- src/libsync/propagateuploadng.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 889156135..7cd1b8b3f 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -35,7 +35,6 @@ namespace OCC { QUrl PropagateUploadFileNG::chunkUrl(int chunk) { - // FIXME! we should not use the user from the credential, we should have it in the account QString path = QLatin1String("remote.php/dav/uploads/") + _propagator->account()->user() + QLatin1Char('/') + QString::number(_transferId); @@ -204,7 +203,8 @@ void PropagateUploadFileNG::startNextChunk() Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore _finished = true; // Finish with a MOVE - QString destination = _propagator->_remoteDir + _item->_file; + QString destination = QLatin1String("remote.php/dav/files/") + + _propagator->account()->user() + QLatin1Char('/') + QString::number(_transferId); auto headers = PropagateUploadFileCommon::headers(); auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), destination, headers, this); From 7c75a39bc182f29e1d7c3d5d9ccd5178c71fb9eb Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Sat, 10 Sep 2016 12:30:14 +0200 Subject: [PATCH 07/19] Chunking-NG: Some fixup after feedback from the pull request #5102 --- src/libsync/propagateupload.h | 7 +++++-- src/libsync/propagateuploadng.cpp | 10 ++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 1cde85196..5d0d0cfa0 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -284,10 +284,13 @@ private slots: class PropagateUploadFileNG : public PropagateUploadFileCommon { Q_OBJECT private: - quint64 _sent; /// amount of data that was already sent + quint64 _sent; /// amount of data (bytes) that was already sent uint _transferId; /// transfer id (part of the url) int _currentChunk; /// Id of the next chunk that will be sent - QMap _serverChunks; // Map chunk number with its size from the PROPFIND on resume + + // Map chunk number with its size from the PROPFIND on resume. + // (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.) + QMap _serverChunks; quint64 chunkSize() const { return _propagator->chunkSize(); } /** diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 7cd1b8b3f..bafec2028 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -88,7 +88,6 @@ void PropagateUploadFileNG::doStartUpload() connect(job, SIGNAL(finishedWithError(QNetworkReply*)), this, SLOT(slotPropfindFinishedWithError())); connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); - //TODO: port to Qt4 connect(job, SIGNAL(directoryListingIterated(QString,QMap)), this, SLOT(slotPropfindIterate(QString,QMap))); job->start(); @@ -122,6 +121,9 @@ void PropagateUploadFileNG::slotPropfindFinished() _sent += _serverChunks[_currentChunk]; ++_currentChunk; } + // FIXME: we should make sure that if there is a "hole" and then a few more chunks, on the server + // we should remove the later chunks. Otherwise when we do dynamic chunk sizing, we may end up + // with corruptions if there are too many chunks, or if we abort and there are still stale chunks. qDebug() << "Resuming "<< _item->_file << " from chunk " << _currentChunk << "; sent ="<< _sent; startNextChunk(); } @@ -294,7 +296,7 @@ void PropagateUploadFileNG::slotPutFinished() errorString = job->reply()->rawHeader("OC-ErrorString"); } - // FIXME! can tth peneunking? + // FIXME! can this happen for the chunks? if (_item->_httpErrorCode == 412) { // Precondition Failed: Maybe the bad etag is in the database, we need to clear the // parent folder etag so we won't read from DB next sync. @@ -361,6 +363,8 @@ void PropagateUploadFileNG::slotMoveJobFinished() QByteArray fid = job->reply()->rawHeader("OC-FileID"); if(fid.isEmpty()) { qWarning() << "Server did not return a OC-FileID" << _item->_file; + abortWithError(SyncFileItem::NormalError, tr("Missing File ID from server")); + return; } else { // the old file id should only be empty for new files uploaded if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) { @@ -372,6 +376,8 @@ void PropagateUploadFileNG::slotMoveJobFinished() _item->_etag = getEtagFromReply(job->reply());; if (_item->_etag.isEmpty()) { qWarning() << "Server did not return an ETAG" << _item->_file; + abortWithError(SyncFileItem::NormalError, tr("Missing ETag from server")); + return; } _item->_responseTimeStamp = job->responseTimestamp(); From 4c79ce2ae625f4810b597366aabafe4b8f7f4355 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 16 Sep 2016 15:49:43 +0200 Subject: [PATCH 08/19] ConnectionValidator: fetch the account name. This is needed for the new webdav path used by the new chunking. The user might not be the same as the one used to connect --- src/libsync/account.cpp | 7 ++++++- src/libsync/account.h | 2 ++ src/libsync/connectionvalidator.cpp | 20 ++++++++++++++++++-- src/libsync/connectionvalidator.h | 14 ++++++++++++-- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/libsync/account.cpp b/src/libsync/account.cpp index 07ab0970f..8a5c11b90 100644 --- a/src/libsync/account.cpp +++ b/src/libsync/account.cpp @@ -78,7 +78,12 @@ AccountPtr Account::sharedFromThis() QString Account::user() const { - return _credentials->user(); + return _user.isEmpty() ? _credentials->user() : _user; +} + +void Account::setUser(const QString &user) +{ + _user = user; } QString Account::displayName() const diff --git a/src/libsync/account.h b/src/libsync/account.h index 797cd1422..62098b218 100644 --- a/src/libsync/account.h +++ b/src/libsync/account.h @@ -77,6 +77,7 @@ public: /// The user that can be used in dav url QString user() const; + void setUser(const QString &user); /// The name of the account as shown in the toolbar QString displayName() const; @@ -209,6 +210,7 @@ private: QWeakPointer _sharedThis; QString _id; + QString _user; QMap _settingsMap; QUrl _url; QList _approvedCerts; diff --git a/src/libsync/connectionvalidator.cpp b/src/libsync/connectionvalidator.cpp index 2a955a521..814f4a303 100644 --- a/src/libsync/connectionvalidator.cpp +++ b/src/libsync/connectionvalidator.cpp @@ -229,10 +229,26 @@ void ConnectionValidator::slotCapabilitiesRecieved(const QVariantMap &json) auto caps = json.value("ocs").toMap().value("data").toMap().value("capabilities"); qDebug() << "Server capabilities" << caps; _account->setCapabilities(caps.toMap()); - reportResult(Connected); - return; + fetchUser(); } +void ConnectionValidator::fetchUser() +{ + + JsonApiJob *job = new JsonApiJob(_account, QLatin1String("ocs/v1.php/cloud/user"), this); + job->setTimeout(timeoutToUseMsec); + QObject::connect(job, SIGNAL(jsonReceived(QVariantMap, int)), this, SLOT(slotUserFetched(QVariantMap))); + job->start(); +} + +void ConnectionValidator::slotUserFetched(const QVariantMap &json) +{ + QString user = json.value("ocs").toMap().value("data").toMap().value("id").toString(); + if (!user.isEmpty()) { + _account->setUser(user); + } + reportResult(Connected); +} void ConnectionValidator::reportResult(Status status) { diff --git a/src/libsync/connectionvalidator.h b/src/libsync/connectionvalidator.h index e1264628a..b97d665ec 100644 --- a/src/libsync/connectionvalidator.h +++ b/src/libsync/connectionvalidator.h @@ -27,7 +27,7 @@ namespace OCC { * This is a job-like class to check that the server is up and that we are connected. * There are two entry points: checkServerAndAuth and checkAuthentication * checkAuthentication is the quick version that only does the propfind - * while checkServerAndAuth is doing the 3 calls. + * while checkServerAndAuth is doing the 4 calls. * * We cannot use the capabilites call to test the login and the password because of * https://github.com/owncloud/core/issues/12930 @@ -60,7 +60,15 @@ namespace OCC { +-> checkServerCapabilities (cloud/capabilities) JsonApiJob | - +-> slotCapabilitiesRecieved --> X + +-> slotCapabilitiesRecieved -+ + | + +-----------------------------------+ + | + +-> fetchUser + PropfindJob + | + +-> slotUserFetched --> X + \endcode */ class OWNCLOUDSYNC_EXPORT ConnectionValidator : public QObject @@ -109,10 +117,12 @@ protected slots: void slotAuthSuccess(); void slotCapabilitiesRecieved(const QVariantMap&); + void slotUserFetched(const QVariantMap &); private: void reportResult(Status status); void checkServerCapabilities(); + void fetchUser(); QStringList _errors; AccountPtr _account; From 28018e8590e6a6ca3ee6fe809c00ecc36505f02e Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 16 Sep 2016 16:14:53 +0200 Subject: [PATCH 09/19] Chunking-NG: Fix destination URL --- src/libsync/propagateuploadng.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index bafec2028..77a0254b5 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -205,8 +205,10 @@ void PropagateUploadFileNG::startNextChunk() Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore _finished = true; // Finish with a MOVE - QString destination = QLatin1String("remote.php/dav/files/") - + _propagator->account()->user() + QLatin1Char('/') + QString::number(_transferId); + // QString destination = _propagator->_remoteDir + _item->_file; // FIXME: _remoteDir currently is still using the old webdav path + QString destination = _propagator->account()->url().path() + + QLatin1String("/remote.php/dav/files/") + _propagator->account()->user() + + _propagator->_remoteFolder + _item->_file; auto headers = PropagateUploadFileCommon::headers(); auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), destination, headers, this); From c222793525195650c00b3ded774a8cf81317e2db Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 16 Sep 2016 16:15:09 +0200 Subject: [PATCH 10/19] Chunking-NG: use OC-If-Destination-Match instread of If-Match For the MOVE command, because If-Match in Webdav relates to the source, not the destination --- src/libsync/propagateuploadng.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 77a0254b5..09e89382f 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -210,6 +210,13 @@ void PropagateUploadFileNG::startNextChunk() + QLatin1String("/remote.php/dav/files/") + _propagator->account()->user() + _propagator->_remoteFolder + _item->_file; auto headers = PropagateUploadFileCommon::headers(); + + // Rename the If-Match header to "OC-If-Destination-Match" + // "If-Match applies to the source, but we are interested in comparing the etag of the destination + auto ifMatch = headers.take("If-Match"); + if (!ifMatch.isEmpty()) { + headers["OC-If-Destination-Match"] = ifMatch; + } auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), destination, headers, this); _jobs.append(job); From 3c24d5a14892bc0452cd4a0d922701836b3a1f7b Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 16 Sep 2016 16:20:40 +0200 Subject: [PATCH 11/19] Chunking-NG: The MOVE will return the code 204 if the file was already there --- src/libsync/propagateuploadng.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 09e89382f..6c71506f9 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -361,7 +361,8 @@ void PropagateUploadFileNG::slotMoveJobFinished() QNetworkReply::NetworkError err = job->reply()->error(); _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) { + if (err != QNetworkReply::NoError || (_item->_httpErrorCode != 201 + && _item->_httpErrorCode != 204)) { SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, &_propagator->_anotherSyncNeeded); QString errorString = errorMessage(job->errorString(), job->reply()->readAll()); From da26e597709101834a30c44b410e866cf6642330 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 16 Sep 2016 16:36:46 +0200 Subject: [PATCH 12/19] Chunking-NG: add some headers when uploading chunks These are not understood by owncloud yet, but were requested for CernBox OC-Total-Length in the MKCOL: The full lenght of the file OC-Chunk-Offset in the PUT: The offset within the file in which this chunk belongs OC-Checksum in the MOVE: The transission checksum --- src/libsync/networkjobs.cpp | 8 ++++++-- src/libsync/networkjobs.h | 4 +++- src/libsync/propagateuploadng.cpp | 17 ++++++++++++----- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/libsync/networkjobs.cpp b/src/libsync/networkjobs.cpp index 3c167334e..4e46779c8 100644 --- a/src/libsync/networkjobs.cpp +++ b/src/libsync/networkjobs.cpp @@ -106,8 +106,9 @@ MkColJob::MkColJob(AccountPtr account, const QString &path, QObject *parent) { } -MkColJob::MkColJob(AccountPtr account, const QUrl &url, QObject *parent) - : AbstractNetworkJob(account, QString(), parent), _url(url) +MkColJob::MkColJob(AccountPtr account, const QUrl &url, + const QMap &extraHeaders, QObject *parent) + : AbstractNetworkJob(account, QString(), parent), _url(url), _extraHeaders(extraHeaders) { } @@ -116,6 +117,9 @@ void MkColJob::start() // add 'Content-Length: 0' header (see https://github.com/owncloud/client/issues/3256) QNetworkRequest req; req.setRawHeader("Content-Length", "0"); + for(auto it = _extraHeaders.constBegin(); it != _extraHeaders.constEnd(); ++it) { + req.setRawHeader(it.key(), it.value()); + } // assumes ownership QNetworkReply *reply = _url.isValid() ? davRequest("MKCOL", _url, req) diff --git a/src/libsync/networkjobs.h b/src/libsync/networkjobs.h index 0a19707f4..f7a37d950 100644 --- a/src/libsync/networkjobs.h +++ b/src/libsync/networkjobs.h @@ -173,9 +173,11 @@ private: class OWNCLOUDSYNC_EXPORT MkColJob : public AbstractNetworkJob { Q_OBJECT QUrl _url; // Only used if the constructor taking a url is taken. + QMap _extraHeaders; public: explicit MkColJob(AccountPtr account, const QString &path, QObject *parent = 0); - explicit MkColJob(AccountPtr account, const QUrl &url, QObject *parent = 0); + explicit MkColJob(AccountPtr account, const QUrl &url, + const QMap &extraHeaders, QObject *parent = 0); void start() Q_DECL_OVERRIDE; signals: diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 6c71506f9..5e43be633 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -160,10 +160,9 @@ void PropagateUploadFileNG::startNewUpload() pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); _propagator->_journal->setUploadInfo(_item->_file, pi); _propagator->_journal->commit("Upload info"); - - auto job = new MkColJob(_propagator->account(), - chunkUrl(), - this); + QMap headers; + headers["OC-Total-Length"] = QByteArray::number(_item->_size); + auto job = new MkColJob(_propagator->account(), chunkUrl(), headers, this); connect(job, SIGNAL(finished(QNetworkReply::NetworkError)), this, SLOT(slotMkColFinished(QNetworkReply::NetworkError))); @@ -217,6 +216,11 @@ void PropagateUploadFileNG::startNextChunk() if (!ifMatch.isEmpty()) { headers["OC-If-Destination-Match"] = ifMatch; } + if (!_transmissionChecksumType.isEmpty()) { + headers[checkSumHeaderC] = makeChecksumHeader( + _transmissionChecksumType, _transmissionChecksum); + } + auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), destination, headers, this); _jobs.append(job); @@ -243,11 +247,14 @@ void PropagateUploadFileNG::startNextChunk() return; } + QMap headers; + headers["OC-Chunk-Offset"] = QByteArray::number(_sent); + _sent += currentChunkSize; QUrl url = chunkUrl(_currentChunk); // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing - PUTFileJob* job = new PUTFileJob(_propagator->account(), url, device, {}, _currentChunk); + PUTFileJob* job = new PUTFileJob(_propagator->account(), url, device, headers, _currentChunk); _jobs.append(job); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); connect(job, SIGNAL(uploadProgress(qint64,qint64)), From 273590fdfcbc900b3a788ad81faa9e2f033b2d62 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Thu, 20 Oct 2016 11:16:06 +0200 Subject: [PATCH 13/19] ChunkingNG: Use the 'If' header As discussed in https://github.com/owncloud/core/pull/26368 --- src/libsync/propagateuploadng.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 5e43be633..7b632233d 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -210,11 +210,10 @@ void PropagateUploadFileNG::startNextChunk() + _propagator->_remoteFolder + _item->_file; auto headers = PropagateUploadFileCommon::headers(); - // Rename the If-Match header to "OC-If-Destination-Match" // "If-Match applies to the source, but we are interested in comparing the etag of the destination auto ifMatch = headers.take("If-Match"); if (!ifMatch.isEmpty()) { - headers["OC-If-Destination-Match"] = ifMatch; + headers["If"] = "<" + destination.toUtf8() + "> ([" + ifMatch + "])"; } if (!_transmissionChecksumType.isEmpty()) { headers[checkSumHeaderC] = makeChecksumHeader( From e33b89c222b164ec9b1146476fc018ab34194ecc Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 21 Oct 2016 16:42:27 +0200 Subject: [PATCH 14/19] Chunking-NG: Enable if the server supports it --- src/libsync/capabilities.cpp | 6 ++++++ src/libsync/capabilities.h | 1 + src/libsync/owncloudpropagator.cpp | 5 +++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/libsync/capabilities.cpp b/src/libsync/capabilities.cpp index ca72a70bb..6bf352f9a 100644 --- a/src/libsync/capabilities.cpp +++ b/src/libsync/capabilities.cpp @@ -107,4 +107,10 @@ QByteArray Capabilities::uploadChecksumType() const return QByteArray(); } +bool Capabilities::chunkingNg() const +{ + return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0"; +} + + } diff --git a/src/libsync/capabilities.h b/src/libsync/capabilities.h index 04eef6e39..5f1936aba 100644 --- a/src/libsync/capabilities.h +++ b/src/libsync/capabilities.h @@ -39,6 +39,7 @@ public: bool sharePublicLinkEnforceExpireDate() const; int sharePublicLinkExpireDateDays() const; bool shareResharing() const; + bool chunkingNg() const; /// returns true if the capabilities report notifications bool notificationsAvailable() const; diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index eace8e35d..1302f88a7 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -270,9 +270,10 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) { job->setDeleteExistingFolder(deleteExisting); return job; } else { - static const bool isNg = !qgetenv("OWNCLOUD_CHUNK_NG").isEmpty(); // FIXME! use server version PropagateUploadFileCommon *job = 0; - if (isNg && item->_size > chunkSize()) { + static const auto chunkng = qgetenv("OWNCLOUD_CHUNKING_NG"); + if (item->_size > chunkSize() + && (account()->capabilities().chunkingNg() || chunkng == "1") && chunkng != "0") { job = new PropagateUploadFileNG(this, item); } else { job = new PropagateUploadFileV1(this, item); From 456d82715ef330d5763a8ff5c51e7b3aa1483e96 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 31 Oct 2016 11:29:33 +0100 Subject: [PATCH 15/19] Fix compile after merge --- src/libsync/propagateuploadng.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 7b632233d..0498f5b13 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -41,7 +41,7 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) if (chunk >= 0) { path += QLatin1Char('/') + QString::number(chunk); } - return Account::concatUrlPath(_propagator->account()->url(), path); + return Utility::concatUrlPath(_propagator->account()->url(), path); } /* @@ -220,7 +220,7 @@ void PropagateUploadFileNG::startNextChunk() _transmissionChecksumType, _transmissionChecksum); } - auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), + auto job = new MoveJob(_propagator->account(), Utility::concatUrlPath(chunkUrl(), "/.file"), destination, headers, this); _jobs.append(job); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotMoveJobFinished())); From c8014a0afdc0bcfa780165b681fc5da2ca573f74 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 31 Oct 2016 15:16:53 +0100 Subject: [PATCH 16/19] ChunkingNG: Add Test --- src/libsync/propagateuploadng.cpp | 7 +- test/CMakeLists.txt | 1 + test/syncenginetestutils.h | 150 ++++++++++++++++++++++++------ test/testchunkingng.cpp | 39 ++++++++ 4 files changed, 168 insertions(+), 29 deletions(-) create mode 100644 test/testchunkingng.cpp diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 0498f5b13..abfd8257c 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -367,14 +367,17 @@ void PropagateUploadFileNG::slotMoveJobFinished() QNetworkReply::NetworkError err = job->reply()->error(); _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - if (err != QNetworkReply::NoError || (_item->_httpErrorCode != 201 - && _item->_httpErrorCode != 204)) { + if (err != QNetworkReply::NoError) { SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, &_propagator->_anotherSyncNeeded); QString errorString = errorMessage(job->errorString(), job->reply()->readAll()); abortWithError(status, errorString); return; } + if (_item->_httpErrorCode != 201 && _item->_httpErrorCode != 204) { + abortWithError(SyncFileItem::NormalError, tr("Unexpected return code from server (%1)").arg(_item->_httpErrorCode)); + return; + } QByteArray fid = job->reply()->rawHeader("OC-FileID"); if(fid.isEmpty()) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 2b2e4591d..fd817c734 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -47,6 +47,7 @@ owncloud_add_test(ExcludedFiles "") if(HAVE_QT5 AND NOT BUILD_WITH_QT4) owncloud_add_test(SyncEngine "syncenginetestutils.h") owncloud_add_test(SyncFileStatusTracker "syncenginetestutils.h") + owncloud_add_test(ChunkingNg "syncenginetestutils.h") endif(HAVE_QT5 AND NOT BUILD_WITH_QT4) SET(FolderMan_SRC ../src/gui/folderman.cpp) diff --git a/test/syncenginetestutils.h b/test/syncenginetestutils.h index 9f38fc680..51d7ce421 100644 --- a/test/syncenginetestutils.h +++ b/test/syncenginetestutils.h @@ -18,6 +18,20 @@ #include static const QUrl sRootUrl("owncloud://somehost/owncloud/remote.php/webdav/"); +static const QUrl sRootUrl2("owncloud://somehost/owncloud/remote.php/dav/files/admin/"); +static const QUrl sUploadUrl("owncloud://somehost/owncloud/remote.php/dav/uploads/admin/"); + +inline QString getFilePathFromUrl(const QUrl &url) { + QString path = url.path(); + if (path.startsWith(sRootUrl.path())) + return path.mid(sRootUrl.path().length()); + if (path.startsWith(sRootUrl2.path())) + return path.mid(sRootUrl2.path().length()); + if (path.startsWith(sUploadUrl.path())) + return path.mid(sUploadUrl.path().length()); + return {}; +} + inline QString generateEtag() { return QString::number(QDateTime::currentDateTime().toMSecsSinceEpoch(), 16); @@ -320,8 +334,8 @@ public: xml.writeEndElement(); // response }; - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isNull()); // for root, it should be empty const FileInfo *fileInfo = remoteRootFileInfo.find(fileName); Q_ASSERT(fileInfo); @@ -368,8 +382,8 @@ public: setOperation(op); open(QIODevice::ReadOnly); - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); if ((fileInfo = remoteRootFileInfo.find(fileName))) { fileInfo->size = putPayload.size(); fileInfo->contentChar = putPayload.at(0); @@ -410,8 +424,8 @@ public: setOperation(op); open(QIODevice::ReadOnly); - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); fileInfo = remoteRootFileInfo.createDir(fileName); if (!fileInfo) { @@ -443,8 +457,8 @@ public: setOperation(op); open(QIODevice::ReadOnly); - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); remoteRootFileInfo.remove(fileName); QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); } @@ -470,11 +484,10 @@ public: setOperation(op); open(QIODevice::ReadOnly); - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); - QString destPath = request.rawHeader("Destination"); - Q_ASSERT(destPath.startsWith(sRootUrl.path())); - QString dest = destPath.mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); + QString dest = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination"))); + Q_ASSERT(!dest.isEmpty()); remoteRootFileInfo.rename(fileName, dest); QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); } @@ -503,8 +516,8 @@ public: setOperation(op); open(QIODevice::ReadOnly); - Q_ASSERT(request.url().path().startsWith(sRootUrl.path())); - QString fileName = request.url().path().mid(sRootUrl.path().length()); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); fileInfo = remoteRootFileInfo.find(fileName); QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); } @@ -533,6 +546,78 @@ public: } }; + +class FakeChunkMoveReply : public QNetworkReply +{ + Q_OBJECT + FileInfo *fileInfo; +public: + FakeChunkMoveReply(FileInfo &uploadsFileInfo, FileInfo &remoteRootFileInfo, + QNetworkAccessManager::Operation op, const QNetworkRequest &request, + QObject *parent) : QNetworkReply{parent} { + setRequest(request); + setUrl(request.url()); + setOperation(op); + open(QIODevice::ReadOnly); + + QString source = getFilePathFromUrl(request.url()); + Q_ASSERT(!source.isEmpty()); + Q_ASSERT(source.endsWith("/.file")); + source = source.left(source.length() - qstrlen("/.file")); + auto sourceFolder = uploadsFileInfo.find(source); + Q_ASSERT(sourceFolder); + Q_ASSERT(sourceFolder->isDir); + int count = 0; + int size = 0; + char payload = '*'; + + do { + if (!sourceFolder->children.contains(QString::number(count))) + break; + auto &x = sourceFolder->children[QString::number(count)]; + Q_ASSERT(!x.isDir); + Q_ASSERT(x.size > 0); // There should not be empty chunks + size += x.size; + payload = x.contentChar; + ++count; + } while(true); + + Q_ASSERT(count > 1); // There should be at least two chunks, otherwise why would we use chunking? + + QString fileName = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination"))); + Q_ASSERT(!fileName.isEmpty()); + + if ((fileInfo = remoteRootFileInfo.find(fileName))) { + QCOMPARE(request.rawHeader("If"), QByteArray("<" + request.rawHeader("Destination") + "> ([\"" + fileInfo->etag.toLatin1() + "\"])")); + fileInfo->size = size; + fileInfo->contentChar = payload; + } else { + Q_ASSERT(!request.hasRawHeader("If")); + // Assume that the file is filled with the same character + fileInfo = remoteRootFileInfo.create(fileName, size, payload); + } + + if (!fileInfo) { + abort(); + return; + } + QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); + } + + Q_INVOKABLE void respond() { + setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 201); + setRawHeader("OC-ETag", fileInfo->etag.toLatin1()); + setRawHeader("ETag", fileInfo->etag.toLatin1()); + setRawHeader("OC-FileId", fileInfo->fileId); + emit metaDataChanged(); + emit finished(); + } + + void abort() override { } + qint64 readData(char *, qint64) override { return 0; } +}; + + class FakeErrorReply : public QNetworkReply { Q_OBJECT @@ -559,33 +644,41 @@ public: class FakeQNAM : public QNetworkAccessManager { FileInfo _remoteRootFileInfo; + FileInfo _uploadFileInfo; QStringList _errorPaths; public: FakeQNAM(FileInfo initialRoot) : _remoteRootFileInfo{std::move(initialRoot)} { } FileInfo ¤tRemoteState() { return _remoteRootFileInfo; } + FileInfo &uploadState() { return _uploadFileInfo; } QStringList &errorPaths() { return _errorPaths; } protected: QNetworkReply *createRequest(Operation op, const QNetworkRequest &request, QIODevice *outgoingData = 0) { - const QString fileName = request.url().path().mid(sRootUrl.path().length()); + const QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isNull()); if (_errorPaths.contains(fileName)) return new FakeErrorReply{op, request, this}; + bool isUpload = request.url().path().startsWith(sUploadUrl.path()); + FileInfo &info = isUpload ? _uploadFileInfo : _remoteRootFileInfo; + auto verb = request.attribute(QNetworkRequest::CustomVerbAttribute); if (verb == QLatin1String("PROPFIND")) // Ignore outgoingData always returning somethign good enough, works for now. - return new FakePropfindReply{_remoteRootFileInfo, op, request, this}; + return new FakePropfindReply{info, op, request, this}; else if (verb == QLatin1String("GET")) - return new FakeGetReply{_remoteRootFileInfo, op, request, this}; + return new FakeGetReply{info, op, request, this}; else if (verb == QLatin1String("PUT")) - return new FakePutReply{_remoteRootFileInfo, op, request, outgoingData->readAll(), this}; + return new FakePutReply{info, op, request, outgoingData->readAll(), this}; else if (verb == QLatin1String("MKCOL")) - return new FakeMkcolReply{_remoteRootFileInfo, op, request, this}; + return new FakeMkcolReply{info, op, request, this}; else if (verb == QLatin1String("DELETE")) - return new FakeDeleteReply{_remoteRootFileInfo, op, request, this}; - else if (verb == QLatin1String("MOVE")) - return new FakeMoveReply{_remoteRootFileInfo, op, request, this}; + return new FakeDeleteReply{info, op, request, this}; + else if (verb == QLatin1String("MOVE") && !isUpload) + return new FakeMoveReply{info, op, request, this}; + else if (verb == QLatin1String("MOVE") && isUpload) + return new FakeChunkMoveReply{info, _remoteRootFileInfo, op, request, this}; else { qDebug() << verb << outgoingData; Q_UNREACHABLE(); @@ -657,6 +750,7 @@ public: } FileInfo currentRemoteState() { return _fakeQnam->currentRemoteState(); } + FileInfo uploadState() { return _fakeQnam->uploadState(); } QStringList &serverErrorPaths() { return _fakeQnam->errorPaths(); } @@ -693,14 +787,16 @@ public: QVERIFY(false); } - void execUntilFinished() { + bool execUntilFinished() { QSignalSpy spy(_syncEngine.get(), SIGNAL(finished(bool))); - QVERIFY(spy.wait()); + bool ok = spy.wait(); + Q_ASSERT(ok && "Sync timed out"); + return spy[0][0].toBool(); } - void syncOnce() { + bool syncOnce() { scheduleSync(); - execUntilFinished(); + return execUntilFinished(); } private: diff --git a/test/testchunkingng.cpp b/test/testchunkingng.cpp new file mode 100644 index 000000000..474aa8f5c --- /dev/null +++ b/test/testchunkingng.cpp @@ -0,0 +1,39 @@ +/* + * This software is in the public domain, furnished "as is", without technical + * support, and with no warranty, express or implied, as to its usefulness for + * any purpose. + * + */ + +#include +#include "syncenginetestutils.h" +#include + +using namespace OCC; + +class TestChunkingNG : public QObject +{ + Q_OBJECT + +private slots: + + void testFileUpload() { + FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()}; + fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"chunking", "1.0"} } } }); + const int size = 300 * 1000 * 1000; // 300 MB + fakeFolder.localModifier().insert("A/a0", size); + QVERIFY(fakeFolder.syncOnce()); + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + QCOMPARE(fakeFolder.uploadState().children.count(), 1); // the transfer was done with chunking + QCOMPARE(fakeFolder.currentLocalState().find("A/a0")->size, size); + + // Check that another upload of the same file also work. + fakeFolder.localModifier().appendByte("A/a0"); + QVERIFY(fakeFolder.syncOnce()); + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + QCOMPARE(fakeFolder.uploadState().children.count(), 2); // the transfer was done with chunking + } +}; + +QTEST_GUILESS_MAIN(TestChunkingNG) +#include "testchunkingng.moc" From 15f2b911d960dd88e86a388ccf1ced3eda0de4dd Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 31 Oct 2016 17:44:00 +0100 Subject: [PATCH 17/19] ChunkingNG: remove stale files when resuming --- src/libsync/propagateremotedelete.cpp | 5 +- src/libsync/propagateremotedelete.h | 2 + src/libsync/propagateupload.h | 2 + src/libsync/propagateuploadng.cpp | 68 ++++++++++++++++++++++++--- test/syncenginetestutils.h | 17 ++++--- test/testchunkingng.cpp | 41 +++++++++++++++- 6 files changed, 119 insertions(+), 16 deletions(-) diff --git a/src/libsync/propagateremotedelete.cpp b/src/libsync/propagateremotedelete.cpp index 30b06fe41..9be19c309 100644 --- a/src/libsync/propagateremotedelete.cpp +++ b/src/libsync/propagateremotedelete.cpp @@ -22,11 +22,14 @@ DeleteJob::DeleteJob(AccountPtr account, const QString& path, QObject* parent) : AbstractNetworkJob(account, path, parent) { } +DeleteJob::DeleteJob(AccountPtr account, const QUrl& url, QObject* parent) + : AbstractNetworkJob(account, QString(), parent), _url(url) +{ } void DeleteJob::start() { QNetworkRequest req; - setReply(davRequest("DELETE", path(), req)); + setReply(_url.isValid() ? davRequest("DELETE", _url, req) : davRequest("DELETE", path(), req)); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { diff --git a/src/libsync/propagateremotedelete.h b/src/libsync/propagateremotedelete.h index 0473042c8..44f4ebd3b 100644 --- a/src/libsync/propagateremotedelete.h +++ b/src/libsync/propagateremotedelete.h @@ -24,8 +24,10 @@ namespace OCC { */ class DeleteJob : public AbstractNetworkJob { Q_OBJECT + QUrl _url; // Only used if the constructor taking a url is taken. public: explicit DeleteJob(AccountPtr account, const QString& path, QObject* parent = 0); + explicit DeleteJob(AccountPtr account, const QUrl& url, QObject* parent = 0); void start() Q_DECL_OVERRIDE; bool finished() Q_DECL_OVERRIDE; diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index e7f88f169..bac3e112b 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -289,6 +289,7 @@ private: quint64 _sent; /// amount of data (bytes) that was already sent uint _transferId; /// transfer id (part of the url) int _currentChunk; /// Id of the next chunk that will be sent + bool _removeJobError; /// If not null, there was an error removing the job // Map chunk number with its size from the PROPFIND on resume. // (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.) @@ -313,6 +314,7 @@ private slots: void slotPropfindFinished(); void slotPropfindFinishedWithError(); void slotPropfindIterate(const QString &name, const QMap &properties); + void slotDeleteJobFinished(); void slotMkColFinished(QNetworkReply::NetworkError); void slotPutFinished(); void slotMoveJobFinished(); diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index abfd8257c..5cddc22ac 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -24,6 +24,8 @@ #include "propagatorjobs.h" #include "syncengine.h" #include "propagateremotemove.h" +#include "propagateremotedelete.h" + #include #include @@ -56,10 +58,13 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk) startNewUpload() <-+ +----------------------------\ | | | \ MKCOL + slotPropfindFinishedWithError() slotPropfindFinished() - | | - slotMkColFinished() | - | | - +-----+-------------------------------------------------------+ + | Is there stale files to remove? + slotMkColFinished() | | + | no yes + | | | + | | DeleteJob + | | | + +-----+<------------------------------------------------------+<--- slotDeleteJobFinished() | +----> startNextChunk() ---finished? --+ ^ | | @@ -119,12 +124,29 @@ void PropagateUploadFileNG::slotPropfindFinished() _sent = 0; while (_serverChunks.contains(_currentChunk)) { _sent += _serverChunks[_currentChunk]; + _serverChunks.remove(_currentChunk); ++_currentChunk; } - // FIXME: we should make sure that if there is a "hole" and then a few more chunks, on the server - // we should remove the later chunks. Otherwise when we do dynamic chunk sizing, we may end up - // with corruptions if there are too many chunks, or if we abort and there are still stale chunks. + qDebug() << "Resuming "<< _item->_file << " from chunk " << _currentChunk << "; sent ="<< _sent; + + if (!_serverChunks.isEmpty()) { + qDebug() << "To Delete" << _serverChunks; + _propagator->_activeJobList.append(this); + _removeJobError = false; + + // Make sure that if there is a "hole" and then a few more chunks, on the server + // we should remove the later chunks. Otherwise when we do dynamic chunk sizing, we may end up + // with corruptions if there are too many chunks, or if we abort and there are still stale chunks. + for (auto it = _serverChunks.begin(); it != _serverChunks.end(); ++it) { + auto job = new DeleteJob(_propagator->account(), Utility::concatUrlPath(chunkUrl(), QString::number(it.key())), this); + QObject::connect(job, SIGNAL(finishedSignal()), this, SLOT(slotDeleteJobFinished())); + _jobs.append(job); + job->start(); + } + return; + } + startNextChunk(); } @@ -144,6 +166,38 @@ void PropagateUploadFileNG::slotPropfindFinishedWithError() startNewUpload(); } +void PropagateUploadFileNG::slotDeleteJobFinished() +{ + auto job = qobject_cast(sender()); + Q_ASSERT(job); + _jobs.remove(_jobs.indexOf(job)); + + QNetworkReply::NetworkError err = job->reply()->error(); + if (err != QNetworkReply::NoError && err != QNetworkReply::ContentNotFoundError) { + const int httpStatus = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + SyncFileItem::Status status = classifyError(err, httpStatus); + if (status == SyncFileItem::FatalError) { + abortWithError(status, job->errorString()); + return; + } else { + qWarning() << "DeleteJob errored out" << job->errorString() << job->reply()->url(); + _removeJobError = true; + // Let the other jobs finish + } + } + + if (_jobs.isEmpty()) { + _propagator->_activeJobList.removeOne(this); + if (_removeJobError) { + // There was an error removing some files, just start over + startNewUpload(); + } else { + startNextChunk(); + } + } +} + + void PropagateUploadFileNG::startNewUpload() { diff --git a/test/syncenginetestutils.h b/test/syncenginetestutils.h index 51d7ce421..cbcb5fab2 100644 --- a/test/syncenginetestutils.h +++ b/test/syncenginetestutils.h @@ -297,6 +297,12 @@ public: setOperation(op); open(QIODevice::ReadOnly); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isNull()); // for root, it should be empty + const FileInfo *fileInfo = remoteRootFileInfo.find(fileName); + Q_ASSERT(fileInfo); + QString prefix = request.url().path().left(request.url().path().size() - fileName.size()); + // Don't care about the request and just return a full propfind const QString davUri{QStringLiteral("DAV:")}; const QString ocUri{QStringLiteral("http://owncloud.org/ns")}; @@ -310,7 +316,7 @@ public: auto writeFileResponse = [&](const FileInfo &fileInfo) { xml.writeStartElement(davUri, QStringLiteral("response")); - xml.writeTextElement(davUri, QStringLiteral("href"), "/owncloud/remote.php/webdav/" + fileInfo.path()); + xml.writeTextElement(davUri, QStringLiteral("href"), prefix + fileInfo.path()); xml.writeStartElement(davUri, QStringLiteral("propstat")); xml.writeStartElement(davUri, QStringLiteral("prop")); @@ -334,11 +340,6 @@ public: xml.writeEndElement(); // response }; - QString fileName = getFilePathFromUrl(request.url()); - Q_ASSERT(!fileName.isNull()); // for root, it should be empty - const FileInfo *fileInfo = remoteRootFileInfo.find(fileName); - Q_ASSERT(fileInfo); - writeFileResponse(*fileInfo); foreach(const FileInfo &childFileInfo, fileInfo->children) writeFileResponse(childFileInfo); @@ -400,6 +401,7 @@ public: } Q_INVOKABLE void respond() { + emit uploadProgress(fileInfo->size, fileInfo->size); setRawHeader("OC-ETag", fileInfo->etag.toLatin1()); setRawHeader("ETag", fileInfo->etag.toLatin1()); setRawHeader("X-OC-MTime", "accepted"); // Prevents Q_ASSERT(!_runningNow) since we'll call PropagateItemJob::done twice in that case. @@ -583,6 +585,7 @@ public: } while(true); Q_ASSERT(count > 1); // There should be at least two chunks, otherwise why would we use chunking? + QCOMPARE(sourceFolder->children.count(), count); // There should not be holes or extra files QString fileName = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination"))); Q_ASSERT(!fileName.isEmpty()); @@ -750,7 +753,7 @@ public: } FileInfo currentRemoteState() { return _fakeQnam->currentRemoteState(); } - FileInfo uploadState() { return _fakeQnam->uploadState(); } + FileInfo &uploadState() { return _fakeQnam->uploadState(); } QStringList &serverErrorPaths() { return _fakeQnam->errorPaths(); } diff --git a/test/testchunkingng.cpp b/test/testchunkingng.cpp index 474aa8f5c..21225c210 100644 --- a/test/testchunkingng.cpp +++ b/test/testchunkingng.cpp @@ -25,7 +25,7 @@ private slots: QVERIFY(fakeFolder.syncOnce()); QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); QCOMPARE(fakeFolder.uploadState().children.count(), 1); // the transfer was done with chunking - QCOMPARE(fakeFolder.currentLocalState().find("A/a0")->size, size); + QCOMPARE(fakeFolder.currentRemoteState().find("A/a0")->size, size); // Check that another upload of the same file also work. fakeFolder.localModifier().appendByte("A/a0"); @@ -33,6 +33,45 @@ private slots: QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); QCOMPARE(fakeFolder.uploadState().children.count(), 2); // the transfer was done with chunking } + + + void testResume () { + + FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()}; + fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"chunking", "1.0"} } } }); + const int size = 300 * 1000 * 1000; // 300 MB + fakeFolder.localModifier().insert("A/a0", size); + + // Abort when the upload is at 1/3 + int sizeWhenAbort = -1; + auto con = QObject::connect(&fakeFolder.syncEngine(), &SyncEngine::transmissionProgress, + [&](const ProgressInfo &progress) { + if (progress.completedSize() > (progress.totalSize() /3 )) { + sizeWhenAbort = progress.completedSize(); + fakeFolder.syncEngine().abort(); + } + }); + + QVERIFY(!fakeFolder.syncOnce()); // there should have been an error + QObject::disconnect(con); + QVERIFY(sizeWhenAbort > 0); + QVERIFY(sizeWhenAbort < size); + QCOMPARE(fakeFolder.uploadState().children.count(), 1); // the transfer was done with chunking + auto upStateChildren = fakeFolder.uploadState().children.first().children; + QCOMPARE(sizeWhenAbort, std::accumulate(upStateChildren.cbegin(), upStateChildren.cend(), 0, + [](int s, const FileInfo &i) { return s + i.size; })); + + + // Add a fake file to make sure it gets deleted + fakeFolder.uploadState().children.first().insert("10000", size); + QVERIFY(fakeFolder.syncOnce()); + + + + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + QCOMPARE(fakeFolder.uploadState().children.count(), 1); // The same chunk id was re-used + QCOMPARE(fakeFolder.currentRemoteState().find("A/a0")->size, size); + } }; QTEST_GUILESS_MAIN(TestChunkingNG) From 5377d1e283ffccb1ad6a8771d5618854175a48e8 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 14 Nov 2016 10:42:07 +0100 Subject: [PATCH 18/19] Chunking-NG: code cleanup after review --- src/libsync/propagateuploadng.cpp | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 5cddc22ac..c96164c4b 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -110,7 +110,7 @@ void PropagateUploadFileNG::slotPropfindIterate(const QString &name, const QMap< bool ok = false; auto chunkId = name.mid(name.lastIndexOf('/')+1).toUInt(&ok); if (ok) { - this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); + _serverChunks[chunkId] = properties["getcontentlength"].toULongLong(); } } @@ -128,6 +128,16 @@ void PropagateUploadFileNG::slotPropfindFinished() ++_currentChunk; } + if (_sent > _item->_size) { + // Normally this can't happen because the size is xor'ed with the transfer id, and it is + // therefore impossible that there is more data on the server than on the file. + qWarning() << "Inconsistency while resuming " << _item->_file + << ": the size on the server (" << _sent << ") is bigger than the size of the file (" + << _item->_size << ")"; + startNewUpload(); + return; + } + qDebug() << "Resuming "<< _item->_file << " from chunk " << _currentChunk << "; sent ="<< _sent; if (!_serverChunks.isEmpty()) { @@ -251,14 +261,13 @@ void PropagateUploadFileNG::startNextChunk() return; quint64 fileSize = _item->_size; - + Q_ASSERT(fileSize >= _sent); quint64 currentChunkSize = qMin(chunkSize(), fileSize - _sent); - if (currentChunkSize <= 0) { + if (currentChunkSize == 0) { Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore _finished = true; // Finish with a MOVE - // QString destination = _propagator->_remoteDir + _item->_file; // FIXME: _remoteDir currently is still using the old webdav path QString destination = _propagator->account()->url().path() + QLatin1String("/remote.php/dav/files/") + _propagator->account()->user() + _propagator->_remoteFolder + _item->_file; @@ -379,7 +388,8 @@ void PropagateUploadFileNG::slotPutFinished() return; } - bool finished = _sent >= _item->_size; + Q_ASSERT(_sent <= _item->_size); + bool finished = _sent == _item->_size; // Check if the file still exists const QString fullFilePath(_propagator->getFilePath(_item->_file)); @@ -397,8 +407,6 @@ void PropagateUploadFileNG::slotPutFinished() _propagator->_anotherSyncNeeded = true; if( !finished ) { abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); - // FIXME: the legacy code was retrying for a few seconds. - // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW return; } } From 92027e869230d82de1bbbd0d486ebf14bfc471f7 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Tue, 15 Nov 2016 11:39:40 +0100 Subject: [PATCH 19/19] SyncEngineTestUtils: Do don't allocate a buffer for the whole file As the file can be some hunreds of megabytes, allocating such big arrays may cause problems. Also make the timeout a bit bigger so the test can rununder valgrind. --- test/syncenginetestutils.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/test/syncenginetestutils.h b/test/syncenginetestutils.h index cbcb5fab2..0f4249c38 100644 --- a/test/syncenginetestutils.h +++ b/test/syncenginetestutils.h @@ -82,10 +82,15 @@ public: QFile file{_rootDir.filePath(relativePath)}; QVERIFY(!file.exists()); file.open(QFile::WriteOnly); - file.write(QByteArray{}.fill(contentChar, size)); + QByteArray buf(1024, contentChar); + for (int x = 0; x < size/buf.size(); ++x) { + file.write(buf); + } + file.write(buf.data(), size % buf.size()); file.close(); // Set the mtime 30 seconds in the past, for some tests that need to make sure that the mtime differs. OCC::FileSystem::setModTime(file.fileName(), OCC::Utility::qDateTimeToTime_t(QDateTime::currentDateTime().addSecs(-30))); + QCOMPARE(file.size(), size); } void setContents(const QString &relativePath, char contentChar) override { QFile file{_rootDir.filePath(relativePath)}; @@ -792,7 +797,7 @@ public: bool execUntilFinished() { QSignalSpy spy(_syncEngine.get(), SIGNAL(finished(bool))); - bool ok = spy.wait(); + bool ok = spy.wait(60000); Q_ASSERT(ok && "Sync timed out"); return spy[0][0].toBool(); }