Propatage upload: Preload the chunk in memory and close the file

This should solve #2675 and #1981

By preloading the chunks in memory before sending them, we don't keep the
file open and therefore we let other program open the file for writing.

If the file is modified between two chunks, we detect that and abort anyway
This commit is contained in:
Olivier Goffart 2015-01-14 12:48:38 +01:00
parent 0215b250af
commit 3ec19ee355
2 changed files with 73 additions and 62 deletions

View file

@ -27,6 +27,7 @@
#include <QFileInfo>
#include <QDir>
#include <cmath>
#include <cstring>
namespace OCC {
@ -151,17 +152,15 @@ void PropagateUploadFileQNAM::start()
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
_file = new QFile(_propagator->getFilePath(_item._file), this);
if (!_file->open(QIODevice::ReadOnly)) {
// Soft error because this is likely caused by the user modifying his files while syncing
done(SyncFileItem::SoftError, _file->errorString());
delete _file;
QFileInfo fi(_propagator->getFilePath(_item._file));
if (!fi.exists()) {
done(SyncFileItem::SoftError, tr("File Removed"));
return;
}
// Update the mtime and size, it might have changed since discovery.
_item._modtime = FileSystem::getModTime(_file->fileName());
quint64 fileSize = _file->size();
_item._modtime = FileSystem::getModTime(fi.absoluteFilePath());
quint64 fileSize = fi.size();
_item._size = fileSize;
// But skip the file if the mtime is too close to 'now'!
@ -171,7 +170,6 @@ void PropagateUploadFileQNAM::start()
if (modtime.msecsTo(QDateTime::currentDateTime()) < minFileAgeForUpload) {
_propagator->_anotherSyncNeeded = true;
done(SyncFileItem::SoftError, tr("Local file changed during sync."));
delete _file;
return;
}
@ -195,7 +193,7 @@ void PropagateUploadFileQNAM::start()
}
UploadDevice::UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm)
: _file(file), _read(0), _size(size), _start(start),
: _file(file), _size(size), _start(start), _read(0),
_bandwidthManager(bwm),
_bandwidthQuota(0),
_readWithProgress(0),
@ -209,18 +207,33 @@ UploadDevice::~UploadDevice() {
_bandwidthManager->unregisterUploadDevice(this);
}
bool UploadDevice::open(OpenMode mode)
{
Q_ASSERT(mode == QIODevice::ReadOnly);
_data.clear();
_size = qMin(_file->size(), _size);
_data.resize(_size);
if (!_file->seek(_start)) {
setErrorString(_file->errorString());
return false;
}
auto read = _file->read(_data.data(), _size);
if (read != _size) {
setErrorString(_file->errorString());
return false;
}
_read = 0;
return QIODevice::open(mode);
}
qint64 UploadDevice::writeData(const char* , qint64 ) {
Q_ASSERT(!"write to read only device");
return 0;
}
qint64 UploadDevice::readData(char* data, qint64 maxlen) {
if (_file.isNull()) {
qDebug() << "Upload file object deleted during upload";
close();
return -1;
}
_file.data()->seek(_start + _read);
//qDebug() << Q_FUNC_INFO << maxlen << _read << _size << _bandwidthQuota;
if (_size - _read <= 0) {
// at end
@ -242,13 +255,9 @@ qint64 UploadDevice::readData(char* data, qint64 maxlen) {
}
_bandwidthQuota -= maxlen;
}
qint64 ret = _file.data()->read(data, maxlen);
if (ret < 0)
return -1;
_read += ret;
return ret;
std::memcpy(data, _data.data()+_read, maxlen);
_read += maxlen;
return maxlen;
}
void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t)
@ -261,14 +270,7 @@ void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t)
}
bool UploadDevice::atEnd() const {
if (_file.isNull()) {
qDebug() << "Upload file object deleted during upload";
return true;
}
// qDebug() << this << Q_FUNC_INFO << _read << chunkSize()
// << (_read >= chunkSize() || _file.data()->atEnd())
// << (_read >= _size);
return _file.data()->atEnd() || (_read >= _size);
return _read >= _size;
}
qint64 UploadDevice::size() const{
@ -344,6 +346,14 @@ void PropagateUploadFileQNAM::startNextChunk()
}
QString path = _item._file;
QFile file(_propagator->getFilePath(_item._file), this);
if (!file.open(QIODevice::ReadOnly)) {
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError(SyncFileItem::SoftError, file.errorString());
return;
}
UploadDevice *device = 0;
if (_chunkCount > 1) {
int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
@ -358,17 +368,12 @@ void PropagateUploadFileQNAM::startNextChunk()
currentChunkSize = chunkSize();
}
}
device = new UploadDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager);
device = new UploadDevice(&file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager);
} else {
device = new UploadDevice(_file, 0, fileSize, &_propagator->_bandwidthManager);
device = new UploadDevice(&file, 0, fileSize, &_propagator->_bandwidthManager);
}
bool isOpen = true;
if (!device->isOpen()) {
isOpen = device->open(QIODevice::ReadOnly);
}
if( isOpen ) {
if (device->open(QIODevice::ReadOnly)) {
PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
@ -445,14 +450,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
_propagator->_anotherSyncNeeded = true;
}
foreach (auto job, _jobs) {
if (job->reply()) {
job->reply()->abort();
}
}
_finished = true;
done(classifyError(err, _item._httpErrorCode), errorString);
abortWithError(classifyError(err, _item._httpErrorCode), errorString);
return;
}
@ -462,7 +460,6 @@ void PropagateUploadFileQNAM::slotPutFinished()
_finished = true;
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll"));
if (path.isEmpty()) {
_finished = true;
done(SyncFileItem::NormalError, tr("Poll URL missing"));
return;
}
@ -488,8 +485,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
// Check if the file still exists
if( !fi.exists() ) {
if (!finished) {
_finished = true;
done(SyncFileItem::SoftError, tr("The local file was removed during sync."));
abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return;
} else {
_propagator->_anotherSyncNeeded = true;
@ -506,8 +502,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
<< ", QFileInfo: " << Utility::qDateTimeToTime_t(fi.lastModified()) << fi.lastModified();
_propagator->_anotherSyncNeeded = true;
if( !finished ) {
_finished = true;
done(SyncFileItem::SoftError, tr("Local file changed during sync."));
abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return;
@ -652,4 +647,13 @@ void PropagateUploadFileQNAM::abort()
}
}
// This function is used whenever there is an error occuring and jobs might be in progress
void PropagateUploadFileQNAM::abortWithError(SyncFileItem::Status status, const QString &error)
{
_finished = true;
abort();
done(status, error);
}
}

