implement bulk upload

add PutMultiFileJob to send many files at once

use it in BulkPropagatorJob to implement bulk upload feature

Signed-off-by: Matthieu Gallien <matthieu.gallien@nextcloud.com>
This commit is contained in:
Matthieu Gallien 2021-09-08 12:10:01 +02:00
parent 112be18635
commit c194605c35
19 changed files with 1446 additions and 69 deletions

View file

@ -40,6 +40,8 @@ set(libsync_SRCS
propagateupload.cpp
propagateuploadv1.cpp
propagateuploadng.cpp
bulkpropagatorjob.cpp
putmultifilejob.cpp
propagateremotedelete.cpp
propagateremotedeleteencrypted.cpp
propagateremotedeleteencryptedrootfolder.cpp

View file

@ -148,6 +148,17 @@ QNetworkReply *AbstractNetworkJob::sendRequest(const QByteArray &verb, const QUr
return reply;
}
QNetworkReply *AbstractNetworkJob::sendRequest(const QByteArray &verb,
const QUrl &url,
QNetworkRequest req,
QHttpMultiPart *requestBody)
{
auto reply = _account->sendRawRequest(verb, url, req, requestBody);
_requestBody = nullptr;
adoptRequest(reply);
return reply;
}
void AbstractNetworkJob::adoptRequest(QNetworkReply *reply)
{
addTimer(reply);

View file

@ -138,6 +138,9 @@ protected:
QNetworkRequest req = QNetworkRequest(),
QIODevice *requestBody = nullptr);
QNetworkReply *sendRequest(const QByteArray &verb, const QUrl &url,
QNetworkRequest req, QHttpMultiPart *requestBody);
/** Makes this job drive a pre-made QNetworkReply
*
* This reply cannot have a QIODevice request body because we can't get

View file

@ -47,6 +47,7 @@
#include <QJsonObject>
#include <QJsonArray>
#include <QLoggingCategory>
#include <QHttpMultiPart>
#include <qsslconfiguration.h>
#include <qt5keychain/keychain.h>
@ -360,6 +361,18 @@ QNetworkReply *Account::sendRawRequest(const QByteArray &verb, const QUrl &url,
return _am->sendCustomRequest(req, verb, data);
}
QNetworkReply *Account::sendRawRequest(const QByteArray &verb, const QUrl &url, QNetworkRequest req, QHttpMultiPart *data)
{
req.setUrl(url);
req.setSslConfiguration(this->getOrCreateSslConfig());
if (verb == "PUT") {
return _am->put(req, data);
} else if (verb == "POST") {
return _am->post(req, data);
}
return _am->sendCustomRequest(req, verb, data);
}
SimpleNetworkJob *Account::sendRequest(const QByteArray &verb, const QUrl &url, QNetworkRequest req, QIODevice *data)
{
auto job = new SimpleNetworkJob(sharedFromThis());

View file

@ -154,6 +154,9 @@ public:
QNetworkReply *sendRawRequest(const QByteArray &verb,
const QUrl &url, QNetworkRequest req, const QByteArray &data);
QNetworkReply *sendRawRequest(const QByteArray &verb,
const QUrl &url, QNetworkRequest req, QHttpMultiPart *data);
/** Create and start network job for a simple one-off request.
*
* More complicated requests typically create their own job types.

View file

@ -0,0 +1,679 @@
/*
* Copyright 2021 (c) Matthieu Gallien <matthieu.gallien@nextcloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "bulkpropagatorjob.h"
#include "putmultifilejob.h"
#include "owncloudpropagator_p.h"
#include "syncfileitem.h"
#include "syncengine.h"
#include "propagateupload.h"
#include "propagatorjobs.h"
#include "filesystem.h"
#include "account.h"
#include "common/utility.h"
#include "common/checksums.h"
#include "networkjobs.h"
#include <QFileInfo>
#include <QDir>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonObject>
#include <QJsonValue>
namespace OCC {
Q_LOGGING_CATEGORY(lcBulkPropagatorJob, "nextcloud.sync.propagator.bulkupload", QtInfoMsg)
}
namespace {
QByteArray getEtagFromJsonReply(const QJsonObject &reply)
{
const auto ocEtag = OCC::parseEtag(reply.value("OC-ETag").toString().toLatin1());
const auto ETag = OCC::parseEtag(reply.value("ETag").toString().toLatin1());
const auto etag = OCC::parseEtag(reply.value("etag").toString().toLatin1());
QByteArray ret = ocEtag;
if (ret.isEmpty()) {
ret = ETag;
}
if (ret.isEmpty()) {
ret = etag;
}
if (ocEtag.length() > 0 && ocEtag != etag && ocEtag != ETag) {
qCDebug(OCC::lcBulkPropagatorJob) << "Quite peculiar, we have an etag != OC-Etag [no problem!]" << etag << ETag << ocEtag;
}
return ret;
}
QByteArray getHeaderFromJsonReply(const QJsonObject &reply, const QByteArray &headerName)
{
return reply.value(headerName).toString().toLatin1();
}
}
namespace OCC {
BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator,
const std::deque<SyncFileItemPtr> &items)
: PropagatorJob(propagator)
, _items(items)
{
_filesToUpload.reserve(100);
}
bool BulkPropagatorJob::scheduleSelfOrChild()
{
if (_items.empty()) {
return false;
}
_state = Running;
for(int i = 0; i < 100 && !_items.empty(); ++i) {
auto currentItem = _items.front();
_items.pop_front();
_pendingChecksumFiles.insert(currentItem->_file);
QMetaObject::invokeMethod(this, [this, currentItem] () {
UploadFileInfo fileToUpload;
fileToUpload._file = currentItem->_file;
fileToUpload._size = currentItem->_size;
fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file);
startUploadFile(currentItem, fileToUpload);
}); // We could be in a different thread (neon jobs)
}
return _items.empty() && _filesToUpload.empty();
}
PropagatorJob::JobParallelism BulkPropagatorJob::parallelism()
{
return PropagatorJob::JobParallelism::WaitForFinished;
}
void BulkPropagatorJob::startUploadFile(SyncFileItemPtr item, UploadFileInfo fileToUpload)
{
if (propagator()->_abortRequested) {
return;
}
// Check if the specific file can be accessed
if (propagator()->hasCaseClashAccessibilityProblem(fileToUpload._file)) {
done(item, SyncFileItem::NormalError, tr("File %1 cannot be uploaded because another file with the same name, differing only in case, exists").arg(QDir::toNativeSeparators(item->_file)));
return;
}
return slotComputeTransmissionChecksum(item, fileToUpload);
}
void BulkPropagatorJob::doStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
QByteArray transmissionChecksumHeader)
{
if (propagator()->_abortRequested) {
return;
}
// write the checksum in the database, so if the POST is sent
// to the server, but the connection drops before we get the etag, we can check the checksum
// in reconcile (issue #5106)
SyncJournalDb::UploadInfo pi;
pi._valid = true;
pi._chunk = 0;
pi._transferid = 0; // We set a null transfer id because it is not chunked.
pi._modtime = item->_modtime;
pi._errorCount = 0;
pi._contentChecksum = item->_checksumHeader;
pi._size = item->_size;
propagator()->_journal->setUploadInfo(item->_file, pi);
propagator()->_journal->commit("Upload info");
auto currentHeaders = headers(item);
currentHeaders[QByteArrayLiteral("Content-Length")] = QByteArray::number(fileToUpload._size);
if (!item->_renameTarget.isEmpty() && item->_file != item->_renameTarget) {
// Try to rename the file
const auto originalFilePathAbsolute = propagator()->fullLocalPath(item->_file);
const auto newFilePathAbsolute = propagator()->fullLocalPath(item->_renameTarget);
const auto renameSuccess = QFile::rename(originalFilePathAbsolute, newFilePathAbsolute);
if (!renameSuccess) {
done(item, SyncFileItem::NormalError, "File contains trailing spaces and couldn't be renamed");
return;
}
qCWarning(lcBulkPropagatorJob()) << item->_file << item->_renameTarget;
fileToUpload._file = item->_file = item->_renameTarget;
fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file);
item->_modtime = FileSystem::getModTime(newFilePathAbsolute);
}
const auto remotePath = propagator()->fullRemotePath(fileToUpload._file);
currentHeaders["X-File-MD5"] = transmissionChecksumHeader;
BulkUploadItem newUploadFile{propagator()->account(), item, fileToUpload,
remotePath, fileToUpload._path,
fileToUpload._size, currentHeaders};
qCInfo(lcBulkPropagatorJob) << remotePath << "transmission checksum" << transmissionChecksumHeader << fileToUpload._path;
_filesToUpload.push_back(std::move(newUploadFile));
_pendingChecksumFiles.remove(item->_file);
if (_pendingChecksumFiles.empty()) {
triggerUpload();
}
}
void BulkPropagatorJob::triggerUpload()
{
auto uploadParametersData = std::vector<SingleUploadFileData>{};
uploadParametersData.reserve(_filesToUpload.size());
int timeout = 0;
for(auto &singleFile : _filesToUpload) {
// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
auto device = std::make_unique<UploadDevice>(
singleFile._localPath, 0, singleFile._fileSize, &propagator()->_bandwidthManager);
if (!device->open(QIODevice::ReadOnly)) {
qCWarning(lcBulkPropagatorJob) << "Could not prepare upload device: " << device->errorString();
// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(singleFile._localPath)) {
emit propagator()->seenLockedFile(singleFile._localPath);
}
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError(singleFile._item, SyncFileItem::SoftError, device->errorString());
return;
}
singleFile._headers["X-File-Path"] = singleFile._remotePath.toUtf8();
uploadParametersData.push_back({std::move(device), singleFile._headers});
timeout += singleFile._fileSize;
}
const auto bulkUploadUrl = Utility::concatUrlPath(propagator()->account()->url(), QStringLiteral("/remote.php/dav/bulk"));
auto job = std::make_unique<PutMultiFileJob>(propagator()->account(), bulkUploadUrl, std::move(uploadParametersData), this);
connect(job.get(), &PutMultiFileJob::finishedSignal, this, &BulkPropagatorJob::slotPutFinished);
for(auto &singleFile : _filesToUpload) {
connect(job.get(), &PutMultiFileJob::uploadProgress,
this, [this, singleFile] (qint64 sent, qint64 total) {
slotUploadProgress(singleFile._item, sent, total);
});
}
adjustLastJobTimeout(job.get(), timeout);
_jobs.append(job.get());
job.release()->start();
}
void BulkPropagatorJob::slotComputeTransmissionChecksum(SyncFileItemPtr item,
UploadFileInfo fileToUpload)
{
// Reuse the content checksum as the transmission checksum if possible
const auto supportedTransmissionChecksums =
propagator()->account()->capabilities().supportedChecksumTypes();
// Compute the transmission checksum.
auto computeChecksum = std::make_unique<ComputeChecksum>(this);
if (uploadChecksumEnabled()) {
computeChecksum->setChecksumType("MD5" /*propagator()->account()->capabilities().uploadChecksumType()*/);
} else {
computeChecksum->setChecksumType(QByteArray());
}
connect(computeChecksum.get(), &ComputeChecksum::done,
this, [this, item, fileToUpload] (const QByteArray &contentChecksumType, const QByteArray &contentChecksum) {
slotStartUpload(item, fileToUpload, contentChecksumType, contentChecksum);
});
connect(computeChecksum.get(), &ComputeChecksum::done,
computeChecksum.get(), &QObject::deleteLater);
computeChecksum.release()->start(fileToUpload._path);
}
void BulkPropagatorJob::slotStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
const QByteArray &transmissionChecksumType,
const QByteArray &transmissionChecksum)
{
const auto transmissionChecksumHeader = makeChecksumHeader(transmissionChecksumType, transmissionChecksum);
item->_checksumHeader = transmissionChecksumHeader;
const QString fullFilePath = fileToUpload._path;
const QString originalFilePath = propagator()->fullLocalPath(item->_file);
if (!FileSystem::fileExists(fullFilePath)) {
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("File Removed (start upload) %1").arg(fullFilePath));
}
const time_t prevModtime = item->_modtime; // the _item value was set in PropagateUploadFile::start()
// but a potential checksum calculation could have taken some time during which the file could
// have been changed again, so better check again here.
item->_modtime = FileSystem::getModTime(originalFilePath);
if (prevModtime != item->_modtime) {
propagator()->_anotherSyncNeeded = true;
qDebug() << "trigger another sync after checking modified time of item" << item->_file << "prevModtime" << prevModtime << "Curr" << item->_modtime;
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during syncing. It will be resumed."));
}
fileToUpload._size = FileSystem::getSize(fullFilePath);
item->_size = FileSystem::getSize(originalFilePath);
// But skip the file if the mtime is too close to 'now'!
// That usually indicates a file that is still being changed
// or not yet fully copied to the destination.
if (fileIsStillChanging(*item)) {
propagator()->_anotherSyncNeeded = true;
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during sync."));
}
doStartUpload(item, fileToUpload, transmissionChecksum);
}
void BulkPropagatorJob::slotOnErrorStartFolderUnlock(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString)
{
qCInfo(lcBulkPropagatorJob()) << status << errorString;
done(item, status, errorString);
}
void BulkPropagatorJob::slotPutFinishedOneFile(const BulkUploadItem &singleFile,
PutMultiFileJob *job,
const QJsonObject &fullReplyObject)
{
bool finished = false;
const auto fileReply = fullReplyObject.value(QChar('/') + singleFile._item->_file).toObject();
qCInfo(lcBulkPropagatorJob()) << singleFile._item->_file << "file headers" << fileReply;
if (!fileReply[QStringLiteral("error")].toBool()) {
singleFile._item->_httpErrorCode = static_cast<quint16>(200);
} else {
singleFile._item->_httpErrorCode = static_cast<quint16>(412);
}
singleFile._item->_responseTimeStamp = job->responseTimestamp();
singleFile._item->_requestId = job->requestId();
if (singleFile._item->_httpErrorCode != 200) {
commonErrorHandling(singleFile._item);
return;
}
singleFile._item->_status = SyncFileItem::Success;
// Check the file again post upload.
// Two cases must be considered separately: If the upload is finished,
// the file is on the server and has a changed ETag. In that case,
// the etag has to be properly updated in the client journal, and because
// of that we can bail out here with an error. But we can reschedule a
// sync ASAP.
// But if the upload is ongoing, because not all chunks were uploaded
// yet, the upload can be stopped and an error can be displayed, because
// the server hasn't registered the new file yet.
const auto etag = getEtagFromJsonReply(fileReply);
finished = etag.length() > 0;
const auto fullFilePath(propagator()->fullLocalPath(singleFile._item->_file));
// Check if the file still exists
if (!checkFileStillExists(singleFile._item, finished, fullFilePath)) {
return;
}
// Check whether the file changed since discovery. the file check here is the original and not the temporary.
if (!checkFileChanged(singleFile._item, finished, fullFilePath)) {
return;
}
// the file id should only be empty for new files up- or downloaded
computeFileId(singleFile._item, fileReply);
singleFile._item->_etag = etag;
if (getHeaderFromJsonReply(fileReply, "X-OC-MTime") != "accepted") {
// X-OC-MTime is supported since owncloud 5.0. But not when chunking.
// Normally Owncloud 6 always puts X-OC-MTime
qCWarning(lcBulkPropagatorJob) << "Server does not support X-OC-MTime" << getHeaderFromJsonReply(fileReply, "X-OC-MTime");
// Well, the mtime was not set
}
}
void BulkPropagatorJob::slotPutFinished()
{
auto *job = qobject_cast<PutMultiFileJob *>(sender());
Q_ASSERT(job);
slotJobDestroyed(job); // remove it from the _jobs list
const auto replyData = job->reply()->readAll();
const auto replyJson = QJsonDocument::fromJson(replyData);
const auto fullReplyObject = replyJson.object();
for (const auto &oneFile : _filesToUpload) {
slotPutFinishedOneFile(oneFile, job, fullReplyObject);
}
finalize();
}
void BulkPropagatorJob::slotUploadProgress(SyncFileItemPtr item, qint64 sent, qint64 total)
{
// Completion is signaled with sent=0, total=0; avoid accidentally
// resetting progress due to the sent being zero by ignoring it.
// finishedSignal() is bound to be emitted soon anyway.
// See https://bugreports.qt.io/browse/QTBUG-44782.
if (sent == 0 && total == 0) {
return;
}
propagator()->reportProgress(*item, sent - total);
}
void BulkPropagatorJob::slotJobDestroyed(QObject *job)
{
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job), _jobs.end());
}
void BulkPropagatorJob::adjustLastJobTimeout(AbstractNetworkJob *job, qint64 fileSize) const
{
constexpr double threeMinutes = 3.0 * 60 * 1000;
job->setTimeout(qBound(
job->timeoutMsec(),
// Calculate 3 minutes for each gigabyte of data
qRound64(threeMinutes * static_cast<double>(fileSize) / 1e9),
// Maximum of 30 minutes
static_cast<qint64>(30 * 60 * 1000)));
}
void BulkPropagatorJob::finalizeOneFile(const BulkUploadItem &oneFile)
{
// Update the database entry
const auto result = propagator()->updateMetadata(*oneFile._item);
if (!result) {
done(oneFile._item, SyncFileItem::FatalError, tr("Error updating metadata: %1").arg(result.error()));
return;
} else if (*result == Vfs::ConvertToPlaceholderResult::Locked) {
done(oneFile._item, SyncFileItem::SoftError, tr("The file %1 is currently in use").arg(oneFile._item->_file));
return;
}
// Files that were new on the remote shouldn't have online-only pin state
// even if their parent folder is online-only.
if (oneFile._item->_instruction == CSYNC_INSTRUCTION_NEW
|| oneFile._item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
auto &vfs = propagator()->syncOptions()._vfs;
const auto pin = vfs->pinState(oneFile._item->_file);
if (pin && *pin == PinState::OnlineOnly && !vfs->setPinState(oneFile._item->_file, PinState::Unspecified)) {
qCWarning(lcBulkPropagatorJob) << "Could not set pin state of" << oneFile._item->_file << "to unspecified";
}
}
// Remove from the progress database:
propagator()->_journal->setUploadInfo(oneFile._item->_file, SyncJournalDb::UploadInfo());
propagator()->_journal->commit("upload file start");
}
void BulkPropagatorJob::finalize()
{
for(const auto &oneFile : _filesToUpload) {
if (!oneFile._item->hasErrorStatus()) {
finalizeOneFile(oneFile);
}
done(oneFile._item, oneFile._item->_status, {});
}
Q_ASSERT(!_filesToUpload.empty());
_filesToUpload.clear();
if (_items.empty()) {
if (!_jobs.empty()) {
// just wait for the other job to finish.
return;
}
if (!_pendingChecksumFiles.empty()) {
// just wait for the other job to finish.
return;
}
qCInfo(lcBulkPropagatorJob) << "final status" << _finalStatus;
emit finished(_finalStatus);
propagator()->scheduleNextJob();
} else {
scheduleSelfOrChild();
}
}
void BulkPropagatorJob::done(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString)
{
item->_status = status;
item->_errorString = errorString;
qCInfo(lcBulkPropagatorJob) << "Item completed" << item->destination() << item->_status << item->_instruction << item->_errorString;
handleFileRestoration(item, errorString);
if (propagator()->_abortRequested && (item->_status == SyncFileItem::NormalError
|| item->_status == SyncFileItem::FatalError)) {
// an abort request is ongoing. Change the status to Soft-Error
item->_status = SyncFileItem::SoftError;
}
if (item->_status != SyncFileItem::Success) {
// Blacklist handling
handleBulkUploadBlackList(item);
propagator()->_anotherSyncNeeded = true;
}
handleJobDoneErrors(item, status);
emit propagator()->itemCompleted(item);
}
QMap<QByteArray, QByteArray> BulkPropagatorJob::headers(SyncFileItemPtr item) const
{
QMap<QByteArray, QByteArray> headers;
headers[QByteArrayLiteral("Content-Type")] = QByteArrayLiteral("application/octet-stream");
headers[QByteArrayLiteral("X-File-Mtime")] = QByteArray::number(qint64(item->_modtime));
if (qEnvironmentVariableIntValue("OWNCLOUD_LAZYOPS")) {
headers[QByteArrayLiteral("OC-LazyOps")] = QByteArrayLiteral("true");
}
if (item->_file.contains(QLatin1String(".sys.admin#recall#"))) {
// This is a file recall triggered by the admin. Note: the
// recall list file created by the admin and downloaded by the
// client (.sys.admin#recall#) also falls into this category
// (albeit users are not supposed to mess up with it)
// We use a special tag header so that the server may decide to store this file away in some admin stage area
// And not directly in the user's area (which would trigger redownloads etc).
headers["OC-Tag"] = ".sys.admin#recall#";
}
if (!item->_etag.isEmpty() && item->_etag != "empty_etag"
&& item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match
&& item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE) {
// We add quotes because the owncloud server always adds quotes around the etag, and
// csync_owncloud.c's owncloud_file_id always strips the quotes.
headers[QByteArrayLiteral("If-Match")] = '"' + item->_etag + '"';
}
// Set up a conflict file header pointing to the original file
auto conflictRecord = propagator()->_journal->conflictRecord(item->_file.toUtf8());
if (conflictRecord.isValid()) {
headers[QByteArrayLiteral("OC-Conflict")] = "1";
if (!conflictRecord.initialBasePath.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictInitialBasePath")] = conflictRecord.initialBasePath;
}
if (!conflictRecord.baseFileId.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictBaseFileId")] = conflictRecord.baseFileId;
}
if (conflictRecord.baseModtime != -1) {
headers[QByteArrayLiteral("OC-ConflictBaseMtime")] = QByteArray::number(conflictRecord.baseModtime);
}
if (!conflictRecord.baseEtag.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictBaseEtag")] = conflictRecord.baseEtag;
}
}
return headers;
}
void BulkPropagatorJob::abortWithError(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &error)
{
abort(AbortType::Synchronous);
done(item, status, error);
}
void BulkPropagatorJob::checkResettingErrors(SyncFileItemPtr item) const
{
if (item->_httpErrorCode == 412
|| propagator()->account()->capabilities().httpErrorCodesThatResetFailingChunkedUploads().contains(item->_httpErrorCode)) {
auto uploadInfo = propagator()->_journal->getUploadInfo(item->_file);
uploadInfo._errorCount += 1;
if (uploadInfo._errorCount > 3) {
qCInfo(lcBulkPropagatorJob) << "Reset transfer of" << item->_file
<< "due to repeated error" << item->_httpErrorCode;
uploadInfo = SyncJournalDb::UploadInfo();
} else {
qCInfo(lcBulkPropagatorJob) << "Error count for maybe-reset error" << item->_httpErrorCode
<< "on file" << item->_file
<< "is" << uploadInfo._errorCount;
}
propagator()->_journal->setUploadInfo(item->_file, uploadInfo);
propagator()->_journal->commit("Upload info");
}
}
void BulkPropagatorJob::commonErrorHandling(SyncFileItemPtr item)
{
// Ensure errors that should eventually reset the chunked upload are tracked.
checkResettingErrors(item);
abortWithError(item, SyncFileItem::NormalError, tr("Error"));
}
bool BulkPropagatorJob::checkFileStillExists(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath)
{
if (!FileSystem::fileExists(fullFilePath)) {
if (!finished) {
abortWithError(item, SyncFileItem::SoftError, tr("The local file was removed during sync."));
return false;
} else {
propagator()->_anotherSyncNeeded = true;
}
}
return true;
}
bool BulkPropagatorJob::checkFileChanged(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath)
{
if (!FileSystem::verifyFileUnchanged(fullFilePath, item->_size, item->_modtime)) {
propagator()->_anotherSyncNeeded = true;
if (!finished) {
abortWithError(item, SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return false;
}
}
return true;
}
void BulkPropagatorJob::computeFileId(SyncFileItemPtr item,
const QJsonObject &fileReply) const
{
const auto fid = getHeaderFromJsonReply(fileReply, "OC-FileID");
if (!fid.isEmpty()) {
if (!item->_fileId.isEmpty() && item->_fileId != fid) {
qCWarning(lcBulkPropagatorJob) << "File ID changed!" << item->_fileId << fid;
}
item->_fileId = fid;
}
}
void BulkPropagatorJob::handleFileRestoration(SyncFileItemPtr item,
const QString &errorString) const
{
if (item->_isRestoration) {
if (item->_status == SyncFileItem::Success
|| item->_status == SyncFileItem::Conflict) {
item->_status = SyncFileItem::Restoration;
} else {
item->_errorString += tr("; Restoration Failed: %1").arg(errorString);
}
} else {
if (item->_errorString.isEmpty()) {
item->_errorString = errorString;
}
}
}
void BulkPropagatorJob::handleBulkUploadBlackList(SyncFileItemPtr item) const
{
propagator()->addToBulkUploadBlackList(item->_file);
}
void BulkPropagatorJob::handleJobDoneErrors(SyncFileItemPtr item,
SyncFileItem::Status status)
{
if (item->hasErrorStatus()) {
qCWarning(lcPropagator) << "Could not complete propagation of" << item->destination() << "by" << this << "with status" << item->_status << "and error:" << item->_errorString;
} else {
qCInfo(lcPropagator) << "Completed propagation of" << item->destination() << "by" << this << "with status" << item->_status;
}
if (item->_status == SyncFileItem::FatalError) {
// Abort all remaining jobs.
propagator()->abort();
}
switch (item->_status)
{
case SyncFileItem::BlacklistedError:
case SyncFileItem::Conflict:
case SyncFileItem::FatalError:
case SyncFileItem::FileIgnored:
case SyncFileItem::FileLocked:
case SyncFileItem::FileNameInvalid:
case SyncFileItem::NoStatus:
case SyncFileItem::NormalError:
case SyncFileItem::Restoration:
case SyncFileItem::SoftError:
_finalStatus = SyncFileItem::NormalError;
qCInfo(lcBulkPropagatorJob) << "modify final status NormalError" << _finalStatus << status;
break;
case SyncFileItem::DetailError:
_finalStatus = SyncFileItem::DetailError;
qCInfo(lcBulkPropagatorJob) << "modify final status DetailError" << _finalStatus << status;
break;
case SyncFileItem::Success:
break;
}
}
}

