Propagator: Bandwidth limiting for new propagator

This commit is contained in:
Markus Goetz 2014-09-29 10:30:39 +02:00
parent 5471bfd5a8
commit de79f9338a
7 changed files with 663 additions and 72 deletions

View file

@ -86,6 +86,7 @@ set(libsync_SRCS
mirall/mirallconfigfile.cpp mirall/mirallconfigfile.cpp
mirall/syncengine.cpp mirall/syncengine.cpp
mirall/owncloudpropagator.cpp mirall/owncloudpropagator.cpp
mirall/bandwidthmanager.cpp
mirall/propagatorjobs.cpp mirall/propagatorjobs.cpp
mirall/propagator_qnam.cpp mirall/propagator_qnam.cpp
mirall/propagator_legacy.cpp mirall/propagator_legacy.cpp

View file

@ -0,0 +1,292 @@
/*
* Copyright (C) by Markus Goetz <markus@woboq.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "owncloudpropagator.h"
#include "propagator_qnam.h"
#include "propagatorjobs.h"
#include "propagator_legacy.h"
#include "mirall/utility.h"
#ifdef Q_OS_WIN
#include <windef.h>
#include <winbase.h>
#endif
#include <QTimer>
#include <QObject>
namespace Mirall {
// Because of the many layers of buffering inside Qt (and probably the OS and the network)
// we cannot lower this value much more. If we do, the estimated bw will be very high
// because the buffers fill fast while the actual network algorithms are not relevant yet.
static qint64 relativeLimitMeasuringTimerIntervalMsec = 1000*2;
// See also WritingState in http://code.woboq.org/qt5/qtbase/src/network/access/qhttpprotocolhandler.cpp.html#_ZN20QHttpProtocolHandler11sendRequestEv
// FIXME At some point:
// * Register device only after the QNR received its metaDataChanged() signal
// * Incorporate Qt buffer fill state (it's a negative absolute delta).
// * Incorporate SSL overhead (percentage)
// * For relative limiting, do less measuring and more delaying+giving quota
// * For relative limiting, smoothen measurements
BandwidthManager::BandwidthManager(OwncloudPropagator *p) : QObject(),
_relativeLimitCurrentMeasuredDevice(0),
_relativeUploadLimitProgressAtMeasuringRestart(0),
_currentUploadLimit(0),
_currentDownloadLimit(0),
_propagator(p)
{
_currentUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
_currentDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0);
QObject::connect(&_absoluteLimitTimer, SIGNAL(timeout()), this, SLOT(absoluteLimitTimerExpired()));
_absoluteLimitTimer.setInterval(1000);
_absoluteLimitTimer.start();
QObject::connect(&_switchingTimer, SIGNAL(timeout()), this, SLOT(switchingTimerExpired()));
_switchingTimer.setInterval(10*1000);
_switchingTimer.start();
QMetaObject::invokeMethod(this, "switchingTimerExpired", Qt::QueuedConnection);
QObject::connect(&_relativeUploadMeasuringTimer,SIGNAL(timeout()),
this, SLOT(relativeUploadMeasuringTimerExpired()));
_relativeUploadMeasuringTimer.setInterval(relativeLimitMeasuringTimerIntervalMsec);
_relativeUploadMeasuringTimer.start();
_relativeUploadMeasuringTimer.setSingleShot(true); // will be restarted from the delay timer
QObject::connect(&_relativeUploadDelayTimer, SIGNAL(timeout()),
this, SLOT(relativeUploadDelayTimerExpired()));
_relativeUploadDelayTimer.setSingleShot(true); // will be restarted from the measuring timer
}
void BandwidthManager::registerUploadDevice(UploadDevice *p)
{
qDebug() << Q_FUNC_INFO << p;
_absoluteUploadDeviceList.append(p);
_relativeUploadDeviceList.append(p);
QObject::connect(p, SIGNAL(destroyed(QObject*)), this, SLOT(unregisterUploadDevice(QObject*)));
if (usingAbsoluteUploadLimit()) {
p->setBandwidthLimited(true);
p->setChoked(false);
} else if (usingRelativeUploadLimit()) {
p->setBandwidthLimited(true);
p->setChoked(true);
} else {
p->setBandwidthLimited(false);
p->setChoked(false);
}
}
void BandwidthManager::unregisterUploadDevice(QObject *o)
{
UploadDevice *p = qobject_cast<UploadDevice*>(o);
if (p) {
unregisterUploadDevice(p);
}
}
void BandwidthManager::unregisterUploadDevice(UploadDevice* p)
{
qDebug() << Q_FUNC_INFO << p;
_absoluteUploadDeviceList.removeAll(p);
_relativeUploadDeviceList.removeAll(p);
if (p == _relativeLimitCurrentMeasuredDevice) {
_relativeLimitCurrentMeasuredDevice = 0;
}
}
void BandwidthManager::registerDownloadJob(GETFileJob* j)
{
qDebug() << Q_FUNC_INFO << j;
_downloadJobList.append(j);
QObject::connect(j, SIGNAL(destroyed(QObject*)), this, SLOT(unregisterDownloadJob(QObject*)));
if (usingAbsoluteDownloadLimit()) {
j->setBandwidthLimited(true);
j->setChoked(false);
} else if (usingRelativeDownloadLimit()) {
j->setBandwidthLimited(true);
j->setChoked(true);
} else {
j->setBandwidthLimited(false);
j->setChoked(false);
}
}
void BandwidthManager::unregisterDownloadJob(GETFileJob* j)
{
_downloadJobList.removeAll(j);
}
void BandwidthManager::unregisterDownloadJob(QObject* o)
{
GETFileJob *p = qobject_cast<GETFileJob*>(o);
if (p) {
unregisterDownloadJob(p);
}
}
void BandwidthManager::relativeUploadMeasuringTimerExpired()
{
if (!usingRelativeUploadLimit()) {
// Not in this limiting mode, just wait 1 sec to continue the cycle
_relativeUploadDelayTimer.setInterval(1000);
_relativeUploadDelayTimer.start();
return;
}
if (_relativeLimitCurrentMeasuredDevice == 0 || _relativeUploadDeviceList.count()) {
qDebug() << Q_FUNC_INFO << "No device set, just waiting 1 sec";
_relativeUploadDelayTimer.setInterval(1000);
_relativeUploadDelayTimer.start();
return;
}
qDebug() << Q_FUNC_INFO << _relativeUploadDeviceList.count() << "Starting Delay";
qint64 relativeLimitProgressMeasured = (_relativeLimitCurrentMeasuredDevice->_readWithProgress
+ _relativeLimitCurrentMeasuredDevice->_read) / 2;
qint64 relativeLimitProgressDifference = relativeLimitProgressMeasured - _relativeUploadLimitProgressAtMeasuringRestart;
qDebug() << Q_FUNC_INFO << _relativeUploadLimitProgressAtMeasuringRestart
<< relativeLimitProgressMeasured << relativeLimitProgressDifference;
qint64 speedkBPerSec = (relativeLimitProgressDifference / relativeLimitMeasuringTimerIntervalMsec*1000.0) / 1024.0;
qDebug() << Q_FUNC_INFO << relativeLimitProgressDifference/1024 <<"kB =>" << speedkBPerSec << "kB/sec on full speed ("
<< _relativeLimitCurrentMeasuredDevice->_readWithProgress << _relativeLimitCurrentMeasuredDevice->_read
<< qAbs(_relativeLimitCurrentMeasuredDevice->_readWithProgress
- _relativeLimitCurrentMeasuredDevice->_read) << ")";
qint64 uploadLimitPercent = -_currentUploadLimit;
// don't use too extreme values
uploadLimitPercent = qMin(uploadLimitPercent, qint64(90));
uploadLimitPercent = qMax(qint64(10), uploadLimitPercent);
qint64 wholeTimeMsec = (100.0 / uploadLimitPercent) * relativeLimitMeasuringTimerIntervalMsec;
qint64 waitTimeMsec = wholeTimeMsec - relativeLimitMeasuringTimerIntervalMsec;
qint64 realWaitTimeMsec = waitTimeMsec + wholeTimeMsec;
qDebug() << Q_FUNC_INFO << waitTimeMsec << " - "<< realWaitTimeMsec <<
" msec for " << uploadLimitPercent << "%";
qDebug() << Q_FUNC_INFO << "XXXX" << uploadLimitPercent << relativeLimitMeasuringTimerIntervalMsec;
// We want to wait twice as long since we want to give all
// devices the same quota we used now since we don't want
// any upload to timeout
_relativeUploadDelayTimer.setInterval(realWaitTimeMsec);
_relativeUploadDelayTimer.start();
int deviceCount = _relativeUploadDeviceList.count();
qint64 quotaPerDevice = relativeLimitProgressDifference * (uploadLimitPercent / 100.0) / deviceCount + 1.0;
qDebug() << Q_FUNC_INFO << "YYYY" << relativeLimitProgressDifference << uploadLimitPercent << deviceCount;
Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) {
ud->setBandwidthLimited(true);
ud->setChoked(false);
ud->giveBandwidthQuota(quotaPerDevice);
qDebug() << Q_FUNC_INFO << "Gave" << quotaPerDevice/1024.0 << "kB to" << ud;
}
_relativeLimitCurrentMeasuredDevice = 0;
}
void BandwidthManager::relativeUploadDelayTimerExpired()
{
// Switch to measuring state
_relativeUploadMeasuringTimer.start(); // always start to continue the cycle
if (!usingRelativeUploadLimit()) {
return; // oh, not actually needed
}
if (_relativeUploadDeviceList.isEmpty()) {
return;
}
qDebug() << Q_FUNC_INFO << _relativeUploadDeviceList.count() << "Starting measuring";
// Take first device and then append it again (= we round robin all devices)
_relativeLimitCurrentMeasuredDevice = _relativeUploadDeviceList.takeFirst();
_relativeUploadDeviceList.append(_relativeLimitCurrentMeasuredDevice);
_relativeUploadLimitProgressAtMeasuringRestart = (_relativeLimitCurrentMeasuredDevice->_readWithProgress
+ _relativeLimitCurrentMeasuredDevice->_read) / 2;
_relativeLimitCurrentMeasuredDevice->setBandwidthLimited(false);
_relativeLimitCurrentMeasuredDevice->setChoked(false);
// choke all other UploadDevices
Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) {
if (ud != _relativeLimitCurrentMeasuredDevice) {
ud->setBandwidthLimited(true);
ud->setChoked(true);
}
}
// now we're in measuring state
}
void BandwidthManager::switchingTimerExpired() {
qint64 newUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
if (newUploadLimit != _currentUploadLimit) {
qDebug() << Q_FUNC_INFO << "Upload Bandwidth limit changed" << _currentUploadLimit << newUploadLimit;
_currentUploadLimit = newUploadLimit;
Q_FOREACH(UploadDevice *ud, _relativeUploadDeviceList) {
if (newUploadLimit == 0) {
ud->setBandwidthLimited(false);
ud->setChoked(false);
} else if (newUploadLimit > 0) {
ud->setBandwidthLimited(true);
ud->setChoked(false);
} else if (newUploadLimit < 0) {
ud->setBandwidthLimited(true);
ud->setChoked(true);
}
}
}
qint64 newDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0);
if (newDownloadLimit != _currentDownloadLimit) {
qDebug() << Q_FUNC_INFO << "Download Bandwidth limit changed" << _currentDownloadLimit << newDownloadLimit;
_currentDownloadLimit = newDownloadLimit;
Q_FOREACH(GETFileJob *j, _downloadJobList) {
if (usingAbsoluteDownloadLimit()) {
j->setBandwidthLimited(true);
j->setChoked(false);
} else if (usingRelativeDownloadLimit()) {
j->setBandwidthLimited(true);
j->setChoked(true);
} else {
j->setBandwidthLimited(false);
j->setChoked(false);
}
}
}
}
void BandwidthManager::absoluteLimitTimerExpired()
{
if (usingAbsoluteUploadLimit() && _absoluteUploadDeviceList.count() > 0) {
qint64 quotaPerDevice = _currentUploadLimit / qMax(1, _absoluteUploadDeviceList.count());
qDebug() << Q_FUNC_INFO << quotaPerDevice << _absoluteUploadDeviceList.count() << _currentUploadLimit;
Q_FOREACH(UploadDevice *device, _absoluteUploadDeviceList) {
device->giveBandwidthQuota(quotaPerDevice);
qDebug() << Q_FUNC_INFO << "Gave " << quotaPerDevice/1024.0 << " kB to" << device;
}
}
if (usingAbsoluteDownloadLimit() && _downloadJobList.count() > 0) {
qint64 quotaPerJob = _currentDownloadLimit / qMax(1, _downloadJobList.count());
qDebug() << Q_FUNC_INFO << quotaPerJob << _downloadJobList.count() << _currentDownloadLimit;
Q_FOREACH(GETFileJob *j, _downloadJobList) {
j->giveBandwidthQuota(quotaPerJob);
qDebug() << Q_FUNC_INFO << "Gave " << quotaPerJob/1024.0 << " kB to" << j;
}
}
}
}

View file

@ -0,0 +1,76 @@
/*
* Copyright (C) by Markus Goetz <markus@woboq.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#ifndef BANDWIDTHMANAGER_H
#define BANDWIDTHMANAGER_H
#include <QObject>
#include <QLinkedList>
#include <QTimer>
#include <QIODevice>
namespace Mirall {
class UploadDevice;
class GETFileJob;
class OwncloudPropagator;
class BandwidthManager : public QObject {
Q_OBJECT
public:
BandwidthManager(OwncloudPropagator *p);
bool usingAbsoluteUploadLimit() { return _currentUploadLimit > 0; }
bool usingRelativeUploadLimit() { return _currentUploadLimit < 0; }
bool usingAbsoluteDownloadLimit() { return _currentDownloadLimit > 0; }
bool usingRelativeDownloadLimit() { return _currentDownloadLimit < 0; }
public slots:
void registerUploadDevice(UploadDevice*);
void unregisterUploadDevice(UploadDevice*);
void unregisterUploadDevice(QObject*);
void registerDownloadJob(GETFileJob*);
void unregisterDownloadJob(GETFileJob*);
void unregisterDownloadJob(QObject*);
void absoluteLimitTimerExpired();
void switchingTimerExpired();
void relativeUploadMeasuringTimerExpired();
void relativeUploadDelayTimerExpired();
private:
QTimer _switchingTimer; // for switching between absolute and relative bw limiting
OwncloudPropagator *_propagator; // this timer and this variable could be replaced
// by the propagator emitting the changed limit values to us as signal
QTimer _absoluteLimitTimer; // for absolute up/down bw limiting
QLinkedList<UploadDevice*> _absoluteUploadDeviceList;
QLinkedList<UploadDevice*> _relativeUploadDeviceList; // FIXME merge with list above ^^
QTimer _relativeUploadMeasuringTimer;
QTimer _relativeUploadDelayTimer; // for relative bw limiting, we need to wait this amount before measuring again
UploadDevice *_relativeLimitCurrentMeasuredDevice; // the device measured
qint64 _relativeUploadLimitProgressAtMeasuringRestart; // for measuring how much progress we made at start
qint64 _currentUploadLimit;
QLinkedList<GETFileJob*> _downloadJobList;
qint64 _currentDownloadLimit;
};
}
#endif

View file

@ -30,6 +30,9 @@
#include <QStack> #include <QStack>
#include <QFileInfo> #include <QFileInfo>
#include <QTimer>
#include <QObject>
#include <QTimerEvent>
namespace Mirall { namespace Mirall {
@ -317,14 +320,28 @@ bool OwncloudPropagator::isInSharedDirectory(const QString& file)
*/ */
bool OwncloudPropagator::useLegacyJobs() bool OwncloudPropagator::useLegacyJobs()
{ {
if (_downloadLimit.fetchAndAddAcquire(0) != 0 || _uploadLimit.fetchAndAddAcquire(0) != 0) { // Allow an environement variable for debugging
// QNAM does not support bandwith limiting QByteArray env = qgetenv("OWNCLOUD_USE_LEGACY_JOBS");
if (env=="true" || env =="1") {
return true; return true;
} }
// Allow an environement variable for debugging env = qgetenv("OWNCLOUD_NEW_BANDWIDTH_LIMITING");
QByteArray env = qgetenv("OWNCLOUD_USE_LEGACY_JOBS"); if (env=="true" || env =="1") {
return env=="true" || env =="1"; // Only certain Qt versions support this at the moment.
// They need those Change-Ids: Idb1c2d5a382a704d8cc08fe03c55c883bfc95aa7 Iefbcb1a21d8aedef1eb11761232dd16a049018dc
// FIXME We need to check the Qt version and then also return false here as soon
// as mirall ships with those Qt versions on Windows and OS X
return false;
}
if (_downloadLimit.fetchAndAddAcquire(0) != 0 || _uploadLimit.fetchAndAddAcquire(0) != 0) {
// QNAM does not support bandwith limiting
// in most Qt versions.
return true;
}
return false;
} }
int OwncloudPropagator::httpTimeout() int OwncloudPropagator::httpTimeout()

View file

@ -18,10 +18,16 @@
#include <neon/ne_request.h> #include <neon/ne_request.h>
#include <QHash> #include <QHash>
#include <QObject> #include <QObject>
#include <qelapsedtimer.h> #include <QMap>
#include <QLinkedList>
#include <QElapsedTimer>
#include <QTimer>
#include <QPointer>
#include <QIODevice>
#include "syncfileitem.h" #include "syncfileitem.h"
#include "syncjournaldb.h" #include "syncjournaldb.h"
#include "bandwidthmanager.h"
struct hbf_transfer_s; struct hbf_transfer_s;
struct ne_session_s; struct ne_session_s;
@ -177,6 +183,43 @@ public:
} }
}; };
class BandwidthManager; // fwd
class UploadDevice : public QIODevice {
Q_OBJECT
public:
QPointer<QIODevice> _file;
qint64 _read;
qint64 _size;
qint64 _start;
BandwidthManager* _bandwidthManager;
qint64 _bandwidthQuota;
qint64 _readWithProgress;
UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm);
~UploadDevice();
virtual qint64 writeData(const char* , qint64 );
virtual qint64 readData(char* data, qint64 maxlen);
virtual bool atEnd() const;
virtual qint64 size() const;
qint64 bytesAvailable() const;
virtual bool isSequential() const;
virtual bool seek ( qint64 pos );
void setBandwidthLimited(bool);
bool isBandwidthLimited() { return _bandwidthLimited; }
void setChoked(bool);
bool isChoked() { return _choked; }
void giveBandwidthQuota(qint64 bwq);
private:
bool _bandwidthLimited; // if _bandwidthQuota will be used
bool _choked; // if upload is paused (readData() will return 0)
protected slots:
void slotJobUploadProgress(qint64 sent, qint64 t);
};
//Q_DECLARE_METATYPE(UploadDevice);
//Q_DECLARE_METATYPE(QPointer<UploadDevice>);
class OwncloudPropagator : public QObject { class OwncloudPropagator : public QObject {
Q_OBJECT Q_OBJECT
@ -198,6 +241,8 @@ public:
SyncJournalDb * const _journal; SyncJournalDb * const _journal;
bool _finishedEmited; // used to ensure that finished is only emit once bool _finishedEmited; // used to ensure that finished is only emit once
BandwidthManager _bandwidthManager;
public: public:
OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir, const QString &remoteFolder, OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir, const QString &remoteFolder,
SyncJournalDb *progressDb, QThread *neonThread) SyncJournalDb *progressDb, QThread *neonThread)
@ -208,6 +253,7 @@ public:
, _remoteFolder((remoteFolder.endsWith(QChar('/'))) ? remoteFolder : remoteFolder+'/' ) , _remoteFolder((remoteFolder.endsWith(QChar('/'))) ? remoteFolder : remoteFolder+'/' )
, _journal(progressDb) , _journal(progressDb)
, _finishedEmited(false) , _finishedEmited(false)
, _bandwidthManager(this)
, _activeJobs(0) , _activeJobs(0)
, _anotherSyncNeeded(false) , _anotherSyncNeeded(false)
{ } { }

