diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6ef4ceb4f..43f9097cd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -86,6 +86,7 @@ set(libsync_SRCS mirall/mirallconfigfile.cpp mirall/syncengine.cpp mirall/owncloudpropagator.cpp + mirall/bandwidthmanager.cpp mirall/propagatorjobs.cpp mirall/propagator_qnam.cpp mirall/propagator_legacy.cpp diff --git a/src/mirall/bandwidthmanager.cpp b/src/mirall/bandwidthmanager.cpp new file mode 100644 index 000000000..b080a6ecc --- /dev/null +++ b/src/mirall/bandwidthmanager.cpp @@ -0,0 +1,292 @@ +/* + * Copyright (C) by Markus Goetz + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "owncloudpropagator.h" +#include "propagator_qnam.h" +#include "propagatorjobs.h" +#include "propagator_legacy.h" +#include "mirall/utility.h" + +#ifdef Q_OS_WIN +#include +#include +#endif + +#include +#include + +namespace Mirall { + +// Because of the many layers of buffering inside Qt (and probably the OS and the network) +// we cannot lower this value much more. If we do, the estimated bw will be very high +// because the buffers fill fast while the actual network algorithms are not relevant yet. +static qint64 relativeLimitMeasuringTimerIntervalMsec = 1000*2; +// See also WritingState in http://code.woboq.org/qt5/qtbase/src/network/access/qhttpprotocolhandler.cpp.html#_ZN20QHttpProtocolHandler11sendRequestEv + +// FIXME At some point: +// * Register device only after the QNR received its metaDataChanged() signal +// * Incorporate Qt buffer fill state (it's a negative absolute delta). +// * Incorporate SSL overhead (percentage) +// * For relative limiting, do less measuring and more delaying+giving quota +// * For relative limiting, smoothen measurements + +BandwidthManager::BandwidthManager(OwncloudPropagator *p) : QObject(), + _relativeLimitCurrentMeasuredDevice(0), + _relativeUploadLimitProgressAtMeasuringRestart(0), + _currentUploadLimit(0), + _currentDownloadLimit(0), + _propagator(p) +{ + _currentUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0); + _currentDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0); + + QObject::connect(&_absoluteLimitTimer, SIGNAL(timeout()), this, SLOT(absoluteLimitTimerExpired())); + _absoluteLimitTimer.setInterval(1000); + _absoluteLimitTimer.start(); + + QObject::connect(&_switchingTimer, SIGNAL(timeout()), this, SLOT(switchingTimerExpired())); + _switchingTimer.setInterval(10*1000); + _switchingTimer.start(); + QMetaObject::invokeMethod(this, "switchingTimerExpired", Qt::QueuedConnection); + + QObject::connect(&_relativeUploadMeasuringTimer,SIGNAL(timeout()), + this, SLOT(relativeUploadMeasuringTimerExpired())); + _relativeUploadMeasuringTimer.setInterval(relativeLimitMeasuringTimerIntervalMsec); + _relativeUploadMeasuringTimer.start(); + _relativeUploadMeasuringTimer.setSingleShot(true); // will be restarted from the delay timer + QObject::connect(&_relativeUploadDelayTimer, SIGNAL(timeout()), + this, SLOT(relativeUploadDelayTimerExpired())); + _relativeUploadDelayTimer.setSingleShot(true); // will be restarted from the measuring timer +} + +void BandwidthManager::registerUploadDevice(UploadDevice *p) +{ + qDebug() << Q_FUNC_INFO << p; + _absoluteUploadDeviceList.append(p); + _relativeUploadDeviceList.append(p); + QObject::connect(p, SIGNAL(destroyed(QObject*)), this, SLOT(unregisterUploadDevice(QObject*))); + + if (usingAbsoluteUploadLimit()) { + p->setBandwidthLimited(true); + p->setChoked(false); + } else if (usingRelativeUploadLimit()) { + p->setBandwidthLimited(true); + p->setChoked(true); + } else { + p->setBandwidthLimited(false); + p->setChoked(false); + } +} + +void BandwidthManager::unregisterUploadDevice(QObject *o) +{ + UploadDevice *p = qobject_cast(o); + if (p) { + unregisterUploadDevice(p); + } +} + +void BandwidthManager::unregisterUploadDevice(UploadDevice* p) +{ + qDebug() << Q_FUNC_INFO << p; + _absoluteUploadDeviceList.removeAll(p); + _relativeUploadDeviceList.removeAll(p); + if (p == _relativeLimitCurrentMeasuredDevice) { + _relativeLimitCurrentMeasuredDevice = 0; + } +} + +void BandwidthManager::registerDownloadJob(GETFileJob* j) +{ + qDebug() << Q_FUNC_INFO << j; + _downloadJobList.append(j); + QObject::connect(j, SIGNAL(destroyed(QObject*)), this, SLOT(unregisterDownloadJob(QObject*))); + + if (usingAbsoluteDownloadLimit()) { + j->setBandwidthLimited(true); + j->setChoked(false); + } else if (usingRelativeDownloadLimit()) { + j->setBandwidthLimited(true); + j->setChoked(true); + } else { + j->setBandwidthLimited(false); + j->setChoked(false); + } +} + +void BandwidthManager::unregisterDownloadJob(GETFileJob* j) +{ + _downloadJobList.removeAll(j); +} + +void BandwidthManager::unregisterDownloadJob(QObject* o) +{ + GETFileJob *p = qobject_cast(o); + if (p) { + unregisterDownloadJob(p); + } +} + +void BandwidthManager::relativeUploadMeasuringTimerExpired() +{ + if (!usingRelativeUploadLimit()) { + // Not in this limiting mode, just wait 1 sec to continue the cycle + _relativeUploadDelayTimer.setInterval(1000); + _relativeUploadDelayTimer.start(); + return; + } + if (_relativeLimitCurrentMeasuredDevice == 0 || _relativeUploadDeviceList.count()) { + qDebug() << Q_FUNC_INFO << "No device set, just waiting 1 sec"; + _relativeUploadDelayTimer.setInterval(1000); + _relativeUploadDelayTimer.start(); + return; + } + + qDebug() << Q_FUNC_INFO << _relativeUploadDeviceList.count() << "Starting Delay"; + + qint64 relativeLimitProgressMeasured = (_relativeLimitCurrentMeasuredDevice->_readWithProgress + + _relativeLimitCurrentMeasuredDevice->_read) / 2; + qint64 relativeLimitProgressDifference = relativeLimitProgressMeasured - _relativeUploadLimitProgressAtMeasuringRestart; + qDebug() << Q_FUNC_INFO << _relativeUploadLimitProgressAtMeasuringRestart + << relativeLimitProgressMeasured << relativeLimitProgressDifference; + + qint64 speedkBPerSec = (relativeLimitProgressDifference / relativeLimitMeasuringTimerIntervalMsec*1000.0) / 1024.0; + qDebug() << Q_FUNC_INFO << relativeLimitProgressDifference/1024 <<"kB =>" << speedkBPerSec << "kB/sec on full speed (" + << _relativeLimitCurrentMeasuredDevice->_readWithProgress << _relativeLimitCurrentMeasuredDevice->_read + << qAbs(_relativeLimitCurrentMeasuredDevice->_readWithProgress + - _relativeLimitCurrentMeasuredDevice->_read) << ")"; + + qint64 uploadLimitPercent = -_currentUploadLimit; + // don't use too extreme values + uploadLimitPercent = qMin(uploadLimitPercent, qint64(90)); + uploadLimitPercent = qMax(qint64(10), uploadLimitPercent); + qint64 wholeTimeMsec = (100.0 / uploadLimitPercent) * relativeLimitMeasuringTimerIntervalMsec; + qint64 waitTimeMsec = wholeTimeMsec - relativeLimitMeasuringTimerIntervalMsec; + qint64 realWaitTimeMsec = waitTimeMsec + wholeTimeMsec; + qDebug() << Q_FUNC_INFO << waitTimeMsec << " - "<< realWaitTimeMsec << + " msec for " << uploadLimitPercent << "%"; + qDebug() << Q_FUNC_INFO << "XXXX" << uploadLimitPercent << relativeLimitMeasuringTimerIntervalMsec; + + // We want to wait twice as long since we want to give all + // devices the same quota we used now since we don't want + // any upload to timeout + _relativeUploadDelayTimer.setInterval(realWaitTimeMsec); + _relativeUploadDelayTimer.start(); + + int deviceCount = _relativeUploadDeviceList.count(); + qint64 quotaPerDevice = relativeLimitProgressDifference * (uploadLimitPercent / 100.0) / deviceCount + 1.0; + qDebug() << Q_FUNC_INFO << "YYYY" << relativeLimitProgressDifference << uploadLimitPercent << deviceCount; + Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) { + ud->setBandwidthLimited(true); + ud->setChoked(false); + ud->giveBandwidthQuota(quotaPerDevice); + qDebug() << Q_FUNC_INFO << "Gave" << quotaPerDevice/1024.0 << "kB to" << ud; + } + _relativeLimitCurrentMeasuredDevice = 0; +} + +void BandwidthManager::relativeUploadDelayTimerExpired() +{ + // Switch to measuring state + _relativeUploadMeasuringTimer.start(); // always start to continue the cycle + + if (!usingRelativeUploadLimit()) { + return; // oh, not actually needed + } + + if (_relativeUploadDeviceList.isEmpty()) { + return; + } + + qDebug() << Q_FUNC_INFO << _relativeUploadDeviceList.count() << "Starting measuring"; + + // Take first device and then append it again (= we round robin all devices) + _relativeLimitCurrentMeasuredDevice = _relativeUploadDeviceList.takeFirst(); + _relativeUploadDeviceList.append(_relativeLimitCurrentMeasuredDevice); + + _relativeUploadLimitProgressAtMeasuringRestart = (_relativeLimitCurrentMeasuredDevice->_readWithProgress + + _relativeLimitCurrentMeasuredDevice->_read) / 2; + _relativeLimitCurrentMeasuredDevice->setBandwidthLimited(false); + _relativeLimitCurrentMeasuredDevice->setChoked(false); + + // choke all other UploadDevices + Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) { + if (ud != _relativeLimitCurrentMeasuredDevice) { + ud->setBandwidthLimited(true); + ud->setChoked(true); + } + } + + // now we're in measuring state +} + +void BandwidthManager::switchingTimerExpired() { + qint64 newUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0); + if (newUploadLimit != _currentUploadLimit) { + qDebug() << Q_FUNC_INFO << "Upload Bandwidth limit changed" << _currentUploadLimit << newUploadLimit; + _currentUploadLimit = newUploadLimit; + Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) { + if (newUploadLimit == 0) { + ud->setBandwidthLimited(false); + ud->setChoked(false); + } else if (newUploadLimit > 0) { + ud->setBandwidthLimited(true); + ud->setChoked(false); + } else if (newUploadLimit < 0) { + ud->setBandwidthLimited(true); + ud->setChoked(true); + } + } + } + qint64 newDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0); + if (newDownloadLimit != _currentDownloadLimit) { + qDebug() << Q_FUNC_INFO << "Download Bandwidth limit changed" << _currentDownloadLimit << newDownloadLimit; + _currentDownloadLimit = newDownloadLimit; + Q_FOREACH(GETFileJob *j, _downloadJobList) { + if (usingAbsoluteDownloadLimit()) { + j->setBandwidthLimited(true); + j->setChoked(false); + } else if (usingRelativeDownloadLimit()) { + j->setBandwidthLimited(true); + j->setChoked(true); + } else { + j->setBandwidthLimited(false); + j->setChoked(false); + } + } + } +} + +void BandwidthManager::absoluteLimitTimerExpired() +{ + if (usingAbsoluteUploadLimit() && _absoluteUploadDeviceList.count() > 0) { + qint64 quotaPerDevice = _currentUploadLimit / qMax(1, _absoluteUploadDeviceList.count()); + qDebug() << Q_FUNC_INFO << quotaPerDevice << _absoluteUploadDeviceList.count() << _currentUploadLimit; + Q_FOREACH(UploadDevice *device, _absoluteUploadDeviceList) { + device->giveBandwidthQuota(quotaPerDevice); + qDebug() << Q_FUNC_INFO << "Gave " << quotaPerDevice/1024.0 << " kB to" << device; + } + } + if (usingAbsoluteDownloadLimit() && _downloadJobList.count() > 0) { + qint64 quotaPerJob = _currentDownloadLimit / qMax(1, _downloadJobList.count()); + qDebug() << Q_FUNC_INFO << quotaPerJob << _downloadJobList.count() << _currentDownloadLimit; + Q_FOREACH(GETFileJob *j, _downloadJobList) { + j->giveBandwidthQuota(quotaPerJob); + qDebug() << Q_FUNC_INFO << "Gave " << quotaPerJob/1024.0 << " kB to" << j; + } + } +} + + +} diff --git a/src/mirall/bandwidthmanager.h b/src/mirall/bandwidthmanager.h new file mode 100644 index 000000000..39f0d2fc2 --- /dev/null +++ b/src/mirall/bandwidthmanager.h @@ -0,0 +1,76 @@ +/* + * Copyright (C) by Markus Goetz + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef BANDWIDTHMANAGER_H +#define BANDWIDTHMANAGER_H + +#include +#include +#include +#include + +namespace Mirall { + +class UploadDevice; +class GETFileJob; +class OwncloudPropagator; + +class BandwidthManager : public QObject { + Q_OBJECT +public: + BandwidthManager(OwncloudPropagator *p); + + bool usingAbsoluteUploadLimit() { return _currentUploadLimit > 0; } + bool usingRelativeUploadLimit() { return _currentUploadLimit < 0; } + bool usingAbsoluteDownloadLimit() { return _currentDownloadLimit > 0; } + bool usingRelativeDownloadLimit() { return _currentDownloadLimit < 0; } + + +public slots: + void registerUploadDevice(UploadDevice*); + void unregisterUploadDevice(UploadDevice*); + void unregisterUploadDevice(QObject*); + + void registerDownloadJob(GETFileJob*); + void unregisterDownloadJob(GETFileJob*); + void unregisterDownloadJob(QObject*); + + void absoluteLimitTimerExpired(); + void switchingTimerExpired(); + + void relativeUploadMeasuringTimerExpired(); + void relativeUploadDelayTimerExpired(); + +private: + QTimer _switchingTimer; // for switching between absolute and relative bw limiting + OwncloudPropagator *_propagator; // this timer and this variable could be replaced + // by the propagator emitting the changed limit values to us as signal + + QTimer _absoluteLimitTimer; // for absolute up/down bw limiting + + QLinkedList _absoluteUploadDeviceList; + QLinkedList _relativeUploadDeviceList; // FIXME merge with list above ^^ + QTimer _relativeUploadMeasuringTimer; + QTimer _relativeUploadDelayTimer; // for relative bw limiting, we need to wait this amount before measuring again + UploadDevice *_relativeLimitCurrentMeasuredDevice; // the device measured + qint64 _relativeUploadLimitProgressAtMeasuringRestart; // for measuring how much progress we made at start + qint64 _currentUploadLimit; + + QLinkedList _downloadJobList; + qint64 _currentDownloadLimit; +}; + +} + +#endif diff --git a/src/mirall/owncloudpropagator.cpp b/src/mirall/owncloudpropagator.cpp index 3b1ca712a..b276c4f32 100644 --- a/src/mirall/owncloudpropagator.cpp +++ b/src/mirall/owncloudpropagator.cpp @@ -30,6 +30,9 @@ #include #include +#include +#include +#include namespace Mirall { @@ -317,14 +320,28 @@ bool OwncloudPropagator::isInSharedDirectory(const QString& file) */ bool OwncloudPropagator::useLegacyJobs() { - if (_downloadLimit.fetchAndAddAcquire(0) != 0 || _uploadLimit.fetchAndAddAcquire(0) != 0) { - // QNAM does not support bandwith limiting + // Allow an environement variable for debugging + QByteArray env = qgetenv("OWNCLOUD_USE_LEGACY_JOBS"); + if (env=="true" || env =="1") { return true; } - // Allow an environement variable for debugging - QByteArray env = qgetenv("OWNCLOUD_USE_LEGACY_JOBS"); - return env=="true" || env =="1"; + env = qgetenv("OWNCLOUD_NEW_BANDWIDTH_LIMITING"); + if (env=="true" || env =="1") { + // Only certain Qt versions support this at the moment. + // They need those Change-Ids: Idb1c2d5a382a704d8cc08fe03c55c883bfc95aa7 Iefbcb1a21d8aedef1eb11761232dd16a049018dc + // FIXME We need to check the Qt version and then also return false here as soon + // as mirall ships with those Qt versions on Windows and OS X + return false; + } + + if (_downloadLimit.fetchAndAddAcquire(0) != 0 || _uploadLimit.fetchAndAddAcquire(0) != 0) { + // QNAM does not support bandwith limiting + // in most Qt versions. + return true; + } + + return false; } int OwncloudPropagator::httpTimeout() diff --git a/src/mirall/owncloudpropagator.h b/src/mirall/owncloudpropagator.h index 88ab0c830..a9bcba9b0 100644 --- a/src/mirall/owncloudpropagator.h +++ b/src/mirall/owncloudpropagator.h @@ -18,10 +18,16 @@ #include #include #include -#include +#include +#include +#include +#include +#include +#include #include "syncfileitem.h" #include "syncjournaldb.h" +#include "bandwidthmanager.h" struct hbf_transfer_s; struct ne_session_s; @@ -177,6 +183,43 @@ public: } }; +class BandwidthManager; // fwd +class UploadDevice : public QIODevice { + Q_OBJECT +public: + QPointer _file; + qint64 _read; + qint64 _size; + qint64 _start; + BandwidthManager* _bandwidthManager; + + qint64 _bandwidthQuota; + qint64 _readWithProgress; + + UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm); + ~UploadDevice(); + virtual qint64 writeData(const char* , qint64 ); + virtual qint64 readData(char* data, qint64 maxlen); + virtual bool atEnd() const; + virtual qint64 size() const; + qint64 bytesAvailable() const; + virtual bool isSequential() const; + virtual bool seek ( qint64 pos ); + + void setBandwidthLimited(bool); + bool isBandwidthLimited() { return _bandwidthLimited; } + void setChoked(bool); + bool isChoked() { return _choked; } + void giveBandwidthQuota(qint64 bwq); +private: + bool _bandwidthLimited; // if _bandwidthQuota will be used + bool _choked; // if upload is paused (readData() will return 0) +protected slots: + void slotJobUploadProgress(qint64 sent, qint64 t); +}; +//Q_DECLARE_METATYPE(UploadDevice); +//Q_DECLARE_METATYPE(QPointer); + class OwncloudPropagator : public QObject { Q_OBJECT @@ -198,6 +241,8 @@ public: SyncJournalDb * const _journal; bool _finishedEmited; // used to ensure that finished is only emit once + BandwidthManager _bandwidthManager; + public: OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir, const QString &remoteFolder, SyncJournalDb *progressDb, QThread *neonThread) @@ -208,6 +253,7 @@ public: , _remoteFolder((remoteFolder.endsWith(QChar('/'))) ? remoteFolder : remoteFolder+'/' ) , _journal(progressDb) , _finishedEmited(false) + , _bandwidthManager(this) , _activeJobs(0) , _anotherSyncNeeded(false) { } diff --git a/src/mirall/propagator_qnam.cpp b/src/mirall/propagator_qnam.cpp index 723ff8979..9ff9832c3 100644 --- a/src/mirall/propagator_qnam.cpp +++ b/src/mirall/propagator_qnam.cpp @@ -65,8 +65,7 @@ void PUTFileJob::start() { req.setRawHeader(it.key(), it.value()); } - setReply(davRequest("PUT", path(), req, _device)); - _device->setParent(reply()); + setReply(davRequest("PUT", path(), req, _device.data())); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { @@ -185,73 +184,142 @@ void PropagateUploadFileQNAM::start() this->startNextChunk(); } -struct ChunkDevice : QIODevice { -public: - QPointer _file; - qint64 _read; - qint64 _size; - qint64 _start; +UploadDevice::UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm) + : QIODevice(file), _file(file), _read(0), _size(size), _start(start), + _bandwidthManager(bwm), + _bandwidthQuota(0), + _readWithProgress(0), + _bandwidthLimited(false), _choked(false) +{ + qDebug() << Q_FUNC_INFO << start << size << chunkSize(); + _bandwidthManager->registerUploadDevice(this); + _file = QPointer(file); +} - ChunkDevice(QIODevice *file, qint64 start, qint64 size) - : QIODevice(file), _file(file), _read(0), _size(size), _start(start) { - _file = QPointer(file); + +UploadDevice::~UploadDevice() { + _bandwidthManager->unregisterUploadDevice(this); +} + +qint64 UploadDevice::writeData(const char* , qint64 ) { + Q_ASSERT(!"write to read only device"); + return 0; +} + +qint64 UploadDevice::readData(char* data, qint64 maxlen) { + if (_file.isNull()) { + qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; + close(); + return -1; } - - virtual qint64 writeData(const char* , qint64 ) { - Q_ASSERT(!"write to read only device"); + _file.data()->seek(_start + _read); + qDebug() << Q_FUNC_INFO << maxlen << _read << _size << _bandwidthQuota; + if (_size - _read <= 0) { + // at end + qDebug() << Q_FUNC_INFO << _read << _size << _bandwidthQuota << "at end"; + _bandwidthManager->unregisterUploadDevice(this); + return -1; + } + maxlen = qMin(maxlen, _size - _read); + if (maxlen == 0) { + qDebug() << Q_FUNC_INFO << "FUUUUUU" << maxlen << _size - _read; return 0; } - - virtual qint64 readData(char* data, qint64 maxlen) { - if (_file.isNull()) { - qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; - close(); - return -1; - } - _file.data()->seek(_start + _read); - maxlen = qMin(maxlen, chunkSize() - _read); - if (maxlen == 0) + if (isChoked()) { + qDebug() << Q_FUNC_INFO << this << "Upload Choked"; + return 0; + } + if (isBandwidthLimited()) { + qDebug() << Q_FUNC_INFO << "BW LIMITED" << maxlen << _bandwidthQuota + << qMin(maxlen, _bandwidthQuota); + maxlen = qMin(maxlen, _bandwidthQuota); + if (maxlen <= 0) { // no quota + qDebug() << Q_FUNC_INFO << "no quota"; return 0; - qint64 ret = _file.data()->read(data, maxlen); - - if (ret < 0) - return -1; - _read += ret; - return ret; - } - - virtual bool atEnd() const { - if (_file.isNull()) { - qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; - return true; } - return _read >= chunkSize() || _file.data()->atEnd(); + _bandwidthQuota -= maxlen; } + qDebug() << Q_FUNC_INFO << "reading limited=" << isBandwidthLimited() + << "maxlen=" << maxlen << "quota=" << _bandwidthQuota; + qint64 ret = _file.data()->read(data, maxlen); + qDebug() << Q_FUNC_INFO << "returning " << ret; - virtual qint64 size() const{ - return _size; + if (ret < 0) + return -1; + _read += ret; + qDebug() << Q_FUNC_INFO << "returning2 " << ret << _read; + + return ret; +} + +void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t) +{ + qDebug() << Q_FUNC_INFO << sent << _read << t << _size << _bandwidthQuota; + if (sent == 0 || t == 0) { + return; } + _readWithProgress = sent; +} - qint64 bytesAvailable() const - { - return _size - _read + QIODevice::bytesAvailable(); +bool UploadDevice::atEnd() const { + if (_file.isNull()) { + qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; + return true; } + qDebug() << this << Q_FUNC_INFO << _read << chunkSize() + << (_read >= chunkSize() || _file.data()->atEnd()) + << (_read >= _size); + return _file.data()->atEnd() || (_read >= _size); +} - // random access, we can seek - virtual bool isSequential() const{ +qint64 UploadDevice::size() const{ + qDebug() << this << Q_FUNC_INFO << _size; + return _size; +} + +qint64 UploadDevice::bytesAvailable() const +{ + qDebug() << this << Q_FUNC_INFO << _size << _read << QIODevice::bytesAvailable() + << _size - _read + QIODevice::bytesAvailable(); + return _size - _read + QIODevice::bytesAvailable(); +} + +// random access, we can seek +bool UploadDevice::isSequential() const{ + return false; +} + +bool UploadDevice::seek ( qint64 pos ) { + if (_file.isNull()) { + qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; + close(); return false; } + qDebug() << this << Q_FUNC_INFO << pos << _read; + _read = pos; + return _file.data()->seek(pos + _start); +} - virtual bool seek ( qint64 pos ) { - if (_file.isNull()) { - qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; - close(); - return false; - } - _read = pos; - return _file.data()->seek(pos + _start); +void UploadDevice::giveBandwidthQuota(qint64 bwq) { + qDebug() << Q_FUNC_INFO << bwq; + if (!atEnd()) { + _bandwidthQuota = bwq; + qDebug() << Q_FUNC_INFO << bwq << "emitting readyRead()" << _read << _readWithProgress; + QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection); // tell QNAM that we have quota } -}; +} + +void UploadDevice::setBandwidthLimited(bool b) { + _bandwidthLimited = b; + QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection); +} + +void UploadDevice::setChoked(bool b) { + _choked = b; + if (!_choked) { + QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection); + } +} void PropagateUploadFileQNAM::startNextChunk() { @@ -291,7 +359,7 @@ void PropagateUploadFileQNAM::startNextChunk() } QString path = _item._file; - QIODevice *device = 0; + UploadDevice *device = 0; if (_chunkCount > 1) { int sendingChunk = (_currentChunk + _startChunk) % _chunkCount; // XOR with chunk size to make sure everything goes well if chunk size change between runs @@ -305,9 +373,9 @@ void PropagateUploadFileQNAM::startNextChunk() currentChunkSize = chunkSize(); } } - device = new ChunkDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize); + device = new UploadDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager); } else { - device = _file; + device = new UploadDevice(_file, 0, fileSize, &_propagator->_bandwidthManager); } bool isOpen = true; @@ -321,6 +389,7 @@ void PropagateUploadFileQNAM::startNextChunk() job->setTimeout(_propagator->httpTimeout() * 1000); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); + connect(_job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64))); connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); job->start(); _propagator->_activeJobs++; @@ -341,7 +410,6 @@ void PropagateUploadFileQNAM::startNextChunk() if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) { emitReady(); } - } else { qDebug() << "ERR: Could not open upload file: " << device->errorString(); done( SyncFileItem::NormalError, device->errorString() ); @@ -496,17 +564,18 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem ©) _propagator->_journal->setUploadInfo(_item._file, SyncJournalDb::UploadInfo()); _propagator->_journal->commit("upload file start"); + qDebug() << Q_FUNC_INFO << "msec=" <<_duration.elapsed(); done(SyncFileItem::Success); } -void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64) +void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64 t) { int progressChunk = _currentChunk + _startChunk - 1; if (progressChunk >= _chunkCount) progressChunk = _currentChunk - 1; quint64 amount = progressChunk * chunkSize(); sender()->setProperty("byteWritten", sent); - if (_jobs.count() == 1) { + if (_jobs.count() > 1) { amount += sent; } else { amount -= (_jobs.count() -1) * chunkSize(); @@ -568,6 +637,7 @@ GETFileJob::GETFileJob(Account* account, const QString& path, QFile *device, : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _expectedEtagForResume(expectedEtagForResume), _resumeStart(_resumeStart) , _errorStatus(SyncFileItem::NoStatus) +, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0) { } @@ -577,6 +647,7 @@ GETFileJob::GETFileJob(Account* account, const QUrl& url, QFile *device, : AbstractNetworkJob(account, url.toEncoded(), parent), _device(device), _headers(headers), _resumeStart(0), _errorStatus(SyncFileItem::NoStatus), _directDownloadUrl(url) +, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0) { } @@ -594,7 +665,11 @@ void GETFileJob::start() { setReply(davRequest("GET", _directDownloadUrl, req)); } setupConnections(reply()); - reply()->setReadBufferSize(128 * 1024); + + reply()->setReadBufferSize(16 * 1024); // keep low so we can easier limit the bandwidth + if (_bandwidthManager) { + _bandwidthManager->registerDownloadJob(this); + } if( reply()->error() != QNetworkReply::NoError ) { qWarning() << Q_FUNC_INFO << " Network error: " << reply()->errorString(); @@ -609,6 +684,10 @@ void GETFileJob::start() { void GETFileJob::slotMetaDataChanged() { + // For some reason setting the read buffer in GETFileJob::start doesn't seem to go + // through the HTTP layer thread(?) + reply()->setReadBufferSize(16 * 1024); + if (reply()->error() != QNetworkReply::NoError || reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt() / 100 != 2) { // We will handle the error when the job is finished. @@ -665,13 +744,55 @@ void GETFileJob::slotMetaDataChanged() } +void GETFileJob::setBandwidthManager(BandwidthManager *bwm) +{ + _bandwidthManager = bwm; +} + +void GETFileJob::setChoked(bool c) +{ + _bandwidthChoked = c; + QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection); +} + +void GETFileJob::setBandwidthLimited(bool b) +{ + _bandwidthLimited = b; + QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection); +} + +void GETFileJob::giveBandwidthQuota(qint64 q) +{ + _bandwidthQuota = q; + qDebug() << Q_FUNC_INFO << "Got" << q << "bytes"; + QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection); +} + void GETFileJob::slotReadyRead() { int bufferSize = qMin(1024*8ll , reply()->bytesAvailable()); QByteArray buffer(bufferSize, Qt::Uninitialized); + qDebug() << Q_FUNC_INFO << reply()->bytesAvailable() << reply()->isOpen() << reply()->isFinished(); + //return; + while(reply()->bytesAvailable() > 0) { - qint64 r = reply()->read(buffer.data(), bufferSize); + if (_bandwidthChoked) { + qDebug() << Q_FUNC_INFO << "Download choked"; + break; + } + qint64 toRead = bufferSize; + if (_bandwidthLimited) { + toRead = qMin(qint64(bufferSize), _bandwidthQuota); + if (toRead == 0) { + qDebug() << Q_FUNC_INFO << "Out of quota"; + break; + } + _bandwidthQuota -= toRead; + qDebug() << Q_FUNC_INFO << "Reading" << toRead << "remaining" << _bandwidthQuota; + } + + qint64 r = reply()->read(buffer.data(), toRead); if (r < 0) { _errorString = reply()->errorString(); _errorStatus = SyncFileItem::NormalError; @@ -690,6 +811,18 @@ void GETFileJob::slotReadyRead() } } resetTimeout(); + + if (reply()->isFinished() && reply()->bytesAvailable() == 0) { + qDebug() << Q_FUNC_INFO << "Actually finished!"; + if (_bandwidthManager) { + _bandwidthManager->unregisterDownloadJob(this); + } + if (!_hasEmittedFinishedSignal) { + emit finishedSignal(); + } + _hasEmittedFinishedSignal = true; + deleteLater(); + } } void GETFileJob::slotTimeout() @@ -788,6 +921,7 @@ void PropagateDownloadFileQNAM::start() &_tmpFile, headers); qDebug() << Q_FUNC_INFO << "directDownloadUrl given for " << _item._file << _item._directDownloadUrl << headers["Cookie"]; } + _job->setBandwidthManager(&_propagator->_bandwidthManager); _job->setTimeout(_propagator->httpTimeout() * 1000); connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotGetFinished())); connect(_job, SIGNAL(downloadProgress(qint64,qint64)), this, SLOT(slotDownloadProgress(qint64,qint64))); @@ -911,4 +1045,5 @@ void PropagateDownloadFileQNAM::abort() _job->reply()->abort(); } + } diff --git a/src/mirall/propagator_qnam.h b/src/mirall/propagator_qnam.h index ffc54dd35..0cd516441 100644 --- a/src/mirall/propagator_qnam.h +++ b/src/mirall/propagator_qnam.h @@ -51,7 +51,7 @@ public: class PUTFileJob : public AbstractNetworkJob { Q_OBJECT - QIODevice* _device; + QSharedPointer _device; QMap _headers; QString _errorString; @@ -144,6 +144,11 @@ class GETFileJob : public AbstractNetworkJob { SyncFileItem::Status _errorStatus; QUrl _directDownloadUrl; QByteArray _etag; + bool _bandwidthLimited; // if _bandwidthQuota will be used + bool _bandwidthChoked; // if download is paused (won't read on readyRead()) + qint64 _bandwidthQuota; + BandwidthManager *_bandwidthManager; + bool _hasEmittedFinishedSignal; public: // DOES NOT take owncership of the device. @@ -154,13 +159,34 @@ public: explicit GETFileJob(Account* account, const QUrl& url, QFile *device, const QMap &headers, QObject* parent = 0); + virtual ~GETFileJob() { + if (_bandwidthManager) { + _bandwidthManager->unregisterDownloadJob(this); + } + } virtual void start(); virtual bool finished() { - emit finishedSignal(); - return true; + if (reply()->bytesAvailable()) { + qDebug() << Q_FUNC_INFO << "Not all read yet because of bandwidth limits"; + return false; + } else { + if (_bandwidthManager) { + _bandwidthManager->unregisterDownloadJob(this); + } + if (!_hasEmittedFinishedSignal) { + emit finishedSignal(); + } + _hasEmittedFinishedSignal = true; + return true; // discard + } } + void setBandwidthManager(BandwidthManager *bwm); + void setChoked(bool c); + void setBandwidthLimited(bool b); + void giveBandwidthQuota(qint64 q); + QString errorString() { return _errorString.isEmpty() ? reply()->errorString() : _errorString; }; @@ -199,6 +225,4 @@ private slots: void slotDownloadProgress(qint64,qint64); }; - - }