View file

@ -0,0 +1,164 @@
/*
* Copyright 2021 (c) Matthieu Gallien <matthieu.gallien@nextcloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#pragma once
#include "owncloudpropagator.h"
#include "abstractnetworkjob.h"
#include <QLoggingCategory>
#include <QVector>
#include <QMap>
#include <QByteArray>
#include <deque>
namespace OCC {
Q_DECLARE_LOGGING_CATEGORY(lcBulkPropagatorJob)
class ComputeChecksum;
class PutMultiFileJob;
class BulkPropagatorJob : public PropagatorJob
{
Q_OBJECT
/* This is a minified version of the SyncFileItem,
* that holds only the specifics about the file that's
* being uploaded.
*
* This is needed if we wanna apply changes on the file
* that's being uploaded while keeping the original on disk.
*/
struct UploadFileInfo {
QString _file; /// I'm still unsure if I should use a SyncFilePtr here.
QString _path; /// the full path on disk.
qint64 _size;
};
struct BulkUploadItem
{
AccountPtr _account;
SyncFileItemPtr _item;
UploadFileInfo _fileToUpload;
QString _remotePath;
QString _localPath;
qint64 _fileSize;
QMap<QByteArray, QByteArray> _headers;
};
public:
explicit BulkPropagatorJob(OwncloudPropagator *propagator,
const std::deque<SyncFileItemPtr> &items);
bool scheduleSelfOrChild() override;
JobParallelism parallelism() override;
private slots:
void startUploadFile(SyncFileItemPtr item, UploadFileInfo fileToUpload);
// Content checksum computed, compute the transmission checksum
void slotComputeTransmissionChecksum(SyncFileItemPtr item,
UploadFileInfo fileToUpload);
// transmission checksum computed, prepare the upload
void slotStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
const QByteArray &transmissionChecksumType,
const QByteArray &transmissionChecksum);
// invoked on internal error to unlock a folder and faile
void slotOnErrorStartFolderUnlock(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString);
void slotPutFinished();
void slotUploadProgress(SyncFileItemPtr item, qint64 sent, qint64 total);
void slotJobDestroyed(QObject *job);
private:
void doStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
QByteArray transmissionChecksumHeader);
void adjustLastJobTimeout(AbstractNetworkJob *job,
qint64 fileSize) const;
void finalize();
void finalizeOneFile(const BulkUploadItem &oneFile);
void slotPutFinishedOneFile(const BulkUploadItem &singleFile,
OCC::PutMultiFileJob *job,
const QJsonObject &fullReplyObject);
void done(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString);
/** Bases headers that need to be sent on the PUT, or in the MOVE for chunking-ng */
QMap<QByteArray, QByteArray> headers(SyncFileItemPtr item) const;
void abortWithError(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &error);
/**
* Checks whether the current error is one that should reset the whole
* transfer if it happens too often. If so: Bump UploadInfo::errorCount
* and maybe perform the reset.
*/
void checkResettingErrors(SyncFileItemPtr item) const;
/**
* Error handling functionality that is shared between jobs.
*/
void commonErrorHandling(SyncFileItemPtr item);
bool checkFileStillExists(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath);
bool checkFileChanged(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath);
void computeFileId(SyncFileItemPtr item,
const QJsonObject &fileReply) const;
void handleFileRestoration(SyncFileItemPtr item,
const QString &errorString) const;
void handleBulkUploadBlackList(SyncFileItemPtr item) const;
void handleJobDoneErrors(SyncFileItemPtr item,
SyncFileItem::Status status);
void triggerUpload();
std::deque<SyncFileItemPtr> _items;
QVector<AbstractNetworkJob *> _jobs; /// network jobs that are currently in transit
QSet<QString> _pendingChecksumFiles;
std::vector<BulkUploadItem> _filesToUpload;
SyncFileItem::Status _finalStatus = SyncFileItem::Status::NoStatus;
};
}