View file

@ -65,8 +65,7 @@ void PUTFileJob::start() {
req.setRawHeader(it.key(), it.value()); req.setRawHeader(it.key(), it.value());
} }
setReply(davRequest("PUT", path(), req, _device)); setReply(davRequest("PUT", path(), req, _device.data()));
_device->setParent(reply());
setupConnections(reply()); setupConnections(reply());
if( reply()->error() != QNetworkReply::NoError ) { if( reply()->error() != QNetworkReply::NoError ) {
@ -185,73 +184,142 @@ void PropagateUploadFileQNAM::start()
this->startNextChunk(); this->startNextChunk();
} }
struct ChunkDevice : QIODevice { UploadDevice::UploadDevice(QIODevice *file, qint64 start, qint64 size, BandwidthManager *bwm)
public: : QIODevice(file), _file(file), _read(0), _size(size), _start(start),
QPointer<QIODevice> _file; _bandwidthManager(bwm),
qint64 _read; _bandwidthQuota(0),
qint64 _size; _readWithProgress(0),
qint64 _start; _bandwidthLimited(false), _choked(false)
{
ChunkDevice(QIODevice *file, qint64 start, qint64 size) qDebug() << Q_FUNC_INFO << start << size << chunkSize();
: QIODevice(file), _file(file), _read(0), _size(size), _start(start) { _bandwidthManager->registerUploadDevice(this);
_file = QPointer<QIODevice>(file); _file = QPointer<QIODevice>(file);
} }
virtual qint64 writeData(const char* , qint64 ) {
UploadDevice::~UploadDevice() {
_bandwidthManager->unregisterUploadDevice(this);
}
qint64 UploadDevice::writeData(const char* , qint64 ) {
Q_ASSERT(!"write to read only device"); Q_ASSERT(!"write to read only device");
return 0; return 0;
} }
virtual qint64 readData(char* data, qint64 maxlen) { qint64 UploadDevice::readData(char* data, qint64 maxlen) {
if (_file.isNull()) { if (_file.isNull()) {
qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload";
close(); close();
return -1; return -1;
} }
_file.data()->seek(_start + _read); _file.data()->seek(_start + _read);
maxlen = qMin(maxlen, chunkSize() - _read); qDebug() << Q_FUNC_INFO << maxlen << _read << _size << _bandwidthQuota;
if (maxlen == 0) if (_size - _read <= 0) {
// at end
qDebug() << Q_FUNC_INFO << _read << _size << _bandwidthQuota << "at end";
_bandwidthManager->unregisterUploadDevice(this);
return -1;
}
maxlen = qMin(maxlen, _size - _read);
if (maxlen == 0) {
qDebug() << Q_FUNC_INFO << "FUUUUUU" << maxlen << _size - _read;
return 0; return 0;
}
if (isChoked()) {
qDebug() << Q_FUNC_INFO << this << "Upload Choked";
return 0;
}
if (isBandwidthLimited()) {
qDebug() << Q_FUNC_INFO << "BW LIMITED" << maxlen << _bandwidthQuota
<< qMin(maxlen, _bandwidthQuota);
maxlen = qMin(maxlen, _bandwidthQuota);
if (maxlen <= 0) { // no quota
qDebug() << Q_FUNC_INFO << "no quota";
return 0;
}
_bandwidthQuota -= maxlen;
}
qDebug() << Q_FUNC_INFO << "reading limited=" << isBandwidthLimited()
<< "maxlen=" << maxlen << "quota=" << _bandwidthQuota;
qint64 ret = _file.data()->read(data, maxlen); qint64 ret = _file.data()->read(data, maxlen);
qDebug() << Q_FUNC_INFO << "returning " << ret;
if (ret < 0) if (ret < 0)
return -1; return -1;
_read += ret; _read += ret;
qDebug() << Q_FUNC_INFO << "returning2 " << ret << _read;
return ret; return ret;
} }
virtual bool atEnd() const { void UploadDevice::slotJobUploadProgress(qint64 sent, qint64 t)
{
qDebug() << Q_FUNC_INFO << sent << _read << t << _size << _bandwidthQuota;
if (sent == 0 || t == 0) {
return;
}
_readWithProgress = sent;
}
bool UploadDevice::atEnd() const {
if (_file.isNull()) { if (_file.isNull()) {
qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload";
return true; return true;
} }
return _read >= chunkSize() || _file.data()->atEnd(); qDebug() << this << Q_FUNC_INFO << _read << chunkSize()
<< (_read >= chunkSize() || _file.data()->atEnd())
<< (_read >= _size);
return _file.data()->atEnd() || (_read >= _size);
} }
virtual qint64 size() const{ qint64 UploadDevice::size() const{
qDebug() << this << Q_FUNC_INFO << _size;
return _size; return _size;
} }
qint64 bytesAvailable() const qint64 UploadDevice::bytesAvailable() const
{ {
qDebug() << this << Q_FUNC_INFO << _size << _read << QIODevice::bytesAvailable()
<< _size - _read + QIODevice::bytesAvailable();
return _size - _read + QIODevice::bytesAvailable(); return _size - _read + QIODevice::bytesAvailable();
} }
// random access, we can seek // random access, we can seek
virtual bool isSequential() const{ bool UploadDevice::isSequential() const{
return false; return false;
} }
virtual bool seek ( qint64 pos ) { bool UploadDevice::seek ( qint64 pos ) {
if (_file.isNull()) { if (_file.isNull()) {
qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload"; qDebug() << Q_FUNC_INFO << "Upload file object deleted during upload";
close(); close();
return false; return false;
} }
qDebug() << this << Q_FUNC_INFO << pos << _read;
_read = pos; _read = pos;
return _file.data()->seek(pos + _start); return _file.data()->seek(pos + _start);
} }
};
void UploadDevice::giveBandwidthQuota(qint64 bwq) {
qDebug() << Q_FUNC_INFO << bwq;
if (!atEnd()) {
_bandwidthQuota = bwq;
qDebug() << Q_FUNC_INFO << bwq << "emitting readyRead()" << _read << _readWithProgress;
QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection); // tell QNAM that we have quota
}
}
void UploadDevice::setBandwidthLimited(bool b) {
_bandwidthLimited = b;
QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection);
}
void UploadDevice::setChoked(bool b) {
_choked = b;
if (!_choked) {
QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection);
}
}
void PropagateUploadFileQNAM::startNextChunk() void PropagateUploadFileQNAM::startNextChunk()
{ {
@ -291,7 +359,7 @@ void PropagateUploadFileQNAM::startNextChunk()
} }
QString path = _item._file; QString path = _item._file;
QIODevice *device = 0; UploadDevice *device = 0;
if (_chunkCount > 1) { if (_chunkCount > 1) {
int sendingChunk = (_currentChunk + _startChunk) % _chunkCount; int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
// XOR with chunk size to make sure everything goes well if chunk size change between runs // XOR with chunk size to make sure everything goes well if chunk size change between runs
@ -305,9 +373,9 @@ void PropagateUploadFileQNAM::startNextChunk()
currentChunkSize = chunkSize(); currentChunkSize = chunkSize();
} }
} }
device = new ChunkDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize); device = new UploadDevice(_file, chunkSize() * quint64(sendingChunk), currentChunkSize, &_propagator->_bandwidthManager);
} else { } else {
device = _file; device = new UploadDevice(_file, 0, fileSize, &_propagator->_bandwidthManager);
} }
bool isOpen = true; bool isOpen = true;
@ -321,6 +389,7 @@ void PropagateUploadFileQNAM::startNextChunk()
job->setTimeout(_propagator->httpTimeout() * 1000); job->setTimeout(_propagator->httpTimeout() * 1000);
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64))); connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
connect(_job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
job->start(); job->start();
_propagator->_activeJobs++; _propagator->_activeJobs++;
@ -341,7 +410,6 @@ void PropagateUploadFileQNAM::startNextChunk()
if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) { if (!parallelChunkUpload || _chunkCount - _currentChunk <= 0) {
emitReady(); emitReady();
} }
} else { } else {
qDebug() << "ERR: Could not open upload file: " << device->errorString(); qDebug() << "ERR: Could not open upload file: " << device->errorString();
done( SyncFileItem::NormalError, device->errorString() ); done( SyncFileItem::NormalError, device->errorString() );
@ -496,17 +564,18 @@ void PropagateUploadFileQNAM::finalize(const SyncFileItem &copy)
_propagator->_journal->setUploadInfo(_item._file, SyncJournalDb::UploadInfo()); _propagator->_journal->setUploadInfo(_item._file, SyncJournalDb::UploadInfo());
_propagator->_journal->commit("upload file start"); _propagator->_journal->commit("upload file start");
qDebug() << Q_FUNC_INFO << "msec=" <<_duration.elapsed();
done(SyncFileItem::Success); done(SyncFileItem::Success);
} }
void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64) void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64 t)
{ {
int progressChunk = _currentChunk + _startChunk - 1; int progressChunk = _currentChunk + _startChunk - 1;
if (progressChunk >= _chunkCount) if (progressChunk >= _chunkCount)
progressChunk = _currentChunk - 1; progressChunk = _currentChunk - 1;
quint64 amount = progressChunk * chunkSize(); quint64 amount = progressChunk * chunkSize();
sender()->setProperty("byteWritten", sent); sender()->setProperty("byteWritten", sent);
if (_jobs.count() == 1) { if (_jobs.count() > 1) {
amount += sent; amount += sent;
} else { } else {
amount -= (_jobs.count() -1) * chunkSize(); amount -= (_jobs.count() -1) * chunkSize();
@ -568,6 +637,7 @@ GETFileJob::GETFileJob(Account* account, const QString& path, QFile *device,
: AbstractNetworkJob(account, path, parent), : AbstractNetworkJob(account, path, parent),
_device(device), _headers(headers), _expectedEtagForResume(expectedEtagForResume), _device(device), _headers(headers), _expectedEtagForResume(expectedEtagForResume),
_resumeStart(_resumeStart) , _errorStatus(SyncFileItem::NoStatus) _resumeStart(_resumeStart) , _errorStatus(SyncFileItem::NoStatus)
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
{ {
} }
@ -577,6 +647,7 @@ GETFileJob::GETFileJob(Account* account, const QUrl& url, QFile *device,
: AbstractNetworkJob(account, url.toEncoded(), parent), : AbstractNetworkJob(account, url.toEncoded(), parent),
_device(device), _headers(headers), _resumeStart(0), _device(device), _headers(headers), _resumeStart(0),
_errorStatus(SyncFileItem::NoStatus), _directDownloadUrl(url) _errorStatus(SyncFileItem::NoStatus), _directDownloadUrl(url)
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
{ {
} }
@ -594,7 +665,11 @@ void GETFileJob::start() {
setReply(davRequest("GET", _directDownloadUrl, req)); setReply(davRequest("GET", _directDownloadUrl, req));
} }
setupConnections(reply()); setupConnections(reply());
reply()->setReadBufferSize(128 * 1024);
reply()->setReadBufferSize(16 * 1024); // keep low so we can easier limit the bandwidth
if (_bandwidthManager) {
_bandwidthManager->registerDownloadJob(this);
}
if( reply()->error() != QNetworkReply::NoError ) { if( reply()->error() != QNetworkReply::NoError ) {
qWarning() << Q_FUNC_INFO << " Network error: " << reply()->errorString(); qWarning() << Q_FUNC_INFO << " Network error: " << reply()->errorString();
@ -609,6 +684,10 @@ void GETFileJob::start() {
void GETFileJob::slotMetaDataChanged() void GETFileJob::slotMetaDataChanged()
{ {
// For some reason setting the read buffer in GETFileJob::start doesn't seem to go
// through the HTTP layer thread(?)
reply()->setReadBufferSize(16 * 1024);
if (reply()->error() != QNetworkReply::NoError if (reply()->error() != QNetworkReply::NoError
|| reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt() / 100 != 2) { || reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt() / 100 != 2) {
// We will handle the error when the job is finished. // We will handle the error when the job is finished.
@ -665,13 +744,55 @@ void GETFileJob::slotMetaDataChanged()
} }
void GETFileJob::setBandwidthManager(BandwidthManager *bwm)
{
_bandwidthManager = bwm;
}
void GETFileJob::setChoked(bool c)
{
_bandwidthChoked = c;
QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection);
}
void GETFileJob::setBandwidthLimited(bool b)
{
_bandwidthLimited = b;
QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection);
}
void GETFileJob::giveBandwidthQuota(qint64 q)
{
_bandwidthQuota = q;
qDebug() << Q_FUNC_INFO << "Got" << q << "bytes";
QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection);
}
void GETFileJob::slotReadyRead() void GETFileJob::slotReadyRead()
{ {
int bufferSize = qMin(1024*8ll , reply()->bytesAvailable()); int bufferSize = qMin(1024*8ll , reply()->bytesAvailable());
QByteArray buffer(bufferSize, Qt::Uninitialized); QByteArray buffer(bufferSize, Qt::Uninitialized);
qDebug() << Q_FUNC_INFO << reply()->bytesAvailable() << reply()->isOpen() << reply()->isFinished();
//return;
while(reply()->bytesAvailable() > 0) { while(reply()->bytesAvailable() > 0) {
qint64 r = reply()->read(buffer.data(), bufferSize); if (_bandwidthChoked) {
qDebug() << Q_FUNC_INFO << "Download choked";
break;
}
qint64 toRead = bufferSize;
if (_bandwidthLimited) {
toRead = qMin(qint64(bufferSize), _bandwidthQuota);
if (toRead == 0) {
qDebug() << Q_FUNC_INFO << "Out of quota";
break;
}
_bandwidthQuota -= toRead;
qDebug() << Q_FUNC_INFO << "Reading" << toRead << "remaining" << _bandwidthQuota;
}
qint64 r = reply()->read(buffer.data(), toRead);
if (r < 0) { if (r < 0) {
_errorString = reply()->errorString(); _errorString = reply()->errorString();
_errorStatus = SyncFileItem::NormalError; _errorStatus = SyncFileItem::NormalError;
@ -690,6 +811,18 @@ void GETFileJob::slotReadyRead()
} }
} }
resetTimeout(); resetTimeout();
if (reply()->isFinished() && reply()->bytesAvailable() == 0) {
qDebug() << Q_FUNC_INFO << "Actually finished!";
if (_bandwidthManager) {
_bandwidthManager->unregisterDownloadJob(this);
}
if (!_hasEmittedFinishedSignal) {
emit finishedSignal();
}
_hasEmittedFinishedSignal = true;
deleteLater();
}
} }
void GETFileJob::slotTimeout() void GETFileJob::slotTimeout()
@ -788,6 +921,7 @@ void PropagateDownloadFileQNAM::start()
&_tmpFile, headers); &_tmpFile, headers);
qDebug() << Q_FUNC_INFO << "directDownloadUrl given for " << _item._file << _item._directDownloadUrl << headers["Cookie"]; qDebug() << Q_FUNC_INFO << "directDownloadUrl given for " << _item._file << _item._directDownloadUrl << headers["Cookie"];
} }
_job->setBandwidthManager(&_propagator->_bandwidthManager);
_job->setTimeout(_propagator->httpTimeout() * 1000); _job->setTimeout(_propagator->httpTimeout() * 1000);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotGetFinished())); connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotGetFinished()));
connect(_job, SIGNAL(downloadProgress(qint64,qint64)), this, SLOT(slotDownloadProgress(qint64,qint64))); connect(_job, SIGNAL(downloadProgress(qint64,qint64)), this, SLOT(slotDownloadProgress(qint64,qint64)));
@ -911,4 +1045,5 @@ void PropagateDownloadFileQNAM::abort()
_job->reply()->abort(); _job->reply()->abort();
} }
} }

