Propagate upload: code cleanup

This commit is contained in:
Christian Kamm 2015-01-14 15:14:17 +01:00
parent 0ae9055ea6
commit 0af5574951
2 changed files with 79 additions and 73 deletions

View file

@ -192,8 +192,8 @@ void PropagateUploadFileQNAM::start()
this->startNextChunk();
}
UploadDevice::UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm)
: _file(file), _size(size), _start(start), _read(0),
UploadDevice::UploadDevice(BandwidthManager *bwm)
: _read(0),
_bandwidthManager(bwm),
_bandwidthQuota(0),
_readWithProgress(0),
@ -209,24 +209,31 @@ UploadDevice::~UploadDevice() {
}
}
bool UploadDevice::open(OpenMode mode)
bool UploadDevice::prepareAndOpen(const QString& fileName, qint64 start, qint64 size)
{
Q_ASSERT(mode == QIODevice::ReadOnly);
_data.clear();
_size = qMin(_file->size(), _size);
_data.resize(_size);
if (!_file->seek(_start)) {
setErrorString(_file->errorString());
return false;
}
auto read = _file->read(_data.data(), _size);
if (read != _size) {
setErrorString(_file->errorString());
return false;
}
_read = 0;
return QIODevice::open(mode);
QFile file(fileName);
QString openError;
if (!FileSystem::openFileSharedRead(&file, &openError)) {
setErrorString(openError);
return false;
}
size = qMin(file.size(), size);
_data.resize(size);
if (!file.seek(start)) {
setErrorString(file.errorString());
return false;
}
auto read = file.read(_data.data(), size);
if (read != size) {
setErrorString(file.errorString());
return false;
}
return QIODevice::open(QIODevice::ReadOnly);
}
@ -237,12 +244,12 @@ qint64 UploadDevice::writeData(const char* , qint64 ) {
qint64 UploadDevice::readData(char* data, qint64 maxlen) {
//qDebug() << Q_FUNC_INFO << maxlen << _read << _size << _bandwidthQuota;
if (_size - _read <= 0) {
if (_data.size() - _read <= 0) {
// at end
_bandwidthManager->unregisterUploadDevice(this);
return -1;
}
maxlen = qMin(maxlen, _size - _read);
maxlen = qMin(maxlen, _data.size() - _read);
if (maxlen == 0) {
return 0;
}
@ -272,19 +279,19 @@ void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t)
}
bool UploadDevice::atEnd() const {
return _read >= _size;
return _read >= _data.size();
}
qint64 UploadDevice::size() const{
// qDebug() << this << Q_FUNC_INFO << _size;
return _size;
return _data.size();
}
qint64 UploadDevice::bytesAvailable() const
{
// qDebug() << this << Q_FUNC_INFO << _size << _read << QIODevice::bytesAvailable()
// << _size - _read + QIODevice::bytesAvailable();
return _size - _read + QIODevice::bytesAvailable();
return _data.size() - _read + QIODevice::bytesAvailable();
}
// random access, we can seek
@ -293,6 +300,12 @@ bool UploadDevice::isSequential() const{
}
bool UploadDevice::seek ( qint64 pos ) {
if (! QIODevice::seek(pos)) {
return false;
}
if (pos < 0 || pos > _data.size()) {
return false;
}
_read = pos;
return true;
}
@ -344,66 +357,60 @@ void PropagateUploadFileQNAM::startNextChunk()
QString path = _item._file;
QFile file(_propagator->getFilePath(_item._file), this);
QString openError;
// Warning: this clears file->fileName() and makes it unsuitable for QFileInfo!
if (!FileSystem::openFileSharedRead(&file, &openError)) {
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError(SyncFileItem::SoftError, openError);
return;
}
UploadDevice *device = 0;
UploadDevice *device = new UploadDevice(&_propagator->_bandwidthManager);
qint64 chunkStart = 0;
qint64 currentChunkSize = fileSize;
if (_chunkCount > 1) {
int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
// XOR with chunk size to make sure everything goes well if chunk size change between runs
uint transid = _transferId ^ chunkSize();
path += QString("-chunking-%1-%2-%3").arg(transid).arg(_chunkCount).arg(sendingChunk);
headers["OC-Chunked"] = "1";
int currentChunkSize = chunkSize();
chunkStart = chunkSize() * quint64(sendingChunk);
currentChunkSize = chunkSize();
if (sendingChunk == _chunkCount - 1) { // last chunk
currentChunkSize = (fileSize % chunkSize());
if( currentChunkSize == 0 ) { // if the last chunk pretents to be 0, its actually the full chunk size.
currentChunkSize = chunkSize();
}
}
device = new UploadDevice(&file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager);
} else {
device = new UploadDevice(&file, 0, fileSize, &_propagator->_bandwidthManager);
}
if (device->open(QIODevice::ReadOnly)) {
PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobs++;
_currentChunk++;
QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK");
bool parallelChunkUpload = env!="false" && env != "0";
if (_currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobs < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emit ready();
}
} else {
qDebug() << "ERR: Could not open upload file: " << device->errorString();
done( SyncFileItem::NormalError, device->errorString() );
if (! device->prepareAndOpen(_propagator->getFilePath(_item._file), chunkStart, currentChunkSize)) {
qDebug() << "ERR: Could not prepare upload device: " << device->errorString();
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError( SyncFileItem::SoftError, device->errorString() );
delete device;
return;
}
PUTFileJob* job = new PUTFileJob(_propagator->account(), _propagator->_remoteFolder + path, device, headers, _currentChunk);
_jobs.append(job);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start();
_propagator->_activeJobs++;
_currentChunk++;
QByteArray env = qgetenv("OWNCLOUD_PARALLEL_CHUNK");
bool parallelChunkUpload = env!="false" && env != "0";
if (_currentChunk + _startChunk >= _chunkCount - 1) {
// Don't do parallel upload of chunk if this might be the last chunk because the server cannot handle that
// https://github.com/owncloud/core/issues/11106
parallelChunkUpload = false;
}
if (parallelChunkUpload && (_propagator->_activeJobs < _propagator->maximumActiveJob())
&& _currentChunk < _chunkCount ) {
startNextChunk();
}
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emit ready();
}
}
void PropagateUploadFileQNAM::slotPutFinished()

View file

@ -26,9 +26,12 @@ class BandwidthManager;
class UploadDevice : public QIODevice {
Q_OBJECT
public:
UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm);
UploadDevice(BandwidthManager *bwm);
~UploadDevice();
bool open(OpenMode mode) Q_DECL_OVERRIDE;
/** Reads the data from the file and opens the device */
bool prepareAndOpen(const QString& fileName, qint64 start, qint64 size);
qint64 writeData(const char* , qint64 ) Q_DECL_OVERRIDE;
qint64 readData(char* data, qint64 maxlen) Q_DECL_OVERRIDE;
bool atEnd() const Q_DECL_OVERRIDE;
@ -44,13 +47,9 @@ public:
void giveBandwidthQuota(qint64 bwq);
private:
// Used before opening
QPointer<QIODevice> _file;
qint64 _size;
qint64 _start;
// Used after opening, in order to read
// The file data
QByteArray _data;
// Position in the data
qint64 _read;
// Bandwidth manager related