View file

@ -21,6 +21,7 @@
#include "propagateremotedelete.h"
#include "propagateremotemove.h"
#include "propagateremotemkdir.h"
#include "bulkpropagatorjob.h"
#include "propagatorjobs.h"
#include "filesystem.h"
#include "common/utility.h"
@ -173,7 +174,7 @@ static SyncJournalErrorBlacklistRecord createBlacklistEntry(
*
* May adjust the status or item._errorString.
*/
static void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item)
void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item)
{
SyncJournalErrorBlacklistRecord oldEntry = journal->errorBlacklistEntry(item._file);
@ -396,6 +397,8 @@ std::unique_ptr<PropagateUploadFileCommon> OwncloudPropagator::createUploadJob(S
job->setDeleteExisting(deleteExisting);
removeFromBulkUploadBlackList(item->_file);
return job;
}
@ -861,7 +864,7 @@ Result<Vfs::ConvertToPlaceholderResult, QString> OwncloudPropagator::staticUpdat
bool OwncloudPropagator::isDelayedUploadItem(const SyncFileItemPtr &item) const
{
return account()->capabilities().bulkUpload() && !_scheduleDelayedTasks && !item->_isEncrypted && _syncOptions._minChunkSize > item->_size;
return account()->capabilities().bulkUpload() && !_scheduleDelayedTasks && !item->_isEncrypted && _syncOptions._minChunkSize > item->_size && !isInBulkUploadBlackList(item->_file);
}
void OwncloudPropagator::setScheduleDelayedTasks(bool active)
@ -874,6 +877,23 @@ void OwncloudPropagator::clearDelayedTasks()
_delayedTasks.clear();
}
void OwncloudPropagator::addToBulkUploadBlackList(const QString &file)
{
qCDebug(lcPropagator) << "black list for bulk upload" << file;
_bulkUploadBlackList.insert(file);
}
void OwncloudPropagator::removeFromBulkUploadBlackList(const QString &file)
{
qCDebug(lcPropagator) << "black list for bulk upload" << file;
_bulkUploadBlackList.remove(file);
}
bool OwncloudPropagator::isInBulkUploadBlackList(const QString &file) const
{
return _bulkUploadBlackList.contains(file);
}
// ================================================================================
PropagatorJob::PropagatorJob(OwncloudPropagator *propagator)
@ -1304,13 +1324,4 @@ QString OwncloudPropagator::remotePath() const
return _remoteFolder;
}
BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const QVector<SyncFileItemPtr> &items)
: PropagatorCompositeJob(propagator)
, _items(items)
{
for(const auto &oneItemJob : _items) {
appendTask(oneItemJob);
}
_items.clear();
}
}

