Merge pull request #3420 from nextcloud/bugfix/cfapi-improve-cancellation

Cfapi: Make sure no data is transfered after cancellation
This commit is contained in:
Felix Weilbach 2021-06-17 11:46:13 +02:00 committed by GitHub
commit 6e41875fe1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 169 additions and 85 deletions

View file

@ -351,11 +351,13 @@ void GETFileJob::slotReadyRead()
void GETFileJob::cancel()
{
if (reply()->isRunning()) {
reply()->abort();
const auto networkReply = reply();
if (networkReply && networkReply->isRunning()) {
networkReply->abort();
}
if (_device && _device->isOpen()) {
_device->close();
}
emit canceled();
}
void GETFileJob::onTimedOut()

View file

@ -110,7 +110,6 @@ public:
void setExpectedContentLength(qint64 size) { _expectedContentLength = size; }
signals:
void canceled();
void finishedSignal();
void downloadProgress(qint64, qint64);
private slots:

View file

@ -146,6 +146,22 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
return;
}
QLocalSocket signalSocket;
const QString signalSocketName = requestId + ":cancellation";
signalSocket.connectToServer(signalSocketName);
const auto cancellationSocketConnectResult = signalSocket.waitForConnected();
if (!cancellationSocketConnectResult) {
qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << signalSocketName
<< signalSocket.error() << signalSocket.errorString();
sendTransferError();
return;
}
auto hydrationRequestCancelled = false;
QObject::connect(&signalSocket, &QLocalSocket::readyRead, &loop, [&] {
hydrationRequestCancelled = true;
});
// CFAPI expects sent blocks to be of a multiple of a block size.
// Only the last sent block is allowed to be of a different size than
// a multiple of a block size
@ -169,6 +185,11 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
};
QObject::connect(&socket, &QLocalSocket::readyRead, &loop, [&] {
if (hydrationRequestCancelled) {
qCDebug(lcCfApiWrapper) << "Don't transfer data because request" << requestId << "was cancelled";
return;
}
const auto receivedData = socket.readAll();
if (receivedData.isEmpty()) {
qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId;
@ -180,24 +201,34 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
alignAndSendData(receivedData);
});
QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id, int s) {
QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id) {
qDebug(lcCfApiWrapper) << "Hydration finished for request" << id;
if (requestId == id) {
const auto status = static_cast<OCC::HydrationJob::Status>(s);
qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId << status;
if (status != OCC::HydrationJob::Success) {
sendTransferError();
}
socket.close();
signalSocket.close();
loop.quit();
}
});
loop.exec();
if (!protrudingData.isEmpty()) {
if (!hydrationRequestCancelled && !protrudingData.isEmpty()) {
qDebug(lcCfApiWrapper) << "Send remaining protruding data. Size:" << protrudingData.size();
sendTransferInfo(protrudingData, dataOffset);
}
int hydrationJobResult = OCC::HydrationJob::Status::Error;
const auto invokeFinalizeResult = QMetaObject::invokeMethod(
vfs, [=] { vfs->finalizeHydrationJob(requestId); }, Qt::BlockingQueuedConnection,
&hydrationJobResult);
if (!invokeFinalizeResult) {
qCritical(lcCfApiWrapper) << "Failed to finalize hydration job for" << path << requestId;
}
if (static_cast<OCC::HydrationJob::Status>(hydrationJobResult) == OCC::HydrationJob::Success) {
sendTransferError();
}
}
}
void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS * /*callbackParameters*/)
@ -217,7 +248,6 @@ void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const C
}
}
CF_CALLBACK_REGISTRATION cfApiCallbacks[] = {
{ CF_CALLBACK_TYPE_FETCH_DATA, cfApiFetchDataCallback },
{ CF_CALLBACK_TYPE_CANCEL_FETCH_DATA, cfApiCancelFetchData },
@ -292,8 +322,6 @@ CF_SET_PIN_FLAGS pinRecurseModeToCfSetPinFlags(OCC::CfApiWrapper::SetPinRecurseM
}
}
}
OCC::CfApiWrapper::ConnectionKey::ConnectionKey()
: _data(new CF_CONNECTION_KEY, [](void *p) { delete reinterpret_cast<CF_CONNECTION_KEY *>(p); })
{

View file

@ -16,6 +16,7 @@
#include "common/syncjournaldb.h"
#include "propagatedownload.h"
#include "vfs/cfapi/vfs_cfapi.h"
#include <QLocalServer>
#include <QLocalSocket>
@ -25,7 +26,6 @@ Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg)
OCC::HydrationJob::HydrationJob(QObject *parent)
: QObject(parent)
{
connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater);
}
OCC::AccountPtr OCC::HydrationJob::account() const
@ -104,90 +104,131 @@ void OCC::HydrationJob::start()
Q_ASSERT(_localPath.endsWith('/'));
Q_ASSERT(!_folderPath.startsWith('/'));
_server = new QLocalServer(this);
const auto listenResult = _server->listen(_requestId);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath;
emitFinished(Error);
const auto startServer = [this](const QString &serverName) -> QLocalServer * {
const auto server = new QLocalServer(this);
const auto listenResult = server->listen(serverName);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << serverName
<< _localPath << _folderPath;
if (!_isCancelled) {
emitFinished(Error);
}
return nullptr;
}
qCInfo(lcHydration) << "Server ready, waiting for connections" << serverName
<< _localPath << _folderPath;
return server;
};
// Start cancellation server
_signalServer = startServer(_requestId + ":cancellation");
Q_ASSERT(_signalServer);
if (!_signalServer) {
return;
}
connect(_signalServer, &QLocalServer::newConnection, this, &HydrationJob::onCancellationServerNewConnection);
qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath;
connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
// Start transfer data server
_transferDataServer = startServer(_requestId);
Q_ASSERT(_transferDataServer);
if (!_transferDataServer) {
return;
}
connect(_transferDataServer, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
}
void OCC::HydrationJob::cancel()
{
if (!_job) {
return;
Q_ASSERT(_signalSocket);
_isCancelled = true;
if (_job) {
_job->cancel();
}
_job->cancel();
_signalSocket->write("cancelled");
emitFinished(Cancelled);
}
void OCC::HydrationJob::emitFinished(Status status)
{
_status = status;
_signalSocket->close();
if (status == Success) {
connect(_socket, &QLocalSocket::disconnected, this, [=]{
_socket->close();
connect(_transferDataSocket, &QLocalSocket::disconnected, this, [=] {
_transferDataSocket->close();
emit finished(this);
});
_socket->disconnectFromServer();
} else {
_socket->close();
emit finished(this);
_transferDataSocket->disconnectFromServer();
return;
}
_transferDataSocket->close();
emit finished(this);
}
void OCC::HydrationJob::emitCanceled()
void OCC::HydrationJob::onCancellationServerNewConnection()
{
connect(_socket, &QLocalSocket::disconnected, this, [=] {
_socket->close();
});
_socket->disconnectFromServer();
Q_ASSERT(!_signalSocket);
emit canceled(this);
qCInfo(lcHydration) << "Got new connection on cancellation server" << _requestId << _folderPath;
_signalSocket = _signalServer->nextPendingConnection();
}
void OCC::HydrationJob::onNewConnection()
{
Q_ASSERT(!_socket);
Q_ASSERT(!_transferDataSocket);
Q_ASSERT(!_job);
qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath;
_socket = _server->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this);
_transferDataSocket = _transferDataServer->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _transferDataSocket, {}, {}, 0, this);
connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished);
connect(_job, &GETFileJob::canceled, this, &HydrationJob::onGetCanceled);
_job->start();
}
void OCC::HydrationJob::onGetCanceled()
void OCC::HydrationJob::finalize(OCC::VfsCfApi *vfs)
{
qCInfo(lcHydration) << "GETFileJob canceled" << _requestId << _folderPath << _job->reply()->error();
emitCanceled();
// Mark the file as hydrated in the sync journal
SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
// emitFinished(Error);
return;
}
if (_isCancelled) {
// Remove placeholder file because there might be already pumped
// some data into it
QFile::remove(_localPath + _folderPath);
// Create a new placeholder file
const auto item = SyncFileItem::fromSyncJournalFileRecord(record);
vfs->createPlaceholder(*item);
return;
}
record._type = ItemTypeFile;
_journal->setFileRecord(record);
}
void OCC::HydrationJob::onGetFinished()
{
qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->reply()->error();
if (_job->reply()->error()) {
emitFinished(Error);
const auto isGetJobResultError = _job->reply()->error();
// GETFileJob deletes itself after this signal was handled
_job = nullptr;
if (_isCancelled) {
return;
}
SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
if (isGetJobResultError) {
emitFinished(Error);
return;
}
record._type = ItemTypeFile;
_journal->setFileRecord(record);
emitFinished(Success);
}

View file

@ -23,6 +23,7 @@ class QLocalSocket;
namespace OCC {
class GETFileJob;
class SyncJournalDb;
class VfsCfApi;
class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
{
@ -31,6 +32,7 @@ public:
enum Status {
Success = 0,
Error,
Cancelled,
};
Q_ENUM(Status)
@ -58,29 +60,31 @@ public:
void start();
void cancel();
void finalize(OCC::VfsCfApi *vfs);
signals:
void finished(HydrationJob *job);
void canceled(HydrationJob *job);
private:
void emitFinished(Status status);
void emitCanceled();
void onNewConnection();
void onCancellationServerNewConnection();
void onGetFinished();
void onGetCanceled();
AccountPtr _account;
QString _remotePath;
QString _localPath;
SyncJournalDb *_journal = nullptr;
bool _isCancelled = false;
QString _requestId;
QString _folderPath;
QLocalServer *_server = nullptr;
QLocalSocket *_socket = nullptr;
QLocalServer *_transferDataServer = nullptr;
QLocalServer *_signalServer = nullptr;
QLocalSocket *_transferDataSocket = nullptr;
QLocalSocket *_signalSocket = nullptr;
GETFileJob *_job = nullptr;
Status _status = Success;
};

View file

@ -269,16 +269,28 @@ Vfs::AvailabilityResult VfsCfApi::availability(const QString &folderPath)
return AvailabilityError::NoSuchItem;
}
void VfsCfApi::cancelHydration(const QString &requestId, const QString & /*path*/)
HydrationJob *VfsCfApi::findHydrationJob(const QString &requestId) const
{
// Find matching hydration job for request id
const auto hydrationJobsIter = std::find_if(d->hydrationJobs.cbegin(), d->hydrationJobs.cend(), [&](const HydrationJob *job) {
return job->requestId() == requestId;
});
// If found, cancel it
if (hydrationJobsIter != d->hydrationJobs.cend()) {
(*hydrationJobsIter)->cancel();
return *hydrationJobsIter;
}
return nullptr;
}
void VfsCfApi::cancelHydration(const QString &requestId, const QString & /*path*/)
{
// Find matching hydration job for request id
const auto hydrationJob = findHydrationJob(requestId);
// If found, cancel it
if (hydrationJob) {
qCInfo(lcCfApi) << "Cancel hydration";
hydrationJob->cancel();
}
}
@ -360,7 +372,6 @@ void VfsCfApi::scheduleHydrationJob(const QString &requestId, const QString &fol
job->setRequestId(requestId);
job->setFolderPath(folderPath);
connect(job, &HydrationJob::finished, this, &VfsCfApi::onHydrationJobFinished);
connect(job, &HydrationJob::canceled, this, &VfsCfApi::onHydrationJobCanceled);
d->hydrationJobs << job;
job->start();
emit hydrationRequestReady(requestId);
@ -370,30 +381,27 @@ void VfsCfApi::onHydrationJobFinished(HydrationJob *job)
{
Q_ASSERT(d->hydrationJobs.contains(job));
qCInfo(lcCfApi) << "Hydration job finished" << job->requestId() << job->folderPath() << job->status();
emit hydrationRequestFinished(job->requestId(), job->status());
d->hydrationJobs.removeAll(job);
if (d->hydrationJobs.isEmpty()) {
emit doneHydrating();
}
emit hydrationRequestFinished(job->requestId());
}
void VfsCfApi::onHydrationJobCanceled(HydrationJob *job)
int VfsCfApi::finalizeHydrationJob(const QString &requestId)
{
const auto folderRelativePath = job->folderPath();
SyncJournalFileRecord record;
if (!params().journal->getFileRecord(folderRelativePath, &record)) {
qCWarning(lcCfApi) << "Could not read file record from journal for canceled hydration request.";
return;
qCDebug(lcCfApi) << "Finalize hydration job" << requestId;
// Find matching hydration job for request id
const auto hydrationJob = findHydrationJob(requestId);
// If found, finalize it
if (hydrationJob) {
hydrationJob->finalize(this);
d->hydrationJobs.removeAll(hydrationJob);
hydrationJob->deleteLater();
if (d->hydrationJobs.isEmpty()) {
emit doneHydrating();
}
return hydrationJob->status();
}
// Remove placeholder file because there might be already pumped
// some data into it
const auto folderPath = job->localPath();
QFile::remove(folderPath + folderRelativePath);
// Create a new placeholder file
const auto item = SyncFileItem::fromSyncJournalFileRecord(record);
createPlaceholder(*item);
return HydrationJob::Status::Error;
}
VfsCfApi::HydratationAndPinStates VfsCfApi::computeRecursiveHydrationAndPinStates(const QString &folderPath, const Optional<PinState> &basePinState)

View file

@ -55,6 +55,8 @@ public:
void cancelHydration(const QString &requestId, const QString &path);
int finalizeHydrationJob(const QString &requestId);
public slots:
void requestHydration(const QString &requestId, const QString &path);
void fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus) override;
@ -62,7 +64,7 @@ public slots:
signals:
void hydrationRequestReady(const QString &requestId);
void hydrationRequestFailed(const QString &requestId);
void hydrationRequestFinished(const QString &requestId, int status);
void hydrationRequestFinished(const QString &requestId);
protected:
void startImpl(const VfsSetupParams &params) override;
@ -70,7 +72,7 @@ protected:
private:
void scheduleHydrationJob(const QString &requestId, const QString &folderPath);
void onHydrationJobFinished(HydrationJob *job);
void onHydrationJobCanceled(HydrationJob *job);
HydrationJob *findHydrationJob(const QString &requestId) const;
struct HasHydratedDehydrated {
bool hasHydrated = false;