View file

@ -51,7 +51,7 @@ public:
class PUTFileJob : public AbstractNetworkJob { class PUTFileJob : public AbstractNetworkJob {
Q_OBJECT Q_OBJECT
QIODevice* _device; QSharedPointer<QIODevice> _device;
QMap<QByteArray, QByteArray> _headers; QMap<QByteArray, QByteArray> _headers;
QString _errorString; QString _errorString;
@ -144,6 +144,11 @@ class GETFileJob : public AbstractNetworkJob {
SyncFileItem::Status _errorStatus; SyncFileItem::Status _errorStatus;
QUrl _directDownloadUrl; QUrl _directDownloadUrl;
QByteArray _etag; QByteArray _etag;
bool _bandwidthLimited; // if _bandwidthQuota will be used
bool _bandwidthChoked; // if download is paused (won't read on readyRead())
qint64 _bandwidthQuota;
BandwidthManager *_bandwidthManager;
bool _hasEmittedFinishedSignal;
public: public:
// DOES NOT take owncership of the device. // DOES NOT take owncership of the device.
@ -154,12 +159,33 @@ public:
explicit GETFileJob(Account* account, const QUrl& url, QFile *device, explicit GETFileJob(Account* account, const QUrl& url, QFile *device,
const QMap<QByteArray, QByteArray> &headers, const QMap<QByteArray, QByteArray> &headers,
QObject* parent = 0); QObject* parent = 0);
virtual ~GETFileJob() {
if (_bandwidthManager) {
_bandwidthManager->unregisterDownloadJob(this);
}
}
virtual void start(); virtual void start();
virtual bool finished() { virtual bool finished() {
emit finishedSignal(); if (reply()->bytesAvailable()) {
return true; qDebug() << Q_FUNC_INFO << "Not all read yet because of bandwidth limits";
return false;
} else {
if (_bandwidthManager) {
_bandwidthManager->unregisterDownloadJob(this);
} }
if (!_hasEmittedFinishedSignal) {
emit finishedSignal();
}
_hasEmittedFinishedSignal = true;
return true; // discard
}
}
void setBandwidthManager(BandwidthManager *bwm);
void setChoked(bool c);
void setBandwidthLimited(bool b);
void giveBandwidthQuota(qint64 q);
QString errorString() { QString errorString() {
return _errorString.isEmpty() ? reply()->errorString() : _errorString; return _errorString.isEmpty() ? reply()->errorString() : _errorString;
@ -199,6 +225,4 @@ private slots:
void slotDownloadProgress(qint64,qint64); void slotDownloadProgress(qint64,qint64);
}; };
} }