Merge pull request #5102 from owncloud/chunking-ng

Chunking ng
This commit is contained in:
Markus Goetz 2016-11-15 15:56:52 +01:00 committed by GitHub
commit 52552a4204
22 changed files with 1394 additions and 485 deletions

View file

@ -50,6 +50,8 @@ set(libsync_SRCS
propagatorjobs.cpp
propagatedownload.cpp
propagateupload.cpp
propagateuploadv1.cpp
propagateuploadng.cpp
propagateremotedelete.cpp
propagateremotemove.cpp
propagateremotemkdir.cpp

View file

@ -75,11 +75,19 @@ AccountPtr Account::sharedFromThis()
return _sharedThis.toStrongRef();
}
QString Account::user() const
{
return _user.isEmpty() ? _credentials->user() : _user;
}
void Account::setUser(const QString &user)
{
_user = user;
}
QString Account::displayName() const
{
auto user = _credentials->user();
QString dn = QString("%1@%2").arg(user, _url.host());
QString dn = QString("%1@%2").arg(user(), _url.host());
int port = url().port();
if (port > 0 && port != 80 && port != 443) {
dn.append(QLatin1Char(':'));

View file

@ -68,6 +68,10 @@ public:
AccountPtr sharedFromThis();
/// The user that can be used in dav url
QString user() const;
void setUser(const QString &user);
/// The name of the account as shown in the toolbar
QString displayName() const;
@ -198,6 +202,7 @@ private:
QWeakPointer<Account> _sharedThis;
QString _id;
QString _user;
QMap<QString, QVariant> _settingsMap;
QUrl _url;
QList<QSslCertificate> _approvedCerts;

View file

@ -107,4 +107,10 @@ QByteArray Capabilities::uploadChecksumType() const
return QByteArray();
}
bool Capabilities::chunkingNg() const
{
return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0";
}
}

View file

@ -39,6 +39,7 @@ public:
bool sharePublicLinkEnforceExpireDate() const;
int sharePublicLinkExpireDateDays() const;
bool shareResharing() const;
bool chunkingNg() const;
/// returns true if the capabilities report notifications
bool notificationsAvailable() const;

View file

