From 3ec19ee35553aff4845bcd69add6d6ab60d54b9f Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 14 Jan 2015 12:48:38 +0100 Subject: [PATCH] Propatage upload: Preload the chunk in memory and close the file This should solve #2675 and #1981 By preloading the chunks in memory before sending them, we don't keep the file open and therefore we let other program open the file for writing. If the file is modified between two chunks, we detect that and abort anyway --- src/libsync/propagateupload.cpp | 106 +++++++++++++++++--------------- src/libsync/propagateupload.h | 29 +++++---- 2 files changed, 73 insertions(+), 62 deletions(-) diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index b2202b6f0..51b87b797 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace OCC { @@ -151,17 +152,15 @@ void PropagateUploadFileQNAM::start() if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) return; - _file = new QFile(_propagator->getFilePath(_item._file), this); - if (!_file->open(QIODevice::ReadOnly)) { - // Soft error because this is likely caused by the user modifying his files while syncing - done(SyncFileItem::SoftError, _file->errorString()); - delete _file; + QFileInfo fi(_propagator->getFilePath(_item._file)); + if (!fi.exists()) { + done(SyncFileItem::SoftError, tr("File Removed")); return; } // Update the mtime and size, it might have changed since discovery. - _item._modtime = FileSystem::getModTime(_file->fileName()); - quint64 fileSize = _file->size(); + _item._modtime = FileSystem::getModTime(fi.absoluteFilePath()); + quint64 fileSize = fi.size(); _item._size = fileSize; // But skip the file if the mtime is too close to 'now'! @@ -171,7 +170,6 @@ void PropagateUploadFileQNAM::start() if (modtime.msecsTo(QDateTime::currentDateTime()) < minFileAgeForUpload) { _propagator->_anotherSyncNeeded = true; done(SyncFileItem::SoftError, tr("Local file changed during sync.")); - delete _file; return; } @@ -195,7 +193,7 @@ void PropagateUploadFileQNAM::start() } UploadDevice::UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm) - : _file(file), _read(0), _size(size), _start(start), + : _file(file), _size(size), _start(start), _read(0), _bandwidthManager(bwm), _bandwidthQuota(0), _readWithProgress(0), @@ -209,18 +207,33 @@ UploadDevice::~UploadDevice() { _bandwidthManager->unregisterUploadDevice(this); } +bool UploadDevice::open(OpenMode mode) +{ + Q_ASSERT(mode == QIODevice::ReadOnly); + + _data.clear(); + _size = qMin(_file->size(), _size); + _data.resize(_size); + if (!_file->seek(_start)) { + setErrorString(_file->errorString()); + return false; + } + auto read = _file->read(_data.data(), _size); + if (read != _size) { + setErrorString(_file->errorString()); + return false; + } + _read = 0; + return QIODevice::open(mode); +} + + qint64 UploadDevice::writeData(const char* , qint64 ) { Q_ASSERT(!"write to read only device"); return 0; } qint64 UploadDevice::readData(char* data, qint64 maxlen) { - if (_file.isNull()) { - qDebug() << "Upload file object deleted during upload"; - close(); - return -1; - } - _file.data()->seek(_start + _read); //qDebug() << Q_FUNC_INFO << maxlen << _read << _size << _bandwidthQuota; if (_size - _read <= 0) { // at end @@ -242,13 +255,9 @@ qint64 UploadDevice::readData(char* data, qint64 maxlen) { } _bandwidthQuota -= maxlen; } - qint64 ret = _file.data()->read(data, maxlen); - - if (ret < 0) - return -1; - _read += ret; - - return ret; + std::memcpy(data, _data.data()+_read, maxlen); + _read += maxlen; + return maxlen; } void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t) @@ -261,14 +270,7 @@ void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t) } bool UploadDevice::atEnd() const { - if (_file.isNull()) { - qDebug() << "Upload file object deleted during upload"; - return true; - } -// qDebug() << this << Q_FUNC_INFO << _read << chunkSize() -// << (_read >= chunkSize() || _file.data()->atEnd()) -// << (_read >= _size); - return _file.data()->atEnd() || (_read >= _size); + return _read >= _size; } qint64 UploadDevice::size() const{ @@ -344,6 +346,14 @@ void PropagateUploadFileQNAM::startNextChunk() } QString path = _item._file; + + QFile file(_propagator->getFilePath(_item._file), this); + if (!file.open(QIODevice::ReadOnly)) { + // Soft error because this is likely caused by the user modifying his files while syncing + abortWithError(SyncFileItem::SoftError, file.errorString()); + return; + } + UploadDevice *device = 0; if (_chunkCount > 1) { int sendingChunk = (_currentChunk + _startChunk) % _chunkCount; @@ -358,17 +368,12 @@ void PropagateUploadFileQNAM::startNextChunk() currentChunkSize = chunkSize(); } } - device = new UploadDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager); + device = new UploadDevice(&file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager); } else { - device = new UploadDevice(_file, 0, fileSize, &_propagator->_bandwidthManager); + device = new UploadDevice(&file, 0, fileSize, &_propagator->_bandwidthManager); } - bool isOpen = true; - if (!device->isOpen()) { - isOpen = device->open(QIODevice::ReadOnly); - } - - if( isOpen ) { + if (device->open(QIODevice::ReadOnly)) { PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk); _jobs.append(job); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); @@ -445,14 +450,7 @@ void PropagateUploadFileQNAM::slotPutFinished() _propagator->_anotherSyncNeeded = true; } - foreach (auto job, _jobs) { - if (job->reply()) { - job->reply()->abort(); - } - } - - _finished = true; - done(classifyError(err, _item._httpErrorCode), errorString); + abortWithError(classifyError(err, _item._httpErrorCode), errorString); return; } @@ -462,7 +460,6 @@ void PropagateUploadFileQNAM::slotPutFinished() _finished = true; QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll")); if (path.isEmpty()) { - _finished = true; done(SyncFileItem::NormalError, tr("Poll URL missing")); return; } @@ -488,8 +485,7 @@ void PropagateUploadFileQNAM::slotPutFinished() // Check if the file still exists if( !fi.exists() ) { if (!finished) { - _finished = true; - done(SyncFileItem::SoftError, tr("The local file was removed during sync.")); + abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync.")); return; } else { _propagator->_anotherSyncNeeded = true; @@ -506,8 +502,7 @@ void PropagateUploadFileQNAM::slotPutFinished() << ", QFileInfo: " << Utility::qDateTimeToTime_t(fi.lastModified()) << fi.lastModified(); _propagator->_anotherSyncNeeded = true; if( !finished ) { - _finished = true; - done(SyncFileItem::SoftError, tr("Local file changed during sync.")); + abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); // FIXME: the legacy code was retrying for a few seconds. // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW return; @@ -652,4 +647,13 @@ void PropagateUploadFileQNAM::abort() } } +// This function is used whenever there is an error occuring and jobs might be in progress +void PropagateUploadFileQNAM::abortWithError(SyncFileItem::Status status, const QString &error) +{ + _finished = true; + abort(); + done(status, error); +} + + } diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index b10a1bbfd..1bdd672e2 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -26,17 +26,9 @@ class BandwidthManager; class UploadDevice : public QIODevice { Q_OBJECT public: - QPointer _file; - qint64 _read; - qint64 _size; - qint64 _start; - BandwidthManager* _bandwidthManager; - - qint64 _bandwidthQuota; - qint64 _readWithProgress; - UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm); ~UploadDevice(); + bool open(OpenMode mode) Q_DECL_OVERRIDE; qint64 writeData(const char* , qint64 ) Q_DECL_OVERRIDE; qint64 readData(char* data, qint64 maxlen) Q_DECL_OVERRIDE; bool atEnd() const Q_DECL_OVERRIDE; @@ -51,8 +43,23 @@ public: bool isChoked() { return _choked; } void giveBandwidthQuota(qint64 bwq); private: + + // Used before opening + QPointer _file; + qint64 _size; + qint64 _start; + + // Used after opening, in order to read + QByteArray _data; + qint64 _read; + + // Bandith manager related + BandwidthManager* _bandwidthManager; + qint64 _bandwidthQuota; + qint64 _readWithProgress; bool _bandwidthLimited; // if _bandwidthQuota will be used bool _choked; // if upload is paused (readData() will return 0) + friend class BandwidthManager; protected slots: void slotJobUploadProgress(qint64 sent, qint64 t); }; @@ -117,14 +124,13 @@ signals: class PropagateUploadFileQNAM : public PropagateItemJob { Q_OBJECT - QFile *_file; int _startChunk; int _currentChunk; int _chunkCount; int _transferId; QElapsedTimer _duration; QVector _jobs; - bool _finished; + bool _finished; // Tells that all the jobs have been finished public: PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0), _finished(false) {} @@ -139,6 +145,7 @@ private slots: void slotJobDestroyed(QObject *job); private: void startPollJob(const QString& path); + void abortWithError(SyncFileItem::Status status, const QString &error); }; }