View file

@ -31,6 +31,8 @@
#include "accountfwd.h"
#include "syncoptions.h"
#include <deque>
namespace OCC {
Q_DECLARE_LOGGING_CATEGORY(lcPropagator)
@ -46,6 +48,8 @@ qint64 criticalFreeSpaceLimit();
*/
qint64 freeSpaceLimit();
void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item);
class SyncJournalDb;
class OwncloudPropagator;
class PropagatorCompositeJob;
@ -380,19 +384,6 @@ private:
bool scheduleDelayedJobs();
};
class BulkPropagatorJob : public PropagatorCompositeJob
{
Q_OBJECT
public:
explicit BulkPropagatorJob(OwncloudPropagator *propagator,
const QVector<SyncFileItemPtr> &items);
private:
QVector<SyncFileItemPtr> _items;
};
/**
* @brief Dummy job that just mark it as completed and ignored
* @ingroup libsync
@ -431,7 +422,8 @@ public:
public:
OwncloudPropagator(AccountPtr account, const QString &localDir,
const QString &remoteFolder, SyncJournalDb *progressDb)
const QString &remoteFolder, SyncJournalDb *progressDb,
QSet<QString> &bulkUploadBlackList)
: _journal(progressDb)
, _finishedEmited(false)
, _bandwidthManager(this)
@ -440,6 +432,7 @@ public:
, _account(account)
, _localDir((localDir.endsWith(QChar('/'))) ? localDir : localDir + '/')
, _remoteFolder((remoteFolder.endsWith(QChar('/'))) ? remoteFolder : remoteFolder + '/')
, _bulkUploadBlackList(bulkUploadBlackList)
{
qRegisterMetaType<PropagatorJob::AbortType>("PropagatorJob::AbortType");
}
@ -611,7 +604,7 @@ public:
Q_REQUIRED_RESULT bool isDelayedUploadItem(const SyncFileItemPtr &item) const;
Q_REQUIRED_RESULT const QVector<SyncFileItemPtr>& delayedTasks() const
Q_REQUIRED_RESULT const std::deque<SyncFileItemPtr>& delayedTasks() const
{
return _delayedTasks;
}
@ -620,6 +613,12 @@ public:
void clearDelayedTasks();
void addToBulkUploadBlackList(const QString &file);
void removeFromBulkUploadBlackList(const QString &file);
bool isInBulkUploadBlackList(const QString &file) const;
private slots:
void abortTimeout()
@ -674,8 +673,12 @@ private:
const QString _localDir; // absolute path to the local directory. ends with '/'
const QString _remoteFolder; // remote folder, ends with '/'
QVector<SyncFileItemPtr> _delayedTasks;
std::deque<SyncFileItemPtr> _delayedTasks;
bool _scheduleDelayedTasks = false;
QSet<QString> &_bulkUploadBlackList;
static bool _allowDelayedUpload;
};

View file

@ -18,9 +18,33 @@
#include "owncloudpropagator.h"
#include "syncfileitem.h"
#include "networkjobs.h"
#include "syncengine.h"
#include <QLoggingCategory>
#include <QNetworkReply>
namespace {
/**
* We do not want to upload files that are currently being modified.
* To avoid that, we don't upload files that have a modification time
* that is too close to the current time.
*
* This interacts with the msBetweenRequestAndSync delay in the folder
* manager. If that delay between file-change notification and sync
* has passed, we should accept the file for upload here.
*/
inline bool fileIsStillChanging(const OCC::SyncFileItem &item)
{
const auto modtime = OCC::Utility::qDateTimeFromTime_t(item._modtime);
const qint64 msSinceMod = modtime.msecsTo(QDateTime::currentDateTimeUtc());
return std::chrono::milliseconds(msSinceMod) < OCC::SyncEngine::minimumFileAgeForUpload
// if the mtime is too much in the future we *do* upload the file
&& msSinceMod > -10000;
}
}
namespace OCC {
inline QByteArray getEtagFromReply(QNetworkReply *reply)

View file

@ -96,7 +96,7 @@ public:
void giveBandwidthQuota(qint64 q);
qint64 currentDownloadPosition();
QString errorString() const;
QString errorString() const override;
void setErrorString(const QString &s) { _errorString = s; }
SyncFileItem::Status errorStatus() { return _errorStatus; }

View file

@ -49,25 +49,6 @@ Q_LOGGING_CATEGORY(lcPropagateUpload, "nextcloud.sync.propagator.upload", QtInfo
Q_LOGGING_CATEGORY(lcPropagateUploadV1, "nextcloud.sync.propagator.upload.v1", QtInfoMsg)
Q_LOGGING_CATEGORY(lcPropagateUploadNG, "nextcloud.sync.propagator.upload.ng", QtInfoMsg)
/**
* We do not want to upload files that are currently being modified.
* To avoid that, we don't upload files that have a modification time
* that is too close to the current time.
*
* This interacts with the msBetweenRequestAndSync delay in the folder
* manager. If that delay between file-change notification and sync
* has passed, we should accept the file for upload here.
*/
static bool fileIsStillChanging(const SyncFileItem &item)
{
const QDateTime modtime = Utility::qDateTimeFromTime_t(item._modtime);
const qint64 msSinceMod = modtime.msecsTo(QDateTime::currentDateTimeUtc());
return std::chrono::milliseconds(msSinceMod) < SyncEngine::minimumFileAgeForUpload
// if the mtime is too much in the future we *do* upload the file
&& msSinceMod > -10000;
}
PUTFileJob::~PUTFileJob()
{
// Make sure that we destroy the QNetworkReply before our _device of which it keeps an internal pointer.

View file

@ -0,0 +1,70 @@
/*
* Copyright 2021 (c) Matthieu Gallien <matthieu.gallien@nextcloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "putmultifilejob.h"
#include <QHttpPart>
namespace OCC {
Q_LOGGING_CATEGORY(lcPutMultiFileJob, "nextcloud.sync.networkjob.put.multi", QtInfoMsg)
PutMultiFileJob::~PutMultiFileJob() = default;
void PutMultiFileJob::start()
{
QNetworkRequest req;
for(auto &oneDevice : _devices) {
auto onePart = QHttpPart{};
onePart.setBodyDevice(oneDevice._device.get());
for (QMap<QByteArray, QByteArray>::const_iterator it = oneDevice._headers.begin(); it != oneDevice._headers.end(); ++it) {
onePart.setRawHeader(it.key(), it.value());
}
req.setPriority(QNetworkRequest::LowPriority); // Long uploads must not block non-propagation jobs.
_body.append(onePart);
}
sendRequest("POST", _url, req, &_body);
if (reply()->error() != QNetworkReply::NoError) {
qCWarning(lcPutMultiFileJob) << " Network error: " << reply()->errorString();
}
connect(reply(), &QNetworkReply::uploadProgress, this, &PutMultiFileJob::uploadProgress);
connect(this, &AbstractNetworkJob::networkActivity, account().data(), &Account::propagatorNetworkActivity);
_requestTimer.start();
AbstractNetworkJob::start();
}
bool PutMultiFileJob::finished()
{
for(const auto &oneDevice : _devices) {
oneDevice._device->close();
}
qCInfo(lcPutMultiFileJob) << "POST of" << reply()->request().url().toString() << path() << "FINISHED WITH STATUS"
<< replyStatusString()
<< reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
emit finishedSignal();
return true;
}
}

View file

@ -0,0 +1,94 @@
/*
* Copyright 2021 (c) Matthieu Gallien <matthieu.gallien@nextcloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#pragma once
#include "abstractnetworkjob.h"
#include "propagateupload.h"
#include "account.h"
#include <QLoggingCategory>
#include <QMap>
#include <QByteArray>
#include <QUrl>
#include <QString>
#include <QElapsedTimer>
#include <QHttpMultiPart>
#include <memory>
class QIODevice;
namespace OCC {
Q_DECLARE_LOGGING_CATEGORY(lcPutMultiFileJob)
struct SingleUploadFileData
{
std::unique_ptr<UploadDevice> _device;
QMap<QByteArray, QByteArray> _headers;
};
/**
* @brief The PutMultiFileJob class
* @ingroup libsync
*/
class OWNCLOUDSYNC_EXPORT PutMultiFileJob : public AbstractNetworkJob
{
Q_OBJECT
public:
explicit PutMultiFileJob(AccountPtr account, const QUrl &url,
std::vector<SingleUploadFileData> devices, QObject *parent = nullptr)
: AbstractNetworkJob(account, {}, parent)
, _devices(std::move(devices))
, _url(url)
{
_body.setContentType(QHttpMultiPart::RelatedType);
for(auto &singleDevice : _devices) {
singleDevice._device->setParent(this);
connect(this, &PutMultiFileJob::uploadProgress,
singleDevice._device.get(), &UploadDevice::slotJobUploadProgress);
}
}
~PutMultiFileJob() override;
void start() override;
bool finished() override;
QString errorString() const override
{
return _errorString.isEmpty() ? AbstractNetworkJob::errorString() : _errorString;
}
std::chrono::milliseconds msSinceStart() const
{
return std::chrono::milliseconds(_requestTimer.elapsed());
}
signals:
void finishedSignal();
void uploadProgress(qint64, qint64);
private:
QHttpMultiPart _body;
std::vector<SingleUploadFileData> _devices;
QString _errorString;
QUrl _url;
QElapsedTimer _requestTimer;
};
}

View file

@ -711,7 +711,7 @@ void SyncEngine::slotDiscoveryFinished()
_journal->commit(QStringLiteral("post treewalk"));
_propagator = QSharedPointer<OwncloudPropagator>(
new OwncloudPropagator(_account, _localPath, _remotePath, _journal));
new OwncloudPropagator(_account, _localPath, _remotePath, _journal, _bulkUploadBlackList));
_propagator->setSyncOptions(_syncOptions);
connect(_propagator.data(), &OwncloudPropagator::itemCompleted,
this, &SyncEngine::slotItemCompleted);

View file

@ -241,6 +241,8 @@ private:
QScopedPointer<DiscoveryPhase> _discoveryPhase;
QSharedPointer<OwncloudPropagator> _propagator;
QSet<QString> _bulkUploadBlackList;
// List of all files with conflicts
QSet<QString> _seenConflictFiles;

View file

@ -9,6 +9,10 @@
#include "httplogger.h"
#include "accessmanager.h"
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonObject>
#include <QJsonValue>
#include <memory>
@ -416,6 +420,109 @@ void FakePutReply::abort()
emit finished();
}
FakePutMultiFileReply::FakePutMultiFileReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, const QString &contentType, const QByteArray &putPayload, QObject *parent)
: FakeReply { parent }
{
setRequest(request);
setUrl(request.url());
setOperation(op);
open(QIODevice::ReadOnly);
_allFileInfo = performMultiPart(remoteRootFileInfo, request, putPayload, contentType);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}
QVector<FileInfo *> FakePutMultiFileReply::performMultiPart(FileInfo &remoteRootFileInfo, const QNetworkRequest &request, const QByteArray &putPayload, const QString &contentType)
{
QVector<FileInfo *> result;
auto stringPutPayload = QString::fromUtf8(putPayload);
constexpr int boundaryPosition = sizeof("multipart/related; boundary=");
const QString boundaryValue = QStringLiteral("--") + contentType.mid(boundaryPosition, contentType.length() - boundaryPosition - 1) + QStringLiteral("\r\n");
auto stringPutPayloadRef = QString{stringPutPayload}.left(stringPutPayload.size() - 2 - boundaryValue.size());
auto allParts = stringPutPayloadRef.split(boundaryValue, Qt::SkipEmptyParts);
for (const auto &onePart : allParts) {
auto headerEndPosition = onePart.indexOf(QStringLiteral("\r\n\r\n"));
auto onePartHeaderPart = onePart.left(headerEndPosition);
auto onePartBody = onePart.mid(headerEndPosition + 4, onePart.size() - headerEndPosition - 6);
auto onePartHeaders = onePartHeaderPart.split(QStringLiteral("\r\n"));
QMap<QString, QString> allHeaders;
for(auto oneHeader : onePartHeaders) {
auto headerParts = oneHeader.split(QStringLiteral(": "));
allHeaders[headerParts.at(0)] = headerParts.at(1);
}
auto fileName = allHeaders[QStringLiteral("X-File-Path")];
Q_ASSERT(!fileName.isEmpty());
FileInfo *fileInfo = remoteRootFileInfo.find(fileName);
if (fileInfo) {
fileInfo->size = onePartBody.size();
fileInfo->contentChar = onePartBody.at(0).toLatin1();
} else {
// Assume that the file is filled with the same character
fileInfo = remoteRootFileInfo.create(fileName, onePartBody.size(), onePartBody.at(0).toLatin1());
}
fileInfo->lastModified = OCC::Utility::qDateTimeFromTime_t(request.rawHeader("X-OC-Mtime").toLongLong());
remoteRootFileInfo.find(fileName, /*invalidateEtags=*/true);
result.push_back(fileInfo);
}
return result;
}
void FakePutMultiFileReply::respond()
{
QJsonDocument reply;
QJsonObject allFileInfoReply;
qint64 totalSize = 0;
std::for_each(_allFileInfo.begin(), _allFileInfo.end(), [&totalSize](const auto &fileInfo) {
totalSize += fileInfo->size;
});
for(auto fileInfo : qAsConst(_allFileInfo)) {
QJsonObject fileInfoReply;
fileInfoReply.insert("error", QStringLiteral("false"));
fileInfoReply.insert("OC-OperationStatus", fileInfo->operationStatus);
fileInfoReply.insert("X-File-Path", fileInfo->path());
fileInfoReply.insert("OC-ETag", QLatin1String{fileInfo->etag});
fileInfoReply.insert("ETag", QLatin1String{fileInfo->etag});
fileInfoReply.insert("etag", QLatin1String{fileInfo->etag});
fileInfoReply.insert("OC-FileID", QLatin1String{fileInfo->fileId});
fileInfoReply.insert("X-OC-MTime", "accepted"); // Prevents Q_ASSERT(!_runningNow) since we'll call PropagateItemJob::done twice in that case.
emit uploadProgress(fileInfo->size, totalSize);
allFileInfoReply.insert(fileInfo->path(), fileInfoReply);
}
reply.setObject(allFileInfoReply);
_payload = reply.toJson();
setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 200);
setFinished(true);
if (bytesAvailable()) {
emit readyRead();
}
emit metaDataChanged();
emit finished();
}
void FakePutMultiFileReply::abort()
{
setError(OperationCanceledError, QStringLiteral("abort"));
emit finished();
}
qint64 FakePutMultiFileReply::bytesAvailable() const
{
return _payload.size() + QIODevice::bytesAvailable();
}
qint64 FakePutMultiFileReply::readData(char *data, qint64 maxlen)
{
qint64 len = std::min(qint64 { _payload.size() }, maxlen);
std::copy(_payload.cbegin(), _payload.cbegin() + len, data);
_payload.remove(0, static_cast<int>(len));
return len;
}
FakeMkcolReply::FakeMkcolReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, QObject *parent)
: FakeReply { parent }
{
@ -813,43 +920,77 @@ FakeQNAM::FakeQNAM(FileInfo initialRoot)
setCookieJar(new OCC::CookieJar);
}
QJsonObject FakeQNAM::forEachReplyPart(QIODevice *outgoingData,
const QString &contentType,
std::function<QJsonObject (const QMap<QString, QByteArray> &)> replyFunction)
{
auto fullReply = QJsonObject{};
auto putPayload = outgoingData->peek(outgoingData->bytesAvailable());
outgoingData->reset();
auto stringPutPayload = QString::fromUtf8(putPayload);
constexpr int boundaryPosition = sizeof("multipart/related; boundary=");
const QString boundaryValue = QStringLiteral("--") + contentType.mid(boundaryPosition, contentType.length() - boundaryPosition - 1) + QStringLiteral("\r\n");
auto stringPutPayloadRef = QString{stringPutPayload}.left(stringPutPayload.size() - 2 - boundaryValue.size());
auto allParts = stringPutPayloadRef.split(boundaryValue, Qt::SkipEmptyParts);
for (const auto &onePart : qAsConst(allParts)) {
auto headerEndPosition = onePart.indexOf(QStringLiteral("\r\n\r\n"));
auto onePartHeaderPart = onePart.left(headerEndPosition);
auto onePartHeaders = onePartHeaderPart.split(QStringLiteral("\r\n"));
QMap<QString, QByteArray> allHeaders;
for(const auto &oneHeader : qAsConst(onePartHeaders)) {
auto headerParts = oneHeader.split(QStringLiteral(": "));
allHeaders[headerParts.at(0)] = headerParts.at(1).toLatin1();
}
auto reply = replyFunction(allHeaders);
if (reply.contains(QStringLiteral("error")) &&
reply.contains(QStringLiteral("etag"))) {
fullReply.insert(allHeaders[QStringLiteral("X-File-Path")], reply);
}
}
return fullReply;
}
QNetworkReply *FakeQNAM::createRequest(QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData)
{
QNetworkReply *reply = nullptr;
auto newRequest = request;
newRequest.setRawHeader("X-Request-ID", OCC::AccessManager::generateRequestId());
auto contentType = request.header(QNetworkRequest::ContentTypeHeader).toString();
if (_override) {
if (auto _reply = _override(op, newRequest, outgoingData)) {
reply = _reply;
}
}
if (!reply) {
const QString fileName = getFilePathFromUrl(newRequest.url());
Q_ASSERT(!fileName.isNull());
if (_errorPaths.contains(fileName)) {
reply = new FakeErrorReply { op, newRequest, this, _errorPaths[fileName] };
}
reply = overrideReplyWithError(getFilePathFromUrl(newRequest.url()), op, newRequest);
}
if (!reply) { const bool isUpload = newRequest.url().path().startsWith(sUploadUrl.path());
if (!reply) {
const bool isUpload = newRequest.url().path().startsWith(sUploadUrl.path());
FileInfo &info = isUpload ? _uploadFileInfo : _remoteRootFileInfo;
auto verb = newRequest.attribute(QNetworkRequest::CustomVerbAttribute);
if (verb == QLatin1String("PROPFIND"))
if (verb == QLatin1String("PROPFIND")) {
// Ignore outgoingData always returning somethign good enough, works for now.
reply = new FakePropfindReply { info, op, newRequest, this };
else if (verb == QLatin1String("GET") || op == QNetworkAccessManager::GetOperation)
} else if (verb == QLatin1String("GET") || op == QNetworkAccessManager::GetOperation) {
reply = new FakeGetReply { info, op, newRequest, this };
else if (verb == QLatin1String("PUT") || op == QNetworkAccessManager::PutOperation)
} else if (verb == QLatin1String("PUT") || op == QNetworkAccessManager::PutOperation) {
reply = new FakePutReply { info, op, newRequest, outgoingData->readAll(), this };
else if (verb == QLatin1String("MKCOL"))
} else if (verb == QLatin1String("MKCOL")) {
reply = new FakeMkcolReply { info, op, newRequest, this };
else if (verb == QLatin1String("DELETE") || op == QNetworkAccessManager::DeleteOperation)
} else if (verb == QLatin1String("DELETE") || op == QNetworkAccessManager::DeleteOperation) {
reply = new FakeDeleteReply { info, op, newRequest, this };
else if (verb == QLatin1String("MOVE") && !isUpload)
} else if (verb == QLatin1String("MOVE") && !isUpload) {
reply = new FakeMoveReply { info, op, newRequest, this };
else if (verb == QLatin1String("MOVE") && isUpload)
} else if (verb == QLatin1String("MOVE") && isUpload) {
reply = new FakeChunkMoveReply { info, _remoteRootFileInfo, op, newRequest, this };
else {
} else if (verb == QLatin1String("POST") || op == QNetworkAccessManager::PostOperation) {
if (contentType.startsWith(QStringLiteral("multipart/related; boundary="))) {
reply = new FakePutMultiFileReply { info, op, newRequest, contentType, outgoingData->readAll(), this };
}
} else {
qDebug() << verb << outgoingData;
Q_UNREACHABLE();
}
@ -858,6 +999,18 @@ QNetworkReply *FakeQNAM::createRequest(QNetworkAccessManager::Operation op, cons
return reply;
}
QNetworkReply * FakeQNAM::overrideReplyWithError(QString fileName, QNetworkAccessManager::Operation op, QNetworkRequest newRequest)
{
QNetworkReply *reply = nullptr;
Q_ASSERT(!fileName.isNull());
if (_errorPaths.contains(fileName)) {
reply = new FakeErrorReply { op, newRequest, this, _errorPaths[fileName] };
}
return reply;
}
FakeFolder::FakeFolder(const FileInfo &fileTemplate, const OCC::Optional<FileInfo> &localFileInfo, const QString &remotePath)
: _localModifier(_tempDir.path())
{
@ -1079,3 +1232,12 @@ FakeReply::FakeReply(QObject *parent)
}
FakeReply::~FakeReply() = default;
FakeJsonErrorReply::FakeJsonErrorReply(QNetworkAccessManager::Operation op,
const QNetworkRequest &request,
QObject *parent,
int httpErrorCode,
const QJsonDocument &reply)
: FakeErrorReply{ op, request, parent, httpErrorCode, reply.toJson() }
{
}

View file

@ -28,6 +28,8 @@
#include <cookiejar.h>
#include <QTimer>
class QJsonDocument;
/*
* TODO: In theory we should use QVERIFY instead of Q_ASSERT for testing, but this
* only works when directly called from a QTest :-(
@ -148,6 +150,7 @@ public:
void fixupParentPathRecursively();
QString name;
int operationStatus = 200;
bool isDir = true;
bool isShared = false;
OCC::RemotePermissions permissions; // When uset, defaults to everything
@ -214,6 +217,27 @@ public:
qint64 readData(char *, qint64) override { return 0; }
};
class FakePutMultiFileReply : public FakeReply
{
Q_OBJECT
public:
FakePutMultiFileReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, const QString &contentType, const QByteArray &putPayload, QObject *parent);
static QVector<FileInfo *> performMultiPart(FileInfo &remoteRootFileInfo, const QNetworkRequest &request, const QByteArray &putPayload, const QString &contentType);
Q_INVOKABLE virtual void respond();
void abort() override;
qint64 bytesAvailable() const override;
qint64 readData(char *data, qint64 maxlen) override;
private:
QVector<FileInfo *> _allFileInfo;
QByteArray _payload;
};
class FakeMkcolReply : public FakeReply
{
Q_OBJECT
@ -354,6 +378,17 @@ public:
QByteArray _body;
};
class FakeJsonErrorReply : public FakeErrorReply
{
Q_OBJECT
public:
FakeJsonErrorReply(QNetworkAccessManager::Operation op,
const QNetworkRequest &request,
QObject *parent,
int httpErrorCode,
const QJsonDocument &reply = QJsonDocument());
};
// A reply that never responds
class FakeHangingReply : public FakeReply
{
@ -409,6 +444,12 @@ public:
void setOverride(const Override &override) { _override = override; }
QJsonObject forEachReplyPart(QIODevice *outgoingData,
const QString &contentType,
std::function<QJsonObject(const QMap<QString, QByteArray> &)> replyFunction);
QNetworkReply *overrideReplyWithError(QString fileName, Operation op, QNetworkRequest newRequest);
protected:
QNetworkReply *createRequest(Operation op, const QNetworkRequest &request,
QIODevice *outgoingData = nullptr) override;
@ -467,6 +508,11 @@ public:
};
ErrorList serverErrorPaths() { return {_fakeQnam}; }
void setServerOverride(const FakeQNAM::Override &override) { _fakeQnam->setOverride(override); }
QJsonObject forEachReplyPart(QIODevice *outgoingData,
const QString &contentType,
std::function<QJsonObject(const QMap<QString, QByteArray>&)> replyFunction) {
return _fakeQnam->forEachReplyPart(outgoingData, contentType, replyFunction);
}
QString localPath() const;

View file

@ -41,6 +41,18 @@ bool itemDidCompleteSuccessfullyWithExpectedRank(const ItemCompletedSpy &spy, co
return false;
}
int itemSuccessfullyCompletedGetRank(const ItemCompletedSpy &spy, const QString &path)
{
auto itItem = std::find_if(spy.begin(), spy.end(), [&path] (auto currentItem) {
auto item = currentItem[0].template value<OCC::SyncFileItemPtr>();
return item->destination() == path;
});
if (itItem != spy.end()) {
return itItem - spy.begin();
}
return -1;
}
class TestSyncEngine : public QObject
{
Q_OBJECT
@ -106,12 +118,18 @@ private slots:
fakeFolder.syncOnce();
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y", 0));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z", 1));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y/d0", 2));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z/d0", 3));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "A/a0", 4));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "B/b0", 5));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r0", 6));
QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r1", 7));
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "Y/d0"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "Y/d0") > 1);
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "Z/d0"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "Z/d0") > 1);
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "A/a0"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "A/a0") > 1);
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "B/b0"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "B/b0") > 1);
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "r0"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "r0") > 1);
QVERIFY(itemDidCompleteSuccessfully(completeSpy, "r1"));
QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "r1") > 1);
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
}
@ -492,7 +510,9 @@ private slots:
int remoteQuota = 1000;
int n507 = 0, nPUT = 0;
QObject parent;
fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *) -> QNetworkReply * {
fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * {
Q_UNUSED(outgoingData)
if (op == QNetworkAccessManager::PutOperation) {
nPUT++;
if (request.rawHeader("OC-Total-Length").toInt() > remoteQuota) {
@ -778,6 +798,95 @@ private slots:
QCOMPARE(QFileInfo(fakeFolder.localPath() + "foo").lastModified(), datetime);
}
/**
* Checks whether subsequent large uploads are skipped after a 507 error
*/
void testErrorsWithBulkUpload()
{
FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() };
fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"bulkupload", "1.0"} } } });
// Disable parallel uploads
SyncOptions syncOptions;
syncOptions._parallelNetworkJobs = 0;
fakeFolder.syncEngine().setSyncOptions(syncOptions);
int nPUT = 0;
int nPOST = 0;
fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * {
auto contentType = request.header(QNetworkRequest::ContentTypeHeader).toString();
if (op == QNetworkAccessManager::PostOperation) {
++nPOST;
if (contentType.startsWith(QStringLiteral("multipart/related; boundary="))) {
auto jsonReplyObject = fakeFolder.forEachReplyPart(outgoingData, contentType, [] (const QMap<QString, QByteArray> &allHeaders) -> QJsonObject {
auto reply = QJsonObject{};
const auto fileName = allHeaders[QStringLiteral("X-File-Path")];
if (fileName.endsWith("A/big2") ||
fileName.endsWith("A/big3") ||
fileName.endsWith("A/big4") ||
fileName.endsWith("A/big5") ||
fileName.endsWith("A/big7") ||
fileName.endsWith("B/big8")) {
reply.insert(QStringLiteral("error"), true);
reply.insert(QStringLiteral("etag"), {});
return reply;
} else {
reply.insert(QStringLiteral("error"), false);
reply.insert(QStringLiteral("etag"), {});
}
return reply;
});
if (jsonReplyObject.size()) {
auto jsonReply = QJsonDocument{};
jsonReply.setObject(jsonReplyObject);
return new FakeJsonErrorReply{op, request, this, 200, jsonReply};
}
return nullptr;
}
} else if (op == QNetworkAccessManager::PutOperation) {
++nPUT;
const auto fileName = getFilePathFromUrl(request.url());
if (fileName.endsWith("A/big2") ||
fileName.endsWith("A/big3") ||
fileName.endsWith("A/big4") ||
fileName.endsWith("A/big5") ||
fileName.endsWith("A/big7") ||
fileName.endsWith("B/big8")) {
return new FakeErrorReply(op, request, this, 412);
}
return nullptr;
}
return nullptr;
});
fakeFolder.localModifier().insert("A/big", 1);
QVERIFY(fakeFolder.syncOnce());
QCOMPARE(nPUT, 0);
QCOMPARE(nPOST, 1);
nPUT = 0;
nPOST = 0;
fakeFolder.localModifier().insert("A/big1", 1); // ok
fakeFolder.localModifier().insert("A/big2", 1); // ko
fakeFolder.localModifier().insert("A/big3", 1); // ko
fakeFolder.localModifier().insert("A/big4", 1); // ko
fakeFolder.localModifier().insert("A/big5", 1); // ko
fakeFolder.localModifier().insert("A/big6", 1); // ok
fakeFolder.localModifier().insert("A/big7", 1); // ko
fakeFolder.localModifier().insert("A/big8", 1); // ok
fakeFolder.localModifier().insert("B/big8", 1); // ko
QVERIFY(!fakeFolder.syncOnce());
QCOMPARE(nPUT, 0);
QCOMPARE(nPOST, 1);
nPUT = 0;
nPOST = 0;
QVERIFY(!fakeFolder.syncOnce());
QCOMPARE(nPUT, 6);
QCOMPARE(nPOST, 0);
}
};
QTEST_GUILESS_MAIN(TestSyncEngine)