@ -229,10 +229,26 @@ void ConnectionValidator::slotCapabilitiesRecieved(const QVariantMap &json)
auto caps = json.value("ocs").toMap().value("data").toMap().value("capabilities");
qDebug() << "Server capabilities" << caps;
_account->setCapabilities(caps.toMap());
reportResult(Connected);
return;
fetchUser();
}
void ConnectionValidator::fetchUser()
{
JsonApiJob *job = new JsonApiJob(_account, QLatin1String("ocs/v1.php/cloud/user"), this);
job->setTimeout(timeoutToUseMsec);
QObject::connect(job, SIGNAL(jsonReceived(QVariantMap, int)), this, SLOT(slotUserFetched(QVariantMap)));
job->start();
}
void ConnectionValidator::slotUserFetched(const QVariantMap &json)
{
QString user = json.value("ocs").toMap().value("data").toMap().value("id").toString();
if (!user.isEmpty()) {
_account->setUser(user);
}
reportResult(Connected);
}
void ConnectionValidator::reportResult(Status status)
{

View file

@ -27,7 +27,7 @@ namespace OCC {
* This is a job-like class to check that the server is up and that we are connected.
* There are two entry points: checkServerAndAuth and checkAuthentication
* checkAuthentication is the quick version that only does the propfind
* while checkServerAndAuth is doing the 3 calls.
* while checkServerAndAuth is doing the 4 calls.
*
* We cannot use the capabilites call to test the login and the password because of
* https://github.com/owncloud/core/issues/12930
@ -60,7 +60,15 @@ namespace OCC {
+-> checkServerCapabilities (cloud/capabilities)
JsonApiJob
|
+-> slotCapabilitiesRecieved --> X
+-> slotCapabilitiesRecieved -+
|
+-----------------------------------+
|
+-> fetchUser
PropfindJob
|
+-> slotUserFetched --> X
\endcode
*/
class OWNCLOUDSYNC_EXPORT ConnectionValidator : public QObject
@ -109,10 +117,12 @@ protected slots:
void slotAuthSuccess();
void slotCapabilitiesRecieved(const QVariantMap&);
void slotUserFetched(const QVariantMap &);
private:
void reportResult(Status status);
void checkServerCapabilities();
void fetchUser();
QStringList _errors;
AccountPtr _account;

View file

@ -106,14 +106,24 @@ MkColJob::MkColJob(AccountPtr account, const QString &path, QObject *parent)
{
}
MkColJob::MkColJob(AccountPtr account, const QUrl &url,
const QMap<QByteArray, QByteArray> &extraHeaders, QObject *parent)
: AbstractNetworkJob(account, QString(), parent), _url(url), _extraHeaders(extraHeaders)
{
}
void MkColJob::start()
{
// add 'Content-Length: 0' header (see https://github.com/owncloud/client/issues/3256)
QNetworkRequest req;
req.setRawHeader("Content-Length", "0");
for(auto it = _extraHeaders.constBegin(); it != _extraHeaders.constEnd(); ++it) {
req.setRawHeader(it.key(), it.value());
}
// assumes ownership
QNetworkReply *reply = davRequest("MKCOL", path(), req);
QNetworkReply *reply = _url.isValid() ? davRequest("MKCOL", _url, req)
: davRequest("MKCOL", path(), req);
setReply(reply);
setupConnections(reply);
AbstractNetworkJob::start();
@ -264,6 +274,11 @@ LsColJob::LsColJob(AccountPtr account, const QString &path, QObject *parent)
{
}
LsColJob::LsColJob(AccountPtr account, const QUrl &url, QObject *parent)
: AbstractNetworkJob(account, QString(), parent), _url(url)
{
}
void LsColJob::setProperties(QList<QByteArray> properties)
{
_properties = properties;
@ -307,7 +322,8 @@ void LsColJob::start()
QBuffer *buf = new QBuffer(this);
buf->setData(xml);
buf->open(QIODevice::ReadOnly);
QNetworkReply *reply = davRequest("PROPFIND", path(), req, buf);
QNetworkReply *reply = _url.isValid() ? davRequest("PROPFIND", _url, req, buf)
: davRequest("PROPFIND", path(), req, buf);
buf->setParent(reply);
setReply(reply);
setupConnections(reply);

View file

@ -62,6 +62,7 @@ class OWNCLOUDSYNC_EXPORT LsColJob : public AbstractNetworkJob {
Q_OBJECT
public:
explicit LsColJob(AccountPtr account, const QString &path, QObject *parent = 0);
explicit LsColJob(AccountPtr account, const QUrl &url, QObject *parent = 0);
void start() Q_DECL_OVERRIDE;
QHash<QString, qint64> _sizes;
@ -87,6 +88,7 @@ private slots:
private:
QList<QByteArray> _properties;
QUrl _url; // Used instead of path() if the url is specified in the constructor
};
/**
@ -170,8 +172,12 @@ private:
*/
class OWNCLOUDSYNC_EXPORT MkColJob : public AbstractNetworkJob {
Q_OBJECT
QUrl _url; // Only used if the constructor taking a url is taken.
QMap<QByteArray, QByteArray> _extraHeaders;
public:
explicit MkColJob(AccountPtr account, const QString &path, QObject *parent = 0);
explicit MkColJob(AccountPtr account, const QUrl &url,
const QMap<QByteArray, QByteArray> &extraHeaders, QObject *parent = 0);
void start() Q_DECL_OVERRIDE;
signals:

View file

@ -270,7 +270,14 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) {
job->setDeleteExistingFolder(deleteExisting);
return job;
} else {
auto job = new PropagateUploadFile(this, item);
PropagateUploadFileCommon *job = 0;
static const auto chunkng = qgetenv("OWNCLOUD_CHUNKING_NG");
if (item->_size > chunkSize()
&& (account()->capabilities().chunkingNg() || chunkng == "1") && chunkng != "0") {
job = new PropagateUploadFileNG(this, item);
} else {
job = new PropagateUploadFileV1(this, item);
}
job->setDeleteExisting(deleteExisting);
return job;
}

View file

@ -380,10 +380,11 @@ private:
#if QT_VERSION < QT_VERSION_CHECK(5, 0, 0)
// access to signals which are protected in Qt4
friend class PropagateDownloadFile;
friend class PropagateUploadFile;
friend class PropagateLocalMkdir;
friend class PropagateLocalRename;
friend class PropagateRemoteMove;
friend class PropagateUploadFileV1;
friend class PropagateUploadFileNG;
#endif
};

View file

@ -22,11 +22,14 @@ DeleteJob::DeleteJob(AccountPtr account, const QString& path, QObject* parent)
: AbstractNetworkJob(account, path, parent)
{ }
DeleteJob::DeleteJob(AccountPtr account, const QUrl& url, QObject* parent)
: AbstractNetworkJob(account, QString(), parent), _url(url)
{ }
void DeleteJob::start()
{
QNetworkRequest req;
setReply(davRequest("DELETE", path(), req));
setReply(_url.isValid() ? davRequest("DELETE", _url, req) : davRequest("DELETE", path(), req));
setupConnections(reply());
if( reply()->error() != QNetworkReply::NoError ) {

View file

@ -24,8 +24,10 @@ namespace OCC {
*/
class DeleteJob : public AbstractNetworkJob {
Q_OBJECT
QUrl _url; // Only used if the constructor taking a url is taken.
public:
explicit DeleteJob(AccountPtr account, const QString& path, QObject* parent = 0);
explicit DeleteJob(AccountPtr account, const QUrl& url, QObject* parent = 0);
void start() Q_DECL_OVERRIDE;
bool finished() Q_DECL_OVERRIDE;

View file

@ -28,12 +28,20 @@ MoveJob::MoveJob(AccountPtr account, const QString& path,
: AbstractNetworkJob(account, path, parent), _destination(destination)
{ }
MoveJob::MoveJob(AccountPtr account, const QUrl& url, const QString &destination,
QMap<QByteArray, QByteArray> extraHeaders, QObject* parent)
: AbstractNetworkJob(account, QString(), parent), _destination(destination), _url(url)
, _extraHeaders(extraHeaders)
{ }
void MoveJob::start()
{
QNetworkRequest req;
req.setRawHeader("Destination", QUrl::toPercentEncoding(_destination, "/"));
setReply(davRequest("MOVE", path(), req));
for(auto it = _extraHeaders.constBegin(); it != _extraHeaders.constEnd(); ++it) {
req.setRawHeader(it.key(), it.value());
}
setReply(_url.isValid() ? davRequest("MOVE", _url, req) : davRequest("MOVE", path(), req));
setupConnections(reply());
if( reply()->error() != QNetworkReply::NoError ) {

View file

@ -25,8 +25,12 @@ namespace OCC {
class MoveJob : public AbstractNetworkJob {
Q_OBJECT
const QString _destination;
const QUrl _url; // Only used (instead of path) when the constructor taking an URL is used
QMap<QByteArray, QByteArray> _extraHeaders;
public:
explicit MoveJob(AccountPtr account, const QString& path, const QString &destination, QObject* parent = 0);
explicit MoveJob(AccountPtr account, const QUrl& url, const QString &destination,
QMap<QByteArray, QByteArray> _extraHeaders, QObject* parent = 0);
void start() Q_DECL_OVERRIDE;
bool finished() Q_DECL_OVERRIDE;

View file

@ -72,7 +72,8 @@ void PUTFileJob::start() {
req.setRawHeader(it.key(), it.value());
}
setReply(davRequest("PUT", path(), req, _device.data()));
setReply(_url.isValid() ? davRequest("PUT", _url, req, _device.data())
: davRequest("PUT", path(), req, _device.data()));
setupConnections(reply());
if( reply()->error() != QNetworkReply::NoError ) {
@ -184,7 +185,13 @@ bool PollJob::finished()
return true;
}
void PropagateUploadFile::start()
void PropagateUploadFileCommon::setDeleteExisting(bool enabled)
{
_deleteExisting = enabled;
}
void PropagateUploadFileCommon::start()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) {
return;
@ -205,7 +212,7 @@ void PropagateUploadFile::start()
job->start();
}
void PropagateUploadFile::slotComputeContentChecksum()
void PropagateUploadFileCommon::slotComputeContentChecksum()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) {
return;
@ -239,12 +246,7 @@ void PropagateUploadFile::slotComputeContentChecksum()
computeChecksum->start(filePath);
}
void PropagateUploadFile::setDeleteExisting(bool enabled)
{
_deleteExisting = enabled;
}
void PropagateUploadFile::slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum)
void PropagateUploadFileCommon::slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum)
{
_item->_contentChecksum = contentChecksum;
_item->_contentChecksumType = contentChecksumType;
@ -276,7 +278,7 @@ void PropagateUploadFile::slotComputeTransmissionChecksum(const QByteArray& cont
computeChecksum->start(filePath);
}
void PropagateUploadFile::slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum)
void PropagateUploadFileCommon::slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum)
{
// Remove ourselfs from the list of active job, before any posible call to done()
// When we start chunks, we will add it again, once for every chunks.
@ -322,23 +324,7 @@ void PropagateUploadFile::slotStartUpload(const QByteArray& transmissionChecksum
return;
}
_chunkCount = std::ceil(fileSize/double(chunkSize()));
_startChunk = 0;
_transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16);
const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file);
if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) {
_startChunk = progressInfo._chunk;
_transferId = progressInfo._transferid;
qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk;
}
_currentChunk = 0;
_duration.start();
emit progress(*_item, 0);
this->startNextChunk();
doStartUpload();
}
UploadDevice::UploadDevice(BandwidthManager *bwm)
@ -476,24 +462,64 @@ void UploadDevice::setChoked(bool b) {
}
}
void PropagateUploadFile::startNextChunk()
void PropagateUploadFileCommon::startPollJob(const QString& path)
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
PollJob* job = new PollJob(_propagator->account(), path, _item,
_propagator->_journal, _propagator->_localDir, this);
connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished()));
SyncJournalDb::PollInfo info;
info._file = _item->_file;
info._url = path;
info._modtime = _item->_modtime;
_propagator->_journal->setPollInfo(info);
_propagator->_journal->commit("add poll info");
_propagator->_activeJobList.append(this);
job->start();
}
if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
// We return now and when the _jobs are finished we will proceed with the last chunk
// NOTE: Some other parts of the code such as slotUploadProgress also assume that the last chunk
// is sent last.
void PropagateUploadFileCommon::slotPollFinished()
{
PollJob *job = qobject_cast<PollJob *>(sender());
Q_ASSERT(job);
_propagator->_activeJobList.removeOne(this);
if (job->_item->_status != SyncFileItem::Success) {
_finished = true;
done(job->_item->_status, job->_item->_errorString);
return;
}
quint64 fileSize = _item->_size;
finalize();
}
void PropagateUploadFileCommon::slotJobDestroyed(QObject* job)
{
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end());
}
void PropagateUploadFileCommon::abort()
{
foreach(auto *job, _jobs) {
if (job->reply()) {
qDebug() << Q_FUNC_INFO << job << this->_item->_file;
job->reply()->abort();
}
}
}
// This function is used whenever there is an error occuring and jobs might be in progress
void PropagateUploadFileCommon::abortWithError(SyncFileItem::Status status, const QString &error)
{
_finished = true;
abort();
done(status, error);
}
QMap<QByteArray, QByteArray> PropagateUploadFileCommon::headers()
{
QMap<QByteArray, QByteArray> headers;
headers["OC-Total-Length"] = QByteArray::number(fileSize);
headers["OC-Async"] = "1";
headers["OC-Chunk-Size"]= QByteArray::number(quint64(chunkSize()));
headers["Content-Type"] = "application/octet-stream";
headers["X-OC-Mtime"] = QByteArray::number(qint64(_item->_modtime));
@ -509,291 +535,20 @@ void PropagateUploadFile::startNextChunk()
}
if (!_item->_etag.isEmpty() && _item->_etag != "empty_etag"
&& _item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match
&& _item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE
&& !_deleteExisting
) {
&& _item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match
&& _item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE
&& !_deleteExisting
) {
// We add quotes because the owncloud server always adds quotes around the etag, and
// csync_owncloud.c's owncloud_file_id always strips the quotes.
headers["If-Match"] = '"' + _item->_etag + '"';
}
QString path = _item->_file;
UploadDevice *device = new UploadDevice(&_propagator->_bandwidthManager);
qint64 chunkStart = 0;
qint64 currentChunkSize = fileSize;
bool isFinalChunk = false;
if (_chunkCount > 1) {
int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
// XOR with chunk size to make sure everything goes well if chunk size changes between runs
uint transid = _transferId ^ chunkSize();
qDebug() << "Upload chunk" << sendingChunk << "of" << _chunkCount << "transferid(remote)=" << transid;
path += QString("-chunking-%1-%2-%3").arg(transid).arg(_chunkCount).arg(sendingChunk);
headers["OC-Chunked"] = "1";
chunkStart = chunkSize() * quint64(sendingChunk);
currentChunkSize = chunkSize();
if (sendingChunk == _chunkCount - 1) { // last chunk
currentChunkSize = (fileSize % chunkSize());
if( currentChunkSize == 0 ) { // if the last chunk pretends to be 0, its actually the full chunk size.
currentChunkSize = chunkSize();
}
isFinalChunk = true;
}
} else {
// if there's only one chunk, it's the final one
isFinalChunk = true;
}
if (isFinalChunk && !_transmissionChecksumType.isEmpty()) {
headers[checkSumHeaderC] = makeChecksumHeader(
_transmissionChecksumType, _transmissionChecksum);
}
const QString fileName = _propagator->getFilePath(_item->_file);
if (! device->prepareAndOpen(fileName, chunkStart, currentChunkSize)) {
qDebug() << "ERR: Could not prepare upload device: " << device->errorString();
// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(fileName)) {
emit _propagator->seenLockedFile(fileName);
}
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError( SyncFileItem::SoftError, device->errorString() );
delete device;
return;
}
// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobList.append(this);
_currentChunk++;
bool parallelChunkUpload = true;
QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK");
if (!env.isEmpty()) {
parallelChunkUpload = env != "false" && env != "0";
} else {
int versionNum = _propagator->account()->serverVersionInt();
if (versionNum < 0x080003) {
// Disable parallel chunk upload severs older than 8.0.3 to avoid too many
// internal sever errors (#2743, #2938)
parallelChunkUpload = false;
}
}
if (_currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobList.count() < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emit ready();
}
return headers;
}
void PropagateUploadFile::slotPutFinished()
void PropagateUploadFileCommon::finalize()
{
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job);
slotJobDestroyed(job); // remove it from the _jobs list
qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS"
<< job->reply()->error()
<< (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString())
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
_propagator->_activeJobList.removeOne(this);
if (_finished) {
// We have sent the finished signal already. We don't need to handle any remaining jobs
return;
}
QNetworkReply::NetworkError err = job->reply()->error();
#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2)
if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) {
// Abort the job and try again later.
// This works around a bug in QNAM wich might reuse a non-empty buffer for the next request.
qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2.";
_propagator->_anotherSyncNeeded = true;
abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2."));
return;
}
#endif
if (err != QNetworkReply::NoError) {
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if(checkForProblemsWithShared(_item->_httpErrorCode,
tr("The file was edited locally but is part of a read only share. "
"It is restored and your edit is in the conflict file."))) {
return;
}
QByteArray replyContent = job->reply()->readAll();
qDebug() << replyContent; // display the XML error in the debug
QString errorString = errorMessage(job->errorString(), replyContent);
if (job->reply()->hasRawHeader("OC-ErrorString")) {
errorString = job->reply()->rawHeader("OC-ErrorString");
}
if (_item->_httpErrorCode == 412) {
// Precondition Failed: Maybe the bad etag is in the database, we need to clear the
// parent folder etag so we won't read from DB next sync.
_propagator->_journal->avoidReadFromDbOnNextSync(_item->_file);
_propagator->_anotherSyncNeeded = true;
}
SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode,
&_propagator->_anotherSyncNeeded);
abortWithError(status, errorString);
return;
}
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
// The server needs some time to process the request and provide us with a poll URL
if (_item->_httpErrorCode == 202) {
_finished = true;
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll"));
if (path.isEmpty()) {
done(SyncFileItem::NormalError, tr("Poll URL missing"));
return;
}
startPollJob(path);
return;
}
// Check the file again post upload.
// Two cases must be considered separately: If the upload is finished,
// the file is on the server and has a changed ETag. In that case,
// the etag has to be properly updated in the client journal, and because
// of that we can bail out here with an error. But we can reschedule a
// sync ASAP.
// But if the upload is ongoing, because not all chunks were uploaded
// yet, the upload can be stopped and an error can be displayed, because
// the server hasn't registered the new file yet.
QByteArray etag = getEtagFromReply(job->reply());
bool finished = etag.length() > 0;
// Check if the file still exists
const QString fullFilePath(_propagator->getFilePath(_item->_file));
if( !FileSystem::fileExists(fullFilePath) ) {
if (!finished) {
abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return;
} else {
_propagator->_anotherSyncNeeded = true;
}
}
// Check whether the file changed since discovery.
if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) {
_propagator->_anotherSyncNeeded = true;
if( !finished ) {
abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return;
}
}
if (!finished) {
// Proceed to next chunk.
if (_currentChunk >= _chunkCount) {
if (!_jobs.empty()) {
// just wait for the other job to finish.
return;
}
_finished = true;
done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag was present)"));
return;
}
// Deletes an existing blacklist entry on successful chunk upload
if (_item->_hasBlacklistEntry) {
_propagator->_journal->wipeErrorBlacklistEntry(_item->_file);
_item->_hasBlacklistEntry = false;
}
SyncJournalDb::UploadInfo pi;
pi._valid = true;
auto currentChunk = job->_chunk;
foreach (auto *job, _jobs) {
// Take the minimum finished one
if (auto putJob = qobject_cast<PUTFileJob*>(job)) {
currentChunk = qMin(currentChunk, putJob->_chunk - 1);
}
}
pi._chunk = (currentChunk + _startChunk + 1) % _chunkCount ; // next chunk to start with
pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime);
_propagator->_journal->setUploadInfo(_item->_file, pi);
_propagator->_journal->commit("Upload info");
startNextChunk();
return;
}
// the following code only happens after all chunks were uploaded.
_finished = true;
// the file id should only be empty for new files up- or downloaded
QByteArray fid = job->reply()->rawHeader("OC-FileID");
if( !fid.isEmpty() ) {
if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) {
qDebug() << "WARN: File ID changed!" << _item->_fileId << fid;
}
_item->_fileId = fid;
}
_item->_etag = etag;
_item->_responseTimeStamp = job->responseTimestamp();
if (job->reply()->rawHeader("X-OC-MTime") != "accepted") {
// X-OC-MTime is supported since owncloud 5.0. But not when chunking.
// Normally Owncloud 6 always puts X-OC-MTime
qWarning() << "Server does not support X-OC-MTime" << job->reply()->rawHeader("X-OC-MTime");
// Well, the mtime was not set
done(SyncFileItem::SoftError, "Server does not support X-OC-MTime");
}
// performance logging
_item->_requestDuration = _stopWatch.stop();
qDebug() << "*==* duration UPLOAD" << _item->_size
<< _stopWatch.durationOfLap(QLatin1String("ContentChecksum"))
<< _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum"))
<< _item->_requestDuration;
// The job might stay alive for the whole sync, release this tiny bit of memory.
_stopWatch.reset();
finalize(*_item);
}
void PropagateUploadFile::finalize(const SyncFileItem &copy)
{
// Normally, copy == _item, but when it comes from the UpdateMTimeAndETagJob, we need to do
// some updates
_item->_etag = copy._etag;
_item->_fileId = copy._fileId;
_item->_requestDuration = _duration.elapsed();
_finished = true;
if (!_propagator->_journal->setFileRecord(SyncJournalFileRecord(*_item, _propagator->getFilePath(_item->_file)))) {
@ -807,92 +562,5 @@ void PropagateUploadFile::finalize(const SyncFileItem &copy)
done(SyncFileItem::Success);
}
void PropagateUploadFile::slotUploadProgress(qint64 sent, qint64 total)
{
// Completion is signaled with sent=0, total=0; avoid accidentally
// resetting progress due to the sent being zero by ignoring it.
// finishedSignal() is bound to be emitted soon anyway.
// See https://bugreports.qt.io/browse/QTBUG-44782.
if (sent == 0 && total == 0) {
return;
}
int progressChunk = _currentChunk + _startChunk - 1;
if (progressChunk >= _chunkCount)
progressChunk = _currentChunk - 1;
// amount is the number of bytes already sent by all the other chunks that were sent
// not including this one.
// FIXME: this assumes all chunks have the same size, which is true only if the last chunk
// has not been finished (which should not happen because the last chunk is sent sequentially)
quint64 amount = progressChunk * chunkSize();
sender()->setProperty("byteWritten", sent);
if (_jobs.count() > 1) {
amount -= (_jobs.count() -1) * chunkSize();
foreach (QObject *j, _jobs) {
amount += j->property("byteWritten").toULongLong();
}
} else {
// sender() is the only current job, no need to look at the byteWritten properties
amount += sent;
}
emit progress(*_item, amount);
}
void PropagateUploadFile::startPollJob(const QString& path)
{
PollJob* job = new PollJob(_propagator->account(), path, _item,
_propagator->_journal, _propagator->_localDir, this);
connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished()));
SyncJournalDb::PollInfo info;
info._file = _item->_file;
info._url = path;
info._modtime = _item->_modtime;
_propagator->_journal->setPollInfo(info);
_propagator->_journal->commit("add poll info");
_propagator->_activeJobList.append(this);
job->start();
}
void PropagateUploadFile::slotPollFinished()
{
PollJob *job = qobject_cast<PollJob *>(sender());
Q_ASSERT(job);
_propagator->_activeJobList.removeOne(this);
if (job->_item->_status != SyncFileItem::Success) {
_finished = true;
done(job->_item->_status, job->_item->_errorString);
return;
}
finalize(*job->_item);
}
void PropagateUploadFile::slotJobDestroyed(QObject* job)
{
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job) , _jobs.end());
}
void PropagateUploadFile::abort()
{
foreach(auto *job, _jobs) {
if (job->reply()) {
qDebug() << Q_FUNC_INFO << job << this->_item->_file;
job->reply()->abort();
}
}
}
// This function is used whenever there is an error occuring and jobs might be in progress
void PropagateUploadFile::abortWithError(SyncFileItem::Status status, const QString &error)
{
_finished = true;
abort();
done(status, error);
}
}