View file

@ -26,17 +26,9 @@ class BandwidthManager;
class UploadDevice : public QIODevice {
Q_OBJECT
public:
QPointer<QIODevice> _file;
qint64 _read;
qint64 _size;
qint64 _start;
BandwidthManager* _bandwidthManager;
qint64 _bandwidthQuota;
qint64 _readWithProgress;
UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm);
~UploadDevice();
bool open(OpenMode mode) Q_DECL_OVERRIDE;
qint64 writeData(const char* , qint64 ) Q_DECL_OVERRIDE;
qint64 readData(char* data, qint64 maxlen) Q_DECL_OVERRIDE;
bool atEnd() const Q_DECL_OVERRIDE;
@ -51,8 +43,23 @@ public:
bool isChoked() { return _choked; }
void giveBandwidthQuota(qint64 bwq);
private:
// Used before opening
QPointer<QIODevice> _file;
qint64 _size;
qint64 _start;
// Used after opening, in order to read
QByteArray _data;
qint64 _read;
// Bandith manager related
BandwidthManager* _bandwidthManager;
qint64 _bandwidthQuota;
qint64 _readWithProgress;
bool _bandwidthLimited; // if _bandwidthQuota will be used
bool _choked; // if upload is paused (readData() will return 0)
friend class BandwidthManager;
protected slots:
void slotJobUploadProgress(qint64 sent, qint64 t);
};
@ -117,14 +124,13 @@ signals:
class PropagateUploadFileQNAM : public PropagateItemJob {
Q_OBJECT
QFile *_file;
int _startChunk;
int _currentChunk;
int _chunkCount;
int _transferId;
QElapsedTimer _duration;
QVector<PUTFileJob*> _jobs;
bool _finished;
bool _finished; // Tells that all the jobs have been finished
public:
PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false) {}
@ -139,6 +145,7 @@ private slots:
void slotJobDestroyed(QObject *job);
private:
void startPollJob(const QString& path);
void abortWithError(SyncFileItem::Status status, const QString &error);
};
}