mirror of
https://github.com/nextcloud/desktop.git
synced 2024-11-26 15:06:08 +03:00
SyncEngine: Recover when the PUT reply (or chunkin's MOVE) is lost
This can happen if the upload of a file is finished, but we just got disconnected right before recieving the reply containing the etag. So nothing was save din the DB, and we are not sure if the server recieved the file properly or not. Further local update of the file will cause a conflict. In order to fix this, store the checksum of the uploading file in the uploadinfo table of the local db (even if there is no chunking involved). And when we have a conflict, check that it is not because of this situation by checking the entry in the uploadinfo table. Issue #5106
This commit is contained in:
parent
4369853ddb
commit
4dc49ff3b0
7 changed files with 136 additions and 18 deletions
|
@ -389,6 +389,7 @@ bool SyncJournalDb::checkConnect()
|
||||||
"errorcount INTEGER,"
|
"errorcount INTEGER,"
|
||||||
"size INTEGER(8),"
|
"size INTEGER(8),"
|
||||||
"modtime INTEGER(8),"
|
"modtime INTEGER(8),"
|
||||||
|
"contentChecksum TEXT,"
|
||||||
"PRIMARY KEY(path)"
|
"PRIMARY KEY(path)"
|
||||||
");");
|
");");
|
||||||
|
|
||||||
|
@ -611,15 +612,15 @@ bool SyncJournalDb::checkConnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
_getUploadInfoQuery.reset(new SqlQuery(_db));
|
_getUploadInfoQuery.reset(new SqlQuery(_db));
|
||||||
if (_getUploadInfoQuery->prepare("SELECT chunk, transferid, errorcount, size, modtime FROM "
|
if (_getUploadInfoQuery->prepare("SELECT chunk, transferid, errorcount, size, modtime, contentChecksum FROM "
|
||||||
"uploadinfo WHERE path=?1")) {
|
"uploadinfo WHERE path=?1")) {
|
||||||
return sqlFail("prepare _getUploadInfoQuery", *_getUploadInfoQuery);
|
return sqlFail("prepare _getUploadInfoQuery", *_getUploadInfoQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
_setUploadInfoQuery.reset(new SqlQuery(_db));
|
_setUploadInfoQuery.reset(new SqlQuery(_db));
|
||||||
if (_setUploadInfoQuery->prepare("INSERT OR REPLACE INTO uploadinfo "
|
if (_setUploadInfoQuery->prepare("INSERT OR REPLACE INTO uploadinfo "
|
||||||
"(path, chunk, transferid, errorcount, size, modtime) "
|
"(path, chunk, transferid, errorcount, size, modtime, contentChecksum) "
|
||||||
"VALUES ( ?1 , ?2, ?3 , ?4 , ?5, ?6 )")) {
|
"VALUES ( ?1 , ?2, ?3 , ?4 , ?5, ?6 , ?7 )")) {
|
||||||
return sqlFail("prepare _setUploadInfoQuery", *_setUploadInfoQuery);
|
return sqlFail("prepare _setUploadInfoQuery", *_setUploadInfoQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -849,6 +850,16 @@ bool SyncJournalDb::updateMetadataTableStructure()
|
||||||
commitInternal("update database structure: add contentChecksumTypeId col");
|
commitInternal("update database structure: add contentChecksumTypeId col");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tableColumns("uploadinfo").contains("contentChecksum")) {
|
||||||
|
SqlQuery query(_db);
|
||||||
|
query.prepare("ALTER TABLE uploadinfo ADD COLUMN contentChecksum TEXT;");
|
||||||
|
if (!query.exec()) {
|
||||||
|
sqlFail("updateMetadataTableStructure: add contentChecksum column", query);
|
||||||
|
re = false;
|
||||||
|
}
|
||||||
|
commitInternal("update database structure: add contentChecksum col for uploadinfo");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return re;
|
return re;
|
||||||
}
|
}
|
||||||
|
@ -1472,6 +1483,7 @@ SyncJournalDb::UploadInfo SyncJournalDb::getUploadInfo(const QString &file)
|
||||||
res._errorCount = _getUploadInfoQuery->intValue(2);
|
res._errorCount = _getUploadInfoQuery->intValue(2);
|
||||||
res._size = _getUploadInfoQuery->int64Value(3);
|
res._size = _getUploadInfoQuery->int64Value(3);
|
||||||
res._modtime = _getUploadInfoQuery->int64Value(4);
|
res._modtime = _getUploadInfoQuery->int64Value(4);
|
||||||
|
res._contentChecksum = _getUploadInfoQuery->baValue(5);
|
||||||
res._valid = ok;
|
res._valid = ok;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1494,6 +1506,7 @@ void SyncJournalDb::setUploadInfo(const QString &file, const SyncJournalDb::Uplo
|
||||||
_setUploadInfoQuery->bindValue(4, i._errorCount);
|
_setUploadInfoQuery->bindValue(4, i._errorCount);
|
||||||
_setUploadInfoQuery->bindValue(5, i._size);
|
_setUploadInfoQuery->bindValue(5, i._size);
|
||||||
_setUploadInfoQuery->bindValue(6, i._modtime);
|
_setUploadInfoQuery->bindValue(6, i._modtime);
|
||||||
|
_setUploadInfoQuery->bindValue(7, i._contentChecksum);
|
||||||
|
|
||||||
if (!_setUploadInfoQuery->exec()) {
|
if (!_setUploadInfoQuery->exec()) {
|
||||||
return;
|
return;
|
||||||
|
@ -2001,7 +2014,8 @@ bool operator==(const SyncJournalDb::UploadInfo &lhs,
|
||||||
&& lhs._modtime == rhs._modtime
|
&& lhs._modtime == rhs._modtime
|
||||||
&& lhs._valid == rhs._valid
|
&& lhs._valid == rhs._valid
|
||||||
&& lhs._size == rhs._size
|
&& lhs._size == rhs._size
|
||||||
&& lhs._transferid == rhs._transferid;
|
&& lhs._transferid == rhs._transferid
|
||||||
|
&& lhs._contentChecksum == rhs._contentChecksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace OCC
|
} // namespace OCC
|
||||||
|
|
|
@ -112,6 +112,7 @@ public:
|
||||||
qint64 _modtime;
|
qint64 _modtime;
|
||||||
int _errorCount;
|
int _errorCount;
|
||||||
bool _valid;
|
bool _valid;
|
||||||
|
QByteArray _contentChecksum;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct PollInfo
|
struct PollInfo
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include "csync_rename.h"
|
#include "csync_rename.h"
|
||||||
#include "common/c_jhash.h"
|
#include "common/c_jhash.h"
|
||||||
#include "common/asserts.h"
|
#include "common/asserts.h"
|
||||||
|
#include "common/syncjournalfilerecord.h"
|
||||||
|
|
||||||
#include <QLoggingCategory>
|
#include <QLoggingCategory>
|
||||||
Q_LOGGING_CATEGORY(lcReconcile, "sync.csync.reconciler", QtInfoMsg)
|
Q_LOGGING_CATEGORY(lcReconcile, "sync.csync.reconciler", QtInfoMsg)
|
||||||
|
@ -316,6 +317,35 @@ static int _csync_merge_algorithm_visitor(csync_file_stat_t *cur, CSYNC * ctx) {
|
||||||
(ctx->current == REMOTE_REPLICA ? cur->checksumHeader : other->checksumHeader);
|
(ctx->current == REMOTE_REPLICA ? cur->checksumHeader : other->checksumHeader);
|
||||||
if (!remoteChecksumHeader.isEmpty()) {
|
if (!remoteChecksumHeader.isEmpty()) {
|
||||||
is_conflict = true;
|
is_conflict = true;
|
||||||
|
|
||||||
|
// Do we have an UploadInfo for this?
|
||||||
|
// Maybe the Upload was completed, but the connection was broken just before
|
||||||
|
// we recieved the etag (Issue #5106)
|
||||||
|
auto up = ctx->statedb->getUploadInfo(cur->path);
|
||||||
|
if (up._valid && up._contentChecksum == remoteChecksumHeader) {
|
||||||
|
// Solve the conflict into an upload, or nothing
|
||||||
|
auto remoteNode = ctx->current == REMOTE_REPLICA ? cur : other;
|
||||||
|
auto localNode = ctx->current == REMOTE_REPLICA ? other : cur;
|
||||||
|
remoteNode->instruction = CSYNC_INSTRUCTION_NONE;
|
||||||
|
localNode->instruction = up._modtime == localNode->modtime ? CSYNC_INSTRUCTION_UPDATE_METADATA : CSYNC_INSTRUCTION_SYNC;
|
||||||
|
|
||||||
|
// Update the etag and other server metadata in the journal already
|
||||||
|
// (We can't use a typical CSYNC_INSTRUCTION_UPDATE_METADATA because
|
||||||
|
// we must not store the size/modtime from the file system)
|
||||||
|
OCC::SyncJournalFileRecord rec;
|
||||||
|
if (ctx->statedb->getFileRecord(remoteNode->path, &rec)) {
|
||||||
|
rec._path = remoteNode->path;
|
||||||
|
rec._etag = remoteNode->etag;
|
||||||
|
rec._fileId = remoteNode->file_id;
|
||||||
|
rec._modtime = remoteNode->modtime;
|
||||||
|
rec._type = remoteNode->type;
|
||||||
|
rec._fileSize = remoteNode->size;
|
||||||
|
rec._remotePerm = remoteNode->remotePerm;
|
||||||
|
rec._checksumHeader = remoteNode->checksumHeader;
|
||||||
|
ctx->statedb->setFileRecordMetadata(rec);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SO: If there is no checksum, we can have !is_conflict here
|
// SO: If there is no checksum, we can have !is_conflict here
|
||||||
|
|
|
@ -230,6 +230,7 @@ void PropagateUploadFileNG::startNewUpload()
|
||||||
pi._valid = true;
|
pi._valid = true;
|
||||||
pi._transferid = _transferId;
|
pi._transferid = _transferId;
|
||||||
pi._modtime = _item->_modtime;
|
pi._modtime = _item->_modtime;
|
||||||
|
pi._contentChecksum = _item->_checksumHeader;
|
||||||
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||||
propagator()->_journal->commit("Upload info");
|
propagator()->_journal->commit("Upload info");
|
||||||
QMap<QByteArray, QByteArray> headers;
|
QMap<QByteArray, QByteArray> headers;
|
||||||
|
|
|
@ -43,10 +43,24 @@ void PropagateUploadFileV1::doStartUpload()
|
||||||
|
|
||||||
const SyncJournalDb::UploadInfo progressInfo = propagator()->_journal->getUploadInfo(_item->_file);
|
const SyncJournalDb::UploadInfo progressInfo = propagator()->_journal->getUploadInfo(_item->_file);
|
||||||
|
|
||||||
if (progressInfo._valid && progressInfo._modtime == _item->_modtime) {
|
if (progressInfo._valid && progressInfo._modtime == _item->_modtime
|
||||||
|
&& (progressInfo._contentChecksum == _item->_checksumHeader || progressInfo._contentChecksum.isEmpty() || _item->_checksumHeader.isEmpty())) {
|
||||||
_startChunk = progressInfo._chunk;
|
_startChunk = progressInfo._chunk;
|
||||||
_transferId = progressInfo._transferid;
|
_transferId = progressInfo._transferid;
|
||||||
qCInfo(lcPropagateUpload) << _item->_file << ": Resuming from chunk " << _startChunk;
|
qCInfo(lcPropagateUpload) << _item->_file << ": Resuming from chunk " << _startChunk;
|
||||||
|
} else if (_chunkCount <= 1 && !_item->_checksumHeader.isEmpty()) {
|
||||||
|
// If there is only one chunk, write the checksum in the database, so if the PUT is sent
|
||||||
|
// to the server, but the connection drops before we get the etag, we can check the checksum
|
||||||
|
// in reconcile (issue #5106)
|
||||||
|
SyncJournalDb::UploadInfo pi;
|
||||||
|
pi._valid = true;
|
||||||
|
pi._chunk = 0;
|
||||||
|
pi._transferid = _transferId;
|
||||||
|
pi._modtime = _item->_modtime;
|
||||||
|
pi._errorCount = 0;
|
||||||
|
pi._contentChecksum = _item->_checksumHeader;
|
||||||
|
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||||
|
propagator()->_journal->commit("Upload info");
|
||||||
}
|
}
|
||||||
|
|
||||||
_currentChunk = 0;
|
_currentChunk = 0;
|
||||||
|
@ -274,6 +288,7 @@ void PropagateUploadFileV1::slotPutFinished()
|
||||||
pi._transferid = _transferId;
|
pi._transferid = _transferId;
|
||||||
pi._modtime = _item->_modtime;
|
pi._modtime = _item->_modtime;
|
||||||
pi._errorCount = 0; // successful chunk upload resets
|
pi._errorCount = 0; // successful chunk upload resets
|
||||||
|
pi._contentChecksum = _item->_checksumHeader;
|
||||||
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||||
propagator()->_journal->commit("Upload info");
|
propagator()->_journal->commit("Upload info");
|
||||||
startNextChunk();
|
startNextChunk();
|
||||||
|
|
|
@ -452,7 +452,11 @@ public:
|
||||||
emit finished();
|
emit finished();
|
||||||
}
|
}
|
||||||
|
|
||||||
void abort() override { }
|
void abort() override
|
||||||
|
{
|
||||||
|
setError(OperationCanceledError, "abort");
|
||||||
|
emit finished();
|
||||||
|
}
|
||||||
qint64 readData(char *, qint64) override { return 0; }
|
qint64 readData(char *, qint64) override { return 0; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -696,7 +700,12 @@ public:
|
||||||
emit finished();
|
emit finished();
|
||||||
}
|
}
|
||||||
|
|
||||||
void abort() override { }
|
void abort() override
|
||||||
|
{
|
||||||
|
setError(OperationCanceledError, "abort");
|
||||||
|
emit finished();
|
||||||
|
}
|
||||||
|
|
||||||
qint64 readData(char *, qint64) override { return 0; }
|
qint64 readData(char *, qint64) override { return 0; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -161,17 +161,6 @@ private slots:
|
||||||
QVERIFY(!moveChecksumHeader.isEmpty());
|
QVERIFY(!moveChecksumHeader.isEmpty());
|
||||||
fakeFolder.remoteModifier().find("A/a0")->checksums = moveChecksumHeader;
|
fakeFolder.remoteModifier().find("A/a0")->checksums = moveChecksumHeader;
|
||||||
|
|
||||||
// This time it's a real conflict, we have a remote checksum!
|
|
||||||
connection = connect(&fakeFolder.syncEngine(), &SyncEngine::aboutToPropagate, [&](SyncFileItemVector &items) {
|
|
||||||
SyncFileItemPtr a0;
|
|
||||||
for (auto &item : items) {
|
|
||||||
if (item->_file == "A/a0")
|
|
||||||
a0 = item;
|
|
||||||
}
|
|
||||||
|
|
||||||
QVERIFY(a0);
|
|
||||||
QCOMPARE(a0->_instruction, CSYNC_INSTRUCTION_CONFLICT);
|
|
||||||
});
|
|
||||||
QVERIFY(fakeFolder.syncOnce());
|
QVERIFY(fakeFolder.syncOnce());
|
||||||
disconnect(connection);
|
disconnect(connection);
|
||||||
QCOMPARE(nGET, 0); // no new download, just a metadata update!
|
QCOMPARE(nGET, 0); // no new download, just a metadata update!
|
||||||
|
@ -378,6 +367,65 @@ private slots:
|
||||||
QVERIFY(fakeFolder.uploadState().children.first().name != chunkingId);
|
QVERIFY(fakeFolder.uploadState().children.first().name != chunkingId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check what happens when the connection is dropped on the PUT (non-chunking) or MOVE (chunking)
|
||||||
|
// for on the issue #5106
|
||||||
|
void connectionDroppedBeforeEtagRecieved_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<bool>("chunking");
|
||||||
|
QTest::newRow("big file") << true;
|
||||||
|
QTest::newRow("small file") << false;
|
||||||
|
}
|
||||||
|
void connectionDroppedBeforeEtagRecieved()
|
||||||
|
{
|
||||||
|
QFETCH(bool, chunking);
|
||||||
|
FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() };
|
||||||
|
fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ { "chunking", "1.0" } } }, { "checksums", QVariantMap{ { "supportedTypes", QStringList() << "SHA1" } } } });
|
||||||
|
const int size = chunking ? 150 * 1000 * 1000 : 300;
|
||||||
|
|
||||||
|
// Make the MOVE never reply, but trigger a client-abort and apply the change remotely
|
||||||
|
QByteArray checksumHeader;
|
||||||
|
int nGET = 0;
|
||||||
|
QScopedValueRollback<int> setHttpTimeout(AbstractNetworkJob::httpTimeout, 1);
|
||||||
|
int responseDelay = AbstractNetworkJob::httpTimeout * 1000 * 1000; // much bigger than http timeout (so a timeout will occur)
|
||||||
|
// This will perform the operation on the server, but the reply will not come to the client
|
||||||
|
fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * {
|
||||||
|
if (!chunking && op == QNetworkAccessManager::PutOperation) {
|
||||||
|
checksumHeader = request.rawHeader("OC-Checksum");
|
||||||
|
return new DelayedReply<FakePutReply>(responseDelay, fakeFolder.remoteModifier(), op, request, outgoingData->readAll(), &fakeFolder.syncEngine());
|
||||||
|
} else if (chunking && request.attribute(QNetworkRequest::CustomVerbAttribute) == "MOVE") {
|
||||||
|
checksumHeader = request.rawHeader("OC-Checksum");
|
||||||
|
return new DelayedReply<FakeChunkMoveReply>(responseDelay, fakeFolder.uploadState(), fakeFolder.remoteModifier(), op, request, &fakeFolder.syncEngine());
|
||||||
|
} else if (op == QNetworkAccessManager::GetOperation) {
|
||||||
|
nGET++;
|
||||||
|
}
|
||||||
|
return nullptr;
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// Test 1: a NEW file
|
||||||
|
fakeFolder.localModifier().insert("A/a0", size);
|
||||||
|
QVERIFY(!fakeFolder.syncOnce()); // timeout!
|
||||||
|
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); // but the upload succeeded
|
||||||
|
QVERIFY(!checksumHeader.isEmpty());
|
||||||
|
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader; // The test system don't do that automatically
|
||||||
|
// Should be resolved properly
|
||||||
|
QVERIFY(fakeFolder.syncOnce());
|
||||||
|
QCOMPARE(nGET, 0);
|
||||||
|
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
|
||||||
|
|
||||||
|
// Test 2: Modify the file further
|
||||||
|
fakeFolder.localModifier().appendByte("A/a0");
|
||||||
|
QVERIFY(!fakeFolder.syncOnce()); // timeout!
|
||||||
|
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); // but the upload succeeded
|
||||||
|
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader;
|
||||||
|
// modify again, should not cause conflict
|
||||||
|
fakeFolder.localModifier().appendByte("A/a0");
|
||||||
|
QVERIFY(!fakeFolder.syncOnce()); // now it's trying to upload the modified file
|
||||||
|
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
|
||||||
|
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader;
|
||||||
|
QVERIFY(fakeFolder.syncOnce());
|
||||||
|
QCOMPARE(nGET, 0);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
QTEST_GUILESS_MAIN(TestChunkingNG)
|
QTEST_GUILESS_MAIN(TestChunkingNG)
|
||||||
|
|
Loading…
Reference in a new issue