View file

@ -89,12 +89,17 @@ private:
QScopedPointer<QIODevice> _device;
QMap<QByteArray, QByteArray> _headers;
QString _errorString;
QUrl _url;
public:
// Takes ownership of the device
explicit PUTFileJob(AccountPtr account, const QString& path, QIODevice *device,
const QMap<QByteArray, QByteArray> &headers, int chunk, QObject* parent = 0)
: AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _chunk(chunk) {}
explicit PUTFileJob(AccountPtr account, const QUrl& url, QIODevice *device,
const QMap<QByteArray, QByteArray> &headers, int chunk, QObject* parent = 0)
: AbstractNetworkJob(account, QString(), parent), _device(device), _headers(headers)
, _url(url), _chunk(chunk) {}
~PUTFileJob();
int _chunk;
@ -155,10 +160,90 @@ signals:
};
/**
* @brief The PropagateUploadFile class
* @brief The PropagateUploadFileCommon class is the code common between all chunking algorithms
* @ingroup libsync
*
* State Machine:
*
* +---> start() --> (delete job) -------+
* | |
* +--> slotComputeContentChecksum() <---+
* |
* v
* slotComputeTransmissionChecksum()
* |
* v
* slotStartUpload() -> doStartUpload()
* .
* .
* v
* finalize() or abortWithError() or startPollJob()
*/
class PropagateUploadFile : public PropagateItemJob {
class PropagateUploadFileCommon : public PropagateItemJob {
Q_OBJECT
protected:
QElapsedTimer _duration;
QVector<AbstractNetworkJob*> _jobs; /// network jobs that are currently in transit
bool _finished; /// Tells that all the jobs have been finished
bool _deleteExisting;
// measure the performance of checksum calc and upload
Utility::StopWatch _stopWatch;
QByteArray _transmissionChecksum;
QByteArray _transmissionChecksumType;
public:
PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
/**
* Whether an existing entity with the same name may be deleted before
* the upload.
*
* Default: false.
*/
void setDeleteExisting(bool enabled);
void start() Q_DECL_OVERRIDE;
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
private slots:
void slotComputeContentChecksum();
// Content checksum computed, compute the transmission checksum
void slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum);
// transmission checksum computed, prepare the upload
void slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum);
public:
virtual void doStartUpload() = 0;
void startPollJob(const QString& path);
void finalize();
void abortWithError(SyncFileItem::Status status, const QString &error);
public slots:
void abort() Q_DECL_OVERRIDE;
void slotJobDestroyed(QObject *job);
private slots:
void slotPollFinished();
protected:
// Bases headers that need to be sent with every chunk
QMap<QByteArray, QByteArray> headers();
};
/**
* @ingroup libsync
*
* Propagation job, impementing the old chunking agorithm
*
*/
class PropagateUploadFileV1 : public PropagateUploadFileCommon {
Q_OBJECT
private:
@ -176,51 +261,66 @@ private:
int _currentChunk;
int _chunkCount; /// Total number of chunks for this file
int _transferId; /// transfer id (part of the url)
QElapsedTimer _duration;
QVector<AbstractNetworkJob*> _jobs; /// network jobs that are currently in transit
bool _finished; // Tells that all the jobs have been finished
// measure the performance of checksum calc and upload
Utility::StopWatch _stopWatch;
QByteArray _transmissionChecksum;
QByteArray _transmissionChecksumType;
bool _deleteExisting;
quint64 chunkSize() const { return _propagator->chunkSize(); }
public:
PropagateUploadFile(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false), _deleteExisting(false) {}
void start() Q_DECL_OVERRIDE;
PropagateUploadFileV1(OwncloudPropagator* propagator,const SyncFileItemPtr& item) :
PropagateUploadFileCommon(propagator,item) {}
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
/**
* Whether an existing entity with the same name may be deleted before
* the upload.
*
* Default: false.
*/
void setDeleteExisting(bool enabled);
void doStartUpload() Q_DECL_OVERRIDE;
private slots:
void slotPutFinished();
void slotPollFinished();
void slotUploadProgress(qint64,qint64);
void abort() Q_DECL_OVERRIDE;
void startNextChunk();
void finalize(const SyncFileItem&);
void slotJobDestroyed(QObject *job);
void slotStartUpload(const QByteArray& transmissionChecksumType, const QByteArray& transmissionChecksum);
void slotComputeTransmissionChecksum(const QByteArray& contentChecksumType, const QByteArray& contentChecksum);
void slotComputeContentChecksum();
private:
void startPollJob(const QString& path);
void abortWithError(SyncFileItem::Status status, const QString &error);
void slotPutFinished();
void slotUploadProgress(qint64,qint64);
};
/**
* @ingroup libsync
*
* Propagation job, impementing the new chunking agorithm
*
*/
class PropagateUploadFileNG : public PropagateUploadFileCommon {
Q_OBJECT
private:
quint64 _sent; /// amount of data (bytes) that was already sent
uint _transferId; /// transfer id (part of the url)
int _currentChunk; /// Id of the next chunk that will be sent
bool _removeJobError; /// If not null, there was an error removing the job
// Map chunk number with its size from the PROPFIND on resume.
// (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.)
QMap<int, quint64> _serverChunks;
quint64 chunkSize() const { return _propagator->chunkSize(); }
/**
* Return the URL of a chunk.
* If chunk == -1, returns the URL of the parent folder containing the chunks
*/
QUrl chunkUrl(int chunk = -1);
public:
PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item) :
PropagateUploadFileCommon(propagator,item) {}
void doStartUpload() Q_DECL_OVERRIDE;
private:
void startNewUpload();
void startNextChunk();
private slots:
void slotPropfindFinished();
void slotPropfindFinishedWithError();
void slotPropfindIterate(const QString &name, const QMap<QString,QString> &properties);
void slotDeleteJobFinished();
void slotMkColFinished(QNetworkReply::NetworkError);
void slotPutFinished();
void slotMoveJobFinished();
void slotUploadProgress(qint64,qint64);
};
}

View file

@ -0,0 +1,488 @@
/*
* Copyright (C) by Olivier Goffart <ogoffart@owncloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "config.h"
#include "propagateupload.h"
#include "owncloudpropagator_p.h"
#include "networkjobs.h"
#include "account.h"
#include "syncjournaldb.h"
#include "syncjournalfilerecord.h"
#include "utility.h"
#include "filesystem.h"
#include "propagatorjobs.h"
#include "syncengine.h"
#include "propagateremotemove.h"
#include "propagateremotedelete.h"
#include <QNetworkAccessManager>
#include <QFileInfo>
#include <QDir>
#include <cmath>
#include <cstring>
namespace OCC {
QUrl PropagateUploadFileNG::chunkUrl(int chunk)
{
QString path = QLatin1String("remote.php/dav/uploads/")
+ _propagator->account()->user()
+ QLatin1Char('/') + QString::number(_transferId);
if (chunk >= 0) {
path += QLatin1Char('/') + QString::number(chunk);
}
return Utility::concatUrlPath(_propagator->account()->url(), path);
}
/*
State machine:
*----> doStartUpload()
Check the db: is there an entry?
/ \
no yes
/ \
/ PROPFIND
startNewUpload() <-+ +----------------------------\
| | | \
MKCOL + slotPropfindFinishedWithError() slotPropfindFinished()
| Is there stale files to remove?
slotMkColFinished() | |
| no yes
| | |
| | DeleteJob
| | |
+-----+<------------------------------------------------------+<--- slotDeleteJobFinished()
|
+----> startNextChunk() ---finished? --+
^ | |
+---------------+ |
|
+----------------------------------------+
|
+-> MOVE ------> moveJobFinished() ---> finalize()
*/
void PropagateUploadFileNG::doStartUpload()
{
_duration.start();
_propagator->_activeJobList.append(this);
const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file);
if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) {
_transferId = progressInfo._transferid;
auto url = chunkUrl();
auto job = new LsColJob(_propagator->account(), url, this);
_jobs.append(job);
job->setProperties(QList<QByteArray>() << "resourcetype" << "getcontentlength");
connect(job, SIGNAL(finishedWithoutError()), this, SLOT(slotPropfindFinished()));
connect(job, SIGNAL(finishedWithError(QNetworkReply*)),
this, SLOT(slotPropfindFinishedWithError()));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
connect(job, SIGNAL(directoryListingIterated(QString,QMap<QString,QString>)),
this, SLOT(slotPropfindIterate(QString,QMap<QString,QString>)));
job->start();
return;
}
startNewUpload();
}
void PropagateUploadFileNG::slotPropfindIterate(const QString &name, const QMap<QString,QString> &properties)
{
if (name == chunkUrl().path()) {
return; // skip the info about the path itself
}
bool ok = false;
auto chunkId = name.mid(name.lastIndexOf('/')+1).toUInt(&ok);
if (ok) {
_serverChunks[chunkId] = properties["getcontentlength"].toULongLong();
}
}
void PropagateUploadFileNG::slotPropfindFinished()
{
auto job = qobject_cast<LsColJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
_propagator->_activeJobList.removeOne(this);
_currentChunk = 0;
_sent = 0;
while (_serverChunks.contains(_currentChunk)) {
_sent += _serverChunks[_currentChunk];
_serverChunks.remove(_currentChunk);
++_currentChunk;
}
if (_sent > _item->_size) {
// Normally this can't happen because the size is xor'ed with the transfer id, and it is
// therefore impossible that there is more data on the server than on the file.
qWarning() << "Inconsistency while resuming " << _item->_file
<< ": the size on the server (" << _sent << ") is bigger than the size of the file ("
<< _item->_size << ")";
startNewUpload();
return;
}
qDebug() << "Resuming "<< _item->_file << " from chunk " << _currentChunk << "; sent ="<< _sent;
if (!_serverChunks.isEmpty()) {
qDebug() << "To Delete" << _serverChunks;
_propagator->_activeJobList.append(this);
_removeJobError = false;
// Make sure that if there is a "hole" and then a few more chunks, on the server
// we should remove the later chunks. Otherwise when we do dynamic chunk sizing, we may end up
// with corruptions if there are too many chunks, or if we abort and there are still stale chunks.
for (auto it = _serverChunks.begin(); it != _serverChunks.end(); ++it) {
auto job = new DeleteJob(_propagator->account(), Utility::concatUrlPath(chunkUrl(), QString::number(it.key())), this);
QObject::connect(job, SIGNAL(finishedSignal()), this, SLOT(slotDeleteJobFinished()));
_jobs.append(job);
job->start();
}
return;
}
startNextChunk();
}
void PropagateUploadFileNG::slotPropfindFinishedWithError()
{
auto job = qobject_cast<LsColJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
QNetworkReply::NetworkError err = job->reply()->error();
auto httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
auto status = classifyError(err, httpErrorCode, &_propagator->_anotherSyncNeeded);
if (status == SyncFileItem::FatalError) {
_propagator->_activeJobList.removeOne(this);
QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll());
abortWithError(status, errorString);
return;
}
startNewUpload();
}
void PropagateUploadFileNG::slotDeleteJobFinished()
{
auto job = qobject_cast<DeleteJob *>(sender());
Q_ASSERT(job);
_jobs.remove(_jobs.indexOf(job));
QNetworkReply::NetworkError err = job->reply()->error();
if (err != QNetworkReply::NoError && err != QNetworkReply::ContentNotFoundError) {
const int httpStatus = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
SyncFileItem::Status status = classifyError(err, httpStatus);
if (status == SyncFileItem::FatalError) {
abortWithError(status, job->errorString());
return;
} else {
qWarning() << "DeleteJob errored out" << job->errorString() << job->reply()->url();
_removeJobError = true;
// Let the other jobs finish
}
}
if (_jobs.isEmpty()) {
_propagator->_activeJobList.removeOne(this);
if (_removeJobError) {
// There was an error removing some files, just start over
startNewUpload();
} else {
startNextChunk();
}
}
}
void PropagateUploadFileNG::startNewUpload()
{
Q_ASSERT(_propagator->_activeJobList.count(this) == 1);
_transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file);
_sent = 0;
_currentChunk = 0;
emit progress(*_item, 0);
SyncJournalDb::UploadInfo pi;
pi._valid = true;
pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime);
_propagator->_journal->setUploadInfo(_item->_file, pi);
_propagator->_journal->commit("Upload info");
QMap<QByteArray, QByteArray> headers;
headers["OC-Total-Length"] = QByteArray::number(_item->_size);
auto job = new MkColJob(_propagator->account(), chunkUrl(), headers, this);
connect(job, SIGNAL(finished(QNetworkReply::NetworkError)),
this, SLOT(slotMkColFinished(QNetworkReply::NetworkError)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
}
void PropagateUploadFileNG::slotMkColFinished(QNetworkReply::NetworkError)
{
_propagator->_activeJobList.removeOne(this);
auto job = qobject_cast<MkColJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
QNetworkReply::NetworkError err = job->reply()->error();
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) {
SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode,
&_propagator->_anotherSyncNeeded);
QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll());
if (job->reply()->hasRawHeader("OC-ErrorString")) {
errorString = job->reply()->rawHeader("OC-ErrorString");
}
abortWithError(status, errorString);
return;
}
startNextChunk();
}
void PropagateUploadFileNG::startNextChunk()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
quint64 fileSize = _item->_size;
Q_ASSERT(fileSize >= _sent);
quint64 currentChunkSize = qMin(chunkSize(), fileSize - _sent);
if (currentChunkSize == 0) {
Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore
_finished = true;
// Finish with a MOVE
QString destination = _propagator->account()->url().path()
+ QLatin1String("/remote.php/dav/files/") + _propagator->account()->user()
+ _propagator->_remoteFolder + _item->_file;
auto headers = PropagateUploadFileCommon::headers();
// "If-Match applies to the source, but we are interested in comparing the etag of the destination
auto ifMatch = headers.take("If-Match");
if (!ifMatch.isEmpty()) {
headers["If"] = "<" + destination.toUtf8() + "> ([" + ifMatch + "])";
}
if (!_transmissionChecksumType.isEmpty()) {
headers[checkSumHeaderC] = makeChecksumHeader(
_transmissionChecksumType, _transmissionChecksum);
}
auto job = new MoveJob(_propagator->account(), Utility::concatUrlPath(chunkUrl(), "/.file"),
destination, headers, this);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotMoveJobFinished()));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
_propagator->_activeJobList.append(this);
job->start();
return;
}
auto device = new UploadDevice(&_propagator->_bandwidthManager);
const QString fileName = _propagator->getFilePath(_item->_file);
if (! device->prepareAndOpen(fileName, _sent, currentChunkSize)) {
qDebug() << "ERR: Could not prepare upload device: " << device->errorString();
// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(fileName)) {
emit _propagator->seenLockedFile(fileName);
}
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError( SyncFileItem::SoftError, device->errorString() );
return;
}
QMap<QByteArray, QByteArray> headers;
headers["OC-Chunk-Offset"] = QByteArray::number(_sent);
_sent += currentChunkSize;
QUrl url = chunkUrl(_currentChunk);
// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
PUTFileJob* job = new PUTFileJob(_propagator->account(), url, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)),
this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(uploadProgress(qint64,qint64)),
device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobList.append(this);
_currentChunk++;
// FIXME! parallel chunk?
}
void PropagateUploadFileNG::slotPutFinished()
{
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job);
slotJobDestroyed(job); // remove it from the _jobs list
qDebug() << job->reply()->request().url() << "FINISHED WITH STATUS"
<< job->reply()->error()
<< (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString())
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
_propagator->_activeJobList.removeOne(this);
if (_finished) {
// We have sent the finished signal already. We don't need to handle any remaining jobs
return;
}
QNetworkReply::NetworkError err = job->reply()->error();
#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2)
if (err == QNetworkReply::OperationCanceledError && job->reply()->property("owncloud-should-soft-cancel").isValid()) {
// Abort the job and try again later.
// This works around a bug in QNAM wich might reuse a non-empty buffer for the next request.
qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2.";
_propagator->_anotherSyncNeeded = true;
abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2."));
return;
}
#endif
if (err != QNetworkReply::NoError) {
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
QByteArray replyContent = job->reply()->readAll();
qDebug() << replyContent; // display the XML error in the debug
QString errorString = errorMessage(job->errorString(), replyContent);
if (job->reply()->hasRawHeader("OC-ErrorString")) {
errorString = job->reply()->rawHeader("OC-ErrorString");
}
// FIXME! can this happen for the chunks?
if (_item->_httpErrorCode == 412) {
// Precondition Failed: Maybe the bad etag is in the database, we need to clear the
// parent folder etag so we won't read from DB next sync.
_propagator->_journal->avoidReadFromDbOnNextSync(_item->_file);
_propagator->_anotherSyncNeeded = true;
}
SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode,
&_propagator->_anotherSyncNeeded);
abortWithError(status, errorString);
return;
}
Q_ASSERT(_sent <= _item->_size);
bool finished = _sent == _item->_size;
// Check if the file still exists
const QString fullFilePath(_propagator->getFilePath(_item->_file));
if( !FileSystem::fileExists(fullFilePath) ) {
if (!finished) {
abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return;
} else {
_propagator->_anotherSyncNeeded = true;
}
}
// Check whether the file changed since discovery.
if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) {
_propagator->_anotherSyncNeeded = true;
if( !finished ) {
abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync."));
return;
}
}
if (!finished) {
// Deletes an existing blacklist entry on successful chunk upload
if (_item->_hasBlacklistEntry) {
_propagator->_journal->wipeErrorBlacklistEntry(_item->_file);
_item->_hasBlacklistEntry = false;
}
}
startNextChunk();
}
void PropagateUploadFileNG::slotMoveJobFinished()
{
_propagator->_activeJobList.removeOne(this);
auto job = qobject_cast<MoveJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
QNetworkReply::NetworkError err = job->reply()->error();
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if (err != QNetworkReply::NoError) {
SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode,
&_propagator->_anotherSyncNeeded);
QString errorString = errorMessage(job->errorString(), job->reply()->readAll());
abortWithError(status, errorString);
return;
}
if (_item->_httpErrorCode != 201 && _item->_httpErrorCode != 204) {
abortWithError(SyncFileItem::NormalError, tr("Unexpected return code from server (%1)").arg(_item->_httpErrorCode));
return;
}
QByteArray fid = job->reply()->rawHeader("OC-FileID");
if(fid.isEmpty()) {
qWarning() << "Server did not return a OC-FileID" << _item->_file;
abortWithError(SyncFileItem::NormalError, tr("Missing File ID from server"));
return;
} else {
// the old file id should only be empty for new files uploaded
if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) {
qDebug() << "WARN: File ID changed!" << _item->_fileId << fid;
}
_item->_fileId = fid;
}
_item->_etag = getEtagFromReply(job->reply());;
if (_item->_etag.isEmpty()) {
qWarning() << "Server did not return an ETAG" << _item->_file;
abortWithError(SyncFileItem::NormalError, tr("Missing ETag from server"));
return;
}
_item->_responseTimeStamp = job->responseTimestamp();
// performance logging
_item->_requestDuration = _stopWatch.stop();
qDebug() << "*==* duration UPLOAD" << _item->_size
<< _stopWatch.durationOfLap(QLatin1String("ContentChecksum"))
<< _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum"))
<< _item->_requestDuration;
// The job might stay alive for the whole sync, release this tiny bit of memory.
_stopWatch.reset();
finalize();
}
void PropagateUploadFileNG::slotUploadProgress(qint64 sent, qint64 total)
{
// Completion is signaled with sent=0, total=0; avoid accidentally
// resetting progress due to the sent being zero by ignoring it.
// finishedSignal() is bound to be emitted soon anyway.
// See https://bugreports.qt.io/browse/QTBUG-44782.
if (sent == 0 && total == 0) {
return;
}
emit progress(*_item, _sent + sent - total);
}
}

View file

@ -0,0 +1,375 @@
/*
* Copyright (C) by Olivier Goffart <ogoffart@owncloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "config.h"
#include "propagateupload.h"
#include "owncloudpropagator_p.h"
#include "networkjobs.h"
#include "account.h"
#include "syncjournaldb.h"
#include "syncjournalfilerecord.h"
#include "utility.h"
#include "filesystem.h"
#include "propagatorjobs.h"
#include "checksums.h"
#include "syncengine.h"
#include "propagateremotedelete.h"
#include <json.h>
#include <QNetworkAccessManager>
#include <QFileInfo>
#include <QDir>
#include <cmath>
#include <cstring>
namespace OCC {
void PropagateUploadFileV1::doStartUpload()
{
_chunkCount = std::ceil(_item->_size / double(chunkSize()));
_startChunk = 0;
_transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16);
const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file);
if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) {
_startChunk = progressInfo._chunk;
_transferId = progressInfo._transferid;
qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk;
}
_currentChunk = 0;
_duration.start();
emit progress(*_item, 0);
startNextChunk();
}
void PropagateUploadFileV1::startNextChunk()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
if (! _jobs.isEmpty() && _currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
// We return now and when the _jobs are finished we will proceed with the last chunk
// NOTE: Some other parts of the code such as slotUploadProgress also assume that the last chunk
// is sent last.
return;
}
quint64 fileSize = _item->_size;
auto headers = PropagateUploadFileCommon::headers();
headers["OC-Total-Length"] = QByteArray::number(fileSize);
headers["OC-Chunk-Size"]= QByteArray::number(quint64(chunkSize()));
QString path = _item->_file;
UploadDevice *device = new UploadDevice(&_propagator->_bandwidthManager);
qint64 chunkStart = 0;
qint64 currentChunkSize = fileSize;
bool isFinalChunk = false;
if (_chunkCount > 1) {
int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
// XOR with chunk size to make sure everything goes well if chunk size changes between runs
uint transid = _transferId ^ chunkSize();
qDebug() << "Upload chunk" << sendingChunk << "of" << _chunkCount << "transferid(remote)=" << transid;
path += QString("-chunking-%1-%2-%3").arg(transid).arg(_chunkCount).arg(sendingChunk);
headers["OC-Chunked"] = "1";
chunkStart = chunkSize() * quint64(sendingChunk);
currentChunkSize = chunkSize();
if (sendingChunk == _chunkCount - 1) { // last chunk
currentChunkSize = (fileSize % chunkSize());
if( currentChunkSize == 0 ) { // if the last chunk pretends to be 0, its actually the full chunk size.
currentChunkSize = chunkSize();
}
isFinalChunk = true;
}
} else {
// if there's only one chunk, it's the final one
isFinalChunk = true;
}
if (isFinalChunk && !_transmissionChecksumType.isEmpty()) {
headers[checkSumHeaderC] = makeChecksumHeader(
_transmissionChecksumType, _transmissionChecksum);
}
const QString fileName = _propagator->getFilePath(_item->_file);
if (! device->prepareAndOpen(fileName, chunkStart, currentChunkSize)) {
qDebug() << "ERR: Could not prepare upload device: " << device->errorString();
// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(fileName)) {
emit _propagator->seenLockedFile(fileName);
}
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError( SyncFileItem::SoftError, device->errorString() );
delete device;
return;
}
// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobList.append(this);
_currentChunk++;
bool parallelChunkUpload = true;
QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK");
if (!env.isEmpty()) {
parallelChunkUpload = env != "false" && env != "0";
} else {
int versionNum = _propagator->account()->serverVersionInt();
if (versionNum < 0x080003) {
// Disable parallel chunk upload severs older than 8.0.3 to avoid too many
// internal sever errors (#2743, #2938)
parallelChunkUpload = false;
}
}
if (_currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobList.count() < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emit ready();
}
}
void PropagateUploadFileV1::slotPutFinished()
{
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job);
slotJobDestroyed(job); // remove it from the _jobs list
qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS"
<< job->reply()->error()
<< (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString())
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
_propagator->_activeJobList.removeOne(this);
if (_finished) {
// We have sent the finished signal already. We don't need to handle any remaining jobs
return;
}
QNetworkReply::NetworkError err = job->reply()->error();
#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2)
if (err == QNetworkReply::OperationCanceledError && job->reply()->property("owncloud-should-soft-cancel").isValid()) { // Abort the job and try again later.
// This works around a bug in QNAM wich might reuse a non-empty buffer for the next request.
qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2.";
_propagator->_anotherSyncNeeded = true;
abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2."));
return;
}
#endif
if (err != QNetworkReply::NoError) {
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if(checkForProblemsWithShared(_item->_httpErrorCode,
tr("The file was edited locally but is part of a read only share. "
"It is restored and your edit is in the conflict file."))) {
return;
}
QByteArray replyContent = job->reply()->readAll();
qDebug() << replyContent; // display the XML error in the debug
QString errorString = errorMessage(job->errorString(), replyContent);
if (job->reply()->hasRawHeader("OC-ErrorString")) {
errorString = job->reply()->rawHeader("OC-ErrorString");
}
if (_item->_httpErrorCode == 412) {
// Precondition Failed: Maybe the bad etag is in the database, we need to clear the
// parent folder etag so we won't read from DB next sync.
_propagator->_journal->avoidReadFromDbOnNextSync(_item->_file);
_propagator->_anotherSyncNeeded = true;
}
SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode,
&_propagator->_anotherSyncNeeded);
abortWithError(status, errorString);
return;
}
_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
// The server needs some time to process the request and provide us with a poll URL
if (_item->_httpErrorCode == 202) {
_finished = true;
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll"));
if (path.isEmpty()) {
done(SyncFileItem::NormalError, tr("Poll URL missing"));
return;
}
startPollJob(path);
return;
}
// Check the file again post upload.
// Two cases must be considered separately: If the upload is finished,
// the file is on the server and has a changed ETag. In that case,
// the etag has to be properly updated in the client journal, and because
// of that we can bail out here with an error. But we can reschedule a
// sync ASAP.
// But if the upload is ongoing, because not all chunks were uploaded
// yet, the upload can be stopped and an error can be displayed, because
// the server hasn't registered the new file yet.
QByteArray etag = getEtagFromReply(job->reply());
bool finished = etag.length() > 0;
// Check if the file still exists
const QString fullFilePath(_propagator->getFilePath(_item->_file));
if( !FileSystem::fileExists(fullFilePath) ) {
if (!finished) {
abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return;
} else {
_propagator->_anotherSyncNeeded = true;
}
}
// Check whether the file changed since discovery.
if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) {
_propagator->_anotherSyncNeeded = true;
if( !finished ) {
abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return;
}
}
if (!finished) {
// Proceed to next chunk.
if (_currentChunk >= _chunkCount) {
if (!_jobs.empty()) {
// just wait for the other job to finish.
return;
}
_finished = true;
done(SyncFileItem::NormalError, tr("The server did not acknowledge the last chunk. (No e-tag was present)"));
return;
}
// Deletes an existing blacklist entry on successful chunk upload
if (_item->_hasBlacklistEntry) {
_propagator->_journal->wipeErrorBlacklistEntry(_item->_file);
_item->_hasBlacklistEntry = false;
}
SyncJournalDb::UploadInfo pi;
pi._valid = true;
auto currentChunk = job->_chunk;
foreach (auto *job, _jobs) {
// Take the minimum finished one
if (auto putJob = qobject_cast<PUTFileJob*>(job)) {
currentChunk = qMin(currentChunk, putJob->_chunk - 1);
}
}
pi._chunk = (currentChunk + _startChunk + 1) % _chunkCount ; // next chunk to start with
pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime);
_propagator->_journal->setUploadInfo(_item->_file, pi);
_propagator->_journal->commit("Upload info");
startNextChunk();
return;
}
// the following code only happens after all chunks were uploaded.
_finished = true;
// the file id should only be empty for new files up- or downloaded
QByteArray fid = job->reply()->rawHeader("OC-FileID");
if( !fid.isEmpty() ) {
if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) {
qDebug() << "WARN: File ID changed!" << _item->_fileId << fid;
}
_item->_fileId = fid;
}
_item->_etag = etag;
_item->_responseTimeStamp = job->responseTimestamp();
if (job->reply()->rawHeader("X-OC-MTime") != "accepted") {
// X-OC-MTime is supported since owncloud 5.0. But not when chunking.
// Normally Owncloud 6 always puts X-OC-MTime
qWarning() << "Server does not support X-OC-MTime" << job->reply()->rawHeader("X-OC-MTime");
// Well, the mtime was not set
done(SyncFileItem::SoftError, "Server does not support X-OC-MTime");
}
// performance logging
_item->_requestDuration = _stopWatch.stop();
qDebug() << "*==* duration UPLOAD" << _item->_size
<< _stopWatch.durationOfLap(QLatin1String("ContentChecksum"))
<< _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum"))
<< _item->_requestDuration;
// The job might stay alive for the whole sync, release this tiny bit of memory.
_stopWatch.reset();
finalize();
}
void PropagateUploadFileV1::slotUploadProgress(qint64 sent, qint64 total)
{
// Completion is signaled with sent=0, total=0; avoid accidentally
// resetting progress due to the sent being zero by ignoring it.
// finishedSignal() is bound to be emitted soon anyway.
// See https://bugreports.qt.io/browse/QTBUG-44782.
if (sent == 0 && total == 0) {
return;
}
int progressChunk = _currentChunk + _startChunk - 1;
if (progressChunk >= _chunkCount)
progressChunk = _currentChunk - 1;
// amount is the number of bytes already sent by all the other chunks that were sent
// not including this one.
// FIXME: this assumes all chunks have the same size, which is true only if the last chunk
// has not been finished (which should not happen because the last chunk is sent sequentially)
quint64 amount = progressChunk * chunkSize();
sender()->setProperty("byteWritten", sent);
if (_jobs.count() > 1) {
amount -= (_jobs.count() -1) * chunkSize();
foreach (QObject *j, _jobs) {
amount += j->property("byteWritten").toULongLong();
}
} else {
// sender() is the only current job, no need to look at the byteWritten properties
amount += sent;
}
emit progress(*_item, amount);
}
}

View file

@ -47,6 +47,7 @@ owncloud_add_test(ExcludedFiles "")
if(HAVE_QT5 AND NOT BUILD_WITH_QT4)
owncloud_add_test(SyncEngine "syncenginetestutils.h")
owncloud_add_test(SyncFileStatusTracker "syncenginetestutils.h")
owncloud_add_test(ChunkingNg "syncenginetestutils.h")
endif(HAVE_QT5 AND NOT BUILD_WITH_QT4)
SET(FolderMan_SRC ../src/gui/folderman.cpp)

View file

@ -18,6 +18,20 @@
#include <QtTest>
static const QUrl sRootUrl("owncloud://somehost/owncloud/remote.php/webdav/");
static const QUrl sRootUrl2("owncloud://somehost/owncloud/remote.php/dav/files/admin/");
static const QUrl sUploadUrl("owncloud://somehost/owncloud/remote.php/dav/uploads/admin/");
inline QString getFilePathFromUrl(const QUrl &url) {
QString path = url.path();
if (path.startsWith(sRootUrl.path()))
return path.mid(sRootUrl.path().length());
if (path.startsWith(sRootUrl2.path()))
return path.mid(sRootUrl2.path().length());
if (path.startsWith(sUploadUrl.path()))
return path.mid(sUploadUrl.path().length());
return {};
}
inline QString generateEtag() {
return QString::number(QDateTime::currentDateTime().toMSecsSinceEpoch(), 16);
@ -68,10 +82,15 @@ public:
QFile file{_rootDir.filePath(relativePath)};
QVERIFY(!file.exists());
file.open(QFile::WriteOnly);
file.write(QByteArray{}.fill(contentChar, size));
QByteArray buf(1024, contentChar);
for (int x = 0; x < size/buf.size(); ++x) {
file.write(buf);
}
file.write(buf.data(), size % buf.size());
file.close();
// Set the mtime 30 seconds in the past, for some tests that need to make sure that the mtime differs.
OCC::FileSystem::setModTime(file.fileName(), OCC::Utility::qDateTimeToTime_t(QDateTime::currentDateTime().addSecs(-30)));
QCOMPARE(file.size(), size);
}
void setContents(const QString &relativePath, char contentChar) override {
QFile file{_rootDir.filePath(relativePath)};
@ -284,6 +303,12 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isNull()); // for root, it should be empty
const FileInfo *fileInfo = remoteRootFileInfo.find(fileName);
Q_ASSERT(fileInfo);
QString prefix = request.url().path().left(request.url().path().size() - fileName.size());
// Don't care about the request and just return a full propfind
const QString davUri{QStringLiteral("DAV:")};
const QString ocUri{QStringLiteral("http://owncloud.org/ns")};
@ -297,7 +322,7 @@ public:
auto writeFileResponse = [&](const FileInfo &fileInfo) {
xml.writeStartElement(davUri, QStringLiteral("response"));
xml.writeTextElement(davUri, QStringLiteral("href"), "/owncloud/remote.php/webdav/" + fileInfo.path());
xml.writeTextElement(davUri, QStringLiteral("href"), prefix + fileInfo.path());
xml.writeStartElement(davUri, QStringLiteral("propstat"));
xml.writeStartElement(davUri, QStringLiteral("prop"));
@ -322,11 +347,6 @@ public:
xml.writeEndElement(); // response
};
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
const FileInfo *fileInfo = remoteRootFileInfo.find(fileName);
Q_ASSERT(fileInfo);
writeFileResponse(*fileInfo);
foreach(const FileInfo &childFileInfo, fileInfo->children)
writeFileResponse(childFileInfo);
@ -370,8 +390,8 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
if ((fileInfo = remoteRootFileInfo.find(fileName))) {
fileInfo->size = putPayload.size();
fileInfo->contentChar = putPayload.at(0);
@ -388,6 +408,7 @@ public:
}
Q_INVOKABLE void respond() {
emit uploadProgress(fileInfo->size, fileInfo->size);
setRawHeader("OC-ETag", fileInfo->etag.toLatin1());
setRawHeader("ETag", fileInfo->etag.toLatin1());
setRawHeader("X-OC-MTime", "accepted"); // Prevents Q_ASSERT(!_runningNow) since we'll call PropagateItemJob::done twice in that case.
@ -412,8 +433,8 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
fileInfo = remoteRootFileInfo.createDir(fileName);
if (!fileInfo) {
@ -445,8 +466,8 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
remoteRootFileInfo.remove(fileName);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}
@ -472,11 +493,10 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
QString destPath = request.rawHeader("Destination");
Q_ASSERT(destPath.startsWith(sRootUrl.path()));
QString dest = destPath.mid(sRootUrl.path().length());
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
QString dest = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination")));
Q_ASSERT(!dest.isEmpty());
remoteRootFileInfo.rename(fileName, dest);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}
@ -505,8 +525,8 @@ public:
setOperation(op);
open(QIODevice::ReadOnly);
Q_ASSERT(request.url().path().startsWith(sRootUrl.path()));
QString fileName = request.url().path().mid(sRootUrl.path().length());
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
fileInfo = remoteRootFileInfo.find(fileName);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}
@ -535,6 +555,79 @@ public:
}
};
class FakeChunkMoveReply : public QNetworkReply
{
Q_OBJECT
FileInfo *fileInfo;
public:
FakeChunkMoveReply(FileInfo &uploadsFileInfo, FileInfo &remoteRootFileInfo,
QNetworkAccessManager::Operation op, const QNetworkRequest &request,
QObject *parent) : QNetworkReply{parent} {
setRequest(request);
setUrl(request.url());
setOperation(op);
open(QIODevice::ReadOnly);
QString source = getFilePathFromUrl(request.url());
Q_ASSERT(!source.isEmpty());
Q_ASSERT(source.endsWith("/.file"));
source = source.left(source.length() - qstrlen("/.file"));
auto sourceFolder = uploadsFileInfo.find(source);
Q_ASSERT(sourceFolder);
Q_ASSERT(sourceFolder->isDir);
int count = 0;
int size = 0;
char payload = '*';
do {
if (!sourceFolder->children.contains(QString::number(count)))
break;
auto &x = sourceFolder->children[QString::number(count)];
Q_ASSERT(!x.isDir);
Q_ASSERT(x.size > 0); // There should not be empty chunks
size += x.size;
payload = x.contentChar;
++count;
} while(true);
Q_ASSERT(count > 1); // There should be at least two chunks, otherwise why would we use chunking?
QCOMPARE(sourceFolder->children.count(), count); // There should not be holes or extra files
QString fileName = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination")));
Q_ASSERT(!fileName.isEmpty());
if ((fileInfo = remoteRootFileInfo.find(fileName))) {
QCOMPARE(request.rawHeader("If"), QByteArray("<" + request.rawHeader("Destination") + "> ([\"" + fileInfo->etag.toLatin1() + "\"])"));
fileInfo->size = size;
fileInfo->contentChar = payload;
} else {
Q_ASSERT(!request.hasRawHeader("If"));
// Assume that the file is filled with the same character
fileInfo = remoteRootFileInfo.create(fileName, size, payload);
}
if (!fileInfo) {
abort();
return;
}
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}
Q_INVOKABLE void respond() {
setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 201);
setRawHeader("OC-ETag", fileInfo->etag.toLatin1());
setRawHeader("ETag", fileInfo->etag.toLatin1());
setRawHeader("OC-FileId", fileInfo->fileId);
emit metaDataChanged();
emit finished();
}
void abort() override { }
qint64 readData(char *, qint64) override { return 0; }
};
class FakeErrorReply : public QNetworkReply
{
Q_OBJECT
@ -561,33 +654,41 @@ public:
class FakeQNAM : public QNetworkAccessManager
{
FileInfo _remoteRootFileInfo;
FileInfo _uploadFileInfo;
QStringList _errorPaths;
public:
FakeQNAM(FileInfo initialRoot) : _remoteRootFileInfo{std::move(initialRoot)} { }
FileInfo &currentRemoteState() { return _remoteRootFileInfo; }
FileInfo &uploadState() { return _uploadFileInfo; }
QStringList &errorPaths() { return _errorPaths; }
protected:
QNetworkReply *createRequest(Operation op, const QNetworkRequest &request,
QIODevice *outgoingData = 0) {
const QString fileName = request.url().path().mid(sRootUrl.path().length());
const QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isNull());
if (_errorPaths.contains(fileName))
return new FakeErrorReply{op, request, this};
bool isUpload = request.url().path().startsWith(sUploadUrl.path());
FileInfo &info = isUpload ? _uploadFileInfo : _remoteRootFileInfo;
auto verb = request.attribute(QNetworkRequest::CustomVerbAttribute);
if (verb == QLatin1String("PROPFIND"))
// Ignore outgoingData always returning somethign good enough, works for now.
return new FakePropfindReply{_remoteRootFileInfo, op, request, this};
return new FakePropfindReply{info, op, request, this};
else if (verb == QLatin1String("GET"))
return new FakeGetReply{_remoteRootFileInfo, op, request, this};
return new FakeGetReply{info, op, request, this};
else if (verb == QLatin1String("PUT"))
return new FakePutReply{_remoteRootFileInfo, op, request, outgoingData->readAll(), this};
return new FakePutReply{info, op, request, outgoingData->readAll(), this};
else if (verb == QLatin1String("MKCOL"))
return new FakeMkcolReply{_remoteRootFileInfo, op, request, this};
return new FakeMkcolReply{info, op, request, this};
else if (verb == QLatin1String("DELETE"))
return new FakeDeleteReply{_remoteRootFileInfo, op, request, this};
else if (verb == QLatin1String("MOVE"))
return new FakeMoveReply{_remoteRootFileInfo, op, request, this};
return new FakeDeleteReply{info, op, request, this};
else if (verb == QLatin1String("MOVE") && !isUpload)
return new FakeMoveReply{info, op, request, this};
else if (verb == QLatin1String("MOVE") && isUpload)
return new FakeChunkMoveReply{info, _remoteRootFileInfo, op, request, this};
else {
qDebug() << verb << outgoingData;
Q_UNREACHABLE();
@ -659,6 +760,7 @@ public:
}
FileInfo currentRemoteState() { return _fakeQnam->currentRemoteState(); }
FileInfo &uploadState() { return _fakeQnam->uploadState(); }
QStringList &serverErrorPaths() { return _fakeQnam->errorPaths(); }
@ -695,14 +797,16 @@ public:
QVERIFY(false);
}
void execUntilFinished() {
bool execUntilFinished() {
QSignalSpy spy(_syncEngine.get(), SIGNAL(finished(bool)));
QVERIFY(spy.wait());
bool ok = spy.wait(60000);
Q_ASSERT(ok && "Sync timed out");
return spy[0][0].toBool();
}
void syncOnce() {
bool syncOnce() {
scheduleSync();
execUntilFinished();
return execUntilFinished();
}
private:

78
test/testchunkingng.cpp Normal file
View file

@ -0,0 +1,78 @@
/*
* This software is in the public domain, furnished "as is", without technical
* support, and with no warranty, express or implied, as to its usefulness for
* any purpose.
*
*/
#include <QtTest>
#include "syncenginetestutils.h"
#include <syncengine.h>
using namespace OCC;
class TestChunkingNG : public QObject
{
Q_OBJECT
private slots:
void testFileUpload() {
FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()};
fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"chunking", "1.0"} } } });
const int size = 300 * 1000 * 1000; // 300 MB
fakeFolder.localModifier().insert("A/a0", size);
QVERIFY(fakeFolder.syncOnce());
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
QCOMPARE(fakeFolder.uploadState().children.count(), 1); // the transfer was done with chunking
QCOMPARE(fakeFolder.currentRemoteState().find("A/a0")->size, size);
// Check that another upload of the same file also work.
fakeFolder.localModifier().appendByte("A/a0");
QVERIFY(fakeFolder.syncOnce());
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
QCOMPARE(fakeFolder.uploadState().children.count(), 2); // the transfer was done with chunking
}
void testResume () {
FakeFolder fakeFolder{FileInfo::A12_B12_C12_S12()};
fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"chunking", "1.0"} } } });
const int size = 300 * 1000 * 1000; // 300 MB
fakeFolder.localModifier().insert("A/a0", size);
// Abort when the upload is at 1/3
int sizeWhenAbort = -1;
auto con = QObject::connect(&fakeFolder.syncEngine(), &SyncEngine::transmissionProgress,
[&](const ProgressInfo &progress) {
if (progress.completedSize() > (progress.totalSize() /3 )) {
sizeWhenAbort = progress.completedSize();
fakeFolder.syncEngine().abort();
}
});
QVERIFY(!fakeFolder.syncOnce()); // there should have been an error
QObject::disconnect(con);
QVERIFY(sizeWhenAbort > 0);
QVERIFY(sizeWhenAbort < size);
QCOMPARE(fakeFolder.uploadState().children.count(), 1); // the transfer was done with chunking
auto upStateChildren = fakeFolder.uploadState().children.first().children;
QCOMPARE(sizeWhenAbort, std::accumulate(upStateChildren.cbegin(), upStateChildren.cend(), 0,
[](int s, const FileInfo &i) { return s + i.size; }));
// Add a fake file to make sure it gets deleted
fakeFolder.uploadState().children.first().insert("10000", size);
QVERIFY(fakeFolder.syncOnce());
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
QCOMPARE(fakeFolder.uploadState().children.count(), 1); // The same chunk id was re-used
QCOMPARE(fakeFolder.currentRemoteState().find("A/a0")->size, size);
}
};
QTEST_GUILESS_MAIN(TestChunkingNG)
#include "testchunkingng.moc"