mirror of
https://github.com/nextcloud/desktop.git
synced 2024-12-17 19:31:48 +03:00
Propagator: Relative download limit
This commit is contained in:
parent
de79f9338a
commit
d1cc3c34c9
4 changed files with 140 additions and 9 deletions
|
@ -42,24 +42,27 @@ static qint64 relativeLimitMeasuringTimerIntervalMsec = 1000*2;
|
||||||
// * For relative limiting, smoothen measurements
|
// * For relative limiting, smoothen measurements
|
||||||
|
|
||||||
BandwidthManager::BandwidthManager(OwncloudPropagator *p) : QObject(),
|
BandwidthManager::BandwidthManager(OwncloudPropagator *p) : QObject(),
|
||||||
|
_propagator(p),
|
||||||
_relativeLimitCurrentMeasuredDevice(0),
|
_relativeLimitCurrentMeasuredDevice(0),
|
||||||
_relativeUploadLimitProgressAtMeasuringRestart(0),
|
_relativeUploadLimitProgressAtMeasuringRestart(0),
|
||||||
_currentUploadLimit(0),
|
_currentUploadLimit(0),
|
||||||
_currentDownloadLimit(0),
|
_relativeLimitCurrentMeasuredJob(0),
|
||||||
_propagator(p)
|
_currentDownloadLimit(0)
|
||||||
{
|
{
|
||||||
_currentUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
|
_currentUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
|
||||||
_currentDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0);
|
_currentDownloadLimit = _propagator->_downloadLimit.fetchAndAddAcquire(0);
|
||||||
|
|
||||||
QObject::connect(&_absoluteLimitTimer, SIGNAL(timeout()), this, SLOT(absoluteLimitTimerExpired()));
|
|
||||||
_absoluteLimitTimer.setInterval(1000);
|
|
||||||
_absoluteLimitTimer.start();
|
|
||||||
|
|
||||||
QObject::connect(&_switchingTimer, SIGNAL(timeout()), this, SLOT(switchingTimerExpired()));
|
QObject::connect(&_switchingTimer, SIGNAL(timeout()), this, SLOT(switchingTimerExpired()));
|
||||||
_switchingTimer.setInterval(10*1000);
|
_switchingTimer.setInterval(10*1000);
|
||||||
_switchingTimer.start();
|
_switchingTimer.start();
|
||||||
QMetaObject::invokeMethod(this, "switchingTimerExpired", Qt::QueuedConnection);
|
QMetaObject::invokeMethod(this, "switchingTimerExpired", Qt::QueuedConnection);
|
||||||
|
|
||||||
|
// absolute uploads/downloads
|
||||||
|
QObject::connect(&_absoluteLimitTimer, SIGNAL(timeout()), this, SLOT(absoluteLimitTimerExpired()));
|
||||||
|
_absoluteLimitTimer.setInterval(1000);
|
||||||
|
_absoluteLimitTimer.start();
|
||||||
|
|
||||||
|
// Relative uploads
|
||||||
QObject::connect(&_relativeUploadMeasuringTimer,SIGNAL(timeout()),
|
QObject::connect(&_relativeUploadMeasuringTimer,SIGNAL(timeout()),
|
||||||
this, SLOT(relativeUploadMeasuringTimerExpired()));
|
this, SLOT(relativeUploadMeasuringTimerExpired()));
|
||||||
_relativeUploadMeasuringTimer.setInterval(relativeLimitMeasuringTimerIntervalMsec);
|
_relativeUploadMeasuringTimer.setInterval(relativeLimitMeasuringTimerIntervalMsec);
|
||||||
|
@ -68,6 +71,16 @@ BandwidthManager::BandwidthManager(OwncloudPropagator *p) : QObject(),
|
||||||
QObject::connect(&_relativeUploadDelayTimer, SIGNAL(timeout()),
|
QObject::connect(&_relativeUploadDelayTimer, SIGNAL(timeout()),
|
||||||
this, SLOT(relativeUploadDelayTimerExpired()));
|
this, SLOT(relativeUploadDelayTimerExpired()));
|
||||||
_relativeUploadDelayTimer.setSingleShot(true); // will be restarted from the measuring timer
|
_relativeUploadDelayTimer.setSingleShot(true); // will be restarted from the measuring timer
|
||||||
|
|
||||||
|
// Relative downloads
|
||||||
|
QObject::connect(&_relativeDownloadMeasuringTimer,SIGNAL(timeout()),
|
||||||
|
this, SLOT(relativeDownloadMeasuringTimerExpired()));
|
||||||
|
_relativeDownloadMeasuringTimer.setInterval(relativeLimitMeasuringTimerIntervalMsec);
|
||||||
|
_relativeDownloadMeasuringTimer.start();
|
||||||
|
_relativeDownloadMeasuringTimer.setSingleShot(true); // will be restarted from the delay timer
|
||||||
|
QObject::connect(&_relativeDownloadDelayTimer, SIGNAL(timeout()),
|
||||||
|
this, SLOT(relativeDownloadDelayTimerExpired()));
|
||||||
|
_relativeDownloadDelayTimer.setSingleShot(true); // will be restarted from the measuring timer
|
||||||
}
|
}
|
||||||
|
|
||||||
void BandwidthManager::registerUploadDevice(UploadDevice *p)
|
void BandwidthManager::registerUploadDevice(UploadDevice *p)
|
||||||
|
@ -231,6 +244,104 @@ void BandwidthManager::relativeUploadDelayTimerExpired()
|
||||||
// now we're in measuring state
|
// now we're in measuring state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for downloads:
|
||||||
|
void BandwidthManager::relativeDownloadMeasuringTimerExpired()
|
||||||
|
{
|
||||||
|
if (!usingRelativeDownloadLimit()) {
|
||||||
|
// Not in this limiting mode, just wait 1 sec to continue the cycle
|
||||||
|
_relativeDownloadDelayTimer.setInterval(1000);
|
||||||
|
_relativeDownloadDelayTimer.start();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (_relativeLimitCurrentMeasuredJob == 0 || _downloadJobList.count() == 0) {
|
||||||
|
qDebug() << Q_FUNC_INFO << "No job set, just waiting 1 sec";
|
||||||
|
_relativeDownloadDelayTimer.setInterval(1000);
|
||||||
|
_relativeDownloadDelayTimer.start();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug() << Q_FUNC_INFO << _downloadJobList.count() << "Starting Delay";
|
||||||
|
|
||||||
|
qint64 relativeLimitProgressMeasured = _relativeLimitCurrentMeasuredJob->currentDownloadPosition();
|
||||||
|
qint64 relativeLimitProgressDifference = relativeLimitProgressMeasured - _relativeDownloadLimitProgressAtMeasuringRestart;
|
||||||
|
qDebug() << Q_FUNC_INFO << _relativeDownloadLimitProgressAtMeasuringRestart
|
||||||
|
<< relativeLimitProgressMeasured << relativeLimitProgressDifference;
|
||||||
|
|
||||||
|
qint64 speedkBPerSec = (relativeLimitProgressDifference / relativeLimitMeasuringTimerIntervalMsec*1000.0) / 1024.0;
|
||||||
|
qDebug() << Q_FUNC_INFO << relativeLimitProgressDifference/1024 <<"kB =>" << speedkBPerSec << "kB/sec on full speed ("
|
||||||
|
<< _relativeLimitCurrentMeasuredJob->currentDownloadPosition() ;
|
||||||
|
|
||||||
|
qint64 downloadLimitPercent = -_currentDownloadLimit;
|
||||||
|
// don't use too extreme values
|
||||||
|
downloadLimitPercent = qMin(downloadLimitPercent, qint64(90));
|
||||||
|
downloadLimitPercent = qMax(qint64(10), downloadLimitPercent);
|
||||||
|
qint64 wholeTimeMsec = (100.0 / downloadLimitPercent) * relativeLimitMeasuringTimerIntervalMsec;
|
||||||
|
qint64 waitTimeMsec = wholeTimeMsec - relativeLimitMeasuringTimerIntervalMsec;
|
||||||
|
qint64 realWaitTimeMsec = waitTimeMsec + wholeTimeMsec;
|
||||||
|
qDebug() << Q_FUNC_INFO << waitTimeMsec << " - "<< realWaitTimeMsec <<
|
||||||
|
" msec for " << downloadLimitPercent << "%";
|
||||||
|
qDebug() << Q_FUNC_INFO << "XXXX" << downloadLimitPercent << relativeLimitMeasuringTimerIntervalMsec;
|
||||||
|
|
||||||
|
// We want to wait twice as long since we want to give all
|
||||||
|
// devices the same quota we used now since we don't want
|
||||||
|
// any upload to timeout
|
||||||
|
_relativeDownloadDelayTimer.setInterval(realWaitTimeMsec);
|
||||||
|
_relativeDownloadDelayTimer.start();
|
||||||
|
|
||||||
|
int jobCount = _downloadJobList.count();
|
||||||
|
qint64 quota = relativeLimitProgressDifference * (downloadLimitPercent / 100.0);
|
||||||
|
// if (quota > 20*1024) {
|
||||||
|
// qDebug() << "======== ADJUSTING QUOTA FROM " << quota << " TO " << quota - 20*1024;
|
||||||
|
// quota -= 20*1024;
|
||||||
|
// }
|
||||||
|
qint64 quotaPerJob = quota / jobCount + 1.0;
|
||||||
|
qDebug() << Q_FUNC_INFO << "YYYY" << relativeLimitProgressDifference << downloadLimitPercent << jobCount;
|
||||||
|
Q_FOREACH(GETFileJob *gfj, _downloadJobList) {
|
||||||
|
gfj->setBandwidthLimited(true);
|
||||||
|
gfj->setChoked(false);
|
||||||
|
gfj->giveBandwidthQuota(quotaPerJob);
|
||||||
|
qDebug() << Q_FUNC_INFO << "Gave" << quotaPerJob/1024.0 << "kB to" << gfj;
|
||||||
|
}
|
||||||
|
_relativeLimitCurrentMeasuredDevice = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void BandwidthManager::relativeDownloadDelayTimerExpired()
|
||||||
|
{
|
||||||
|
// Switch to measuring state
|
||||||
|
_relativeDownloadMeasuringTimer.start(); // always start to continue the cycle
|
||||||
|
|
||||||
|
if (!usingRelativeDownloadLimit()) {
|
||||||
|
return; // oh, not actually needed
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_downloadJobList.isEmpty()) {
|
||||||
|
qDebug() << Q_FUNC_INFO << _downloadJobList.count() << "No jobs?";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug() << Q_FUNC_INFO << _downloadJobList.count() << "Starting measuring";
|
||||||
|
|
||||||
|
// Take first device and then append it again (= we round robin all devices)
|
||||||
|
_relativeLimitCurrentMeasuredJob = _downloadJobList.takeFirst();
|
||||||
|
_downloadJobList.append(_relativeLimitCurrentMeasuredJob);
|
||||||
|
|
||||||
|
_relativeDownloadLimitProgressAtMeasuringRestart = _relativeLimitCurrentMeasuredJob->currentDownloadPosition();
|
||||||
|
_relativeLimitCurrentMeasuredJob->setBandwidthLimited(false);
|
||||||
|
_relativeLimitCurrentMeasuredJob->setChoked(false);
|
||||||
|
|
||||||
|
// choke all other UploadDevices
|
||||||
|
Q_FOREACH(GETFileJob *gfj, _downloadJobList) {
|
||||||
|
if (gfj != _relativeLimitCurrentMeasuredJob) {
|
||||||
|
gfj->setBandwidthLimited(true);
|
||||||
|
gfj->setChoked(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we're in measuring state
|
||||||
|
}
|
||||||
|
|
||||||
|
// end downloads
|
||||||
|
|
||||||
void BandwidthManager::switchingTimerExpired() {
|
void BandwidthManager::switchingTimerExpired() {
|
||||||
qint64 newUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
|
qint64 newUploadLimit = _propagator->_uploadLimit.fetchAndAddAcquire(0);
|
||||||
if (newUploadLimit != _currentUploadLimit) {
|
if (newUploadLimit != _currentUploadLimit) {
|
||||||
|
|
|
@ -52,9 +52,12 @@ public slots:
|
||||||
void relativeUploadMeasuringTimerExpired();
|
void relativeUploadMeasuringTimerExpired();
|
||||||
void relativeUploadDelayTimerExpired();
|
void relativeUploadDelayTimerExpired();
|
||||||
|
|
||||||
|
void relativeDownloadMeasuringTimerExpired();
|
||||||
|
void relativeDownloadDelayTimerExpired();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QTimer _switchingTimer; // for switching between absolute and relative bw limiting
|
QTimer _switchingTimer; // for switching between absolute and relative bw limiting
|
||||||
OwncloudPropagator *_propagator; // this timer and this variable could be replaced
|
OwncloudPropagator *_propagator; // FIXME this timer and this variable should be replaced
|
||||||
// by the propagator emitting the changed limit values to us as signal
|
// by the propagator emitting the changed limit values to us as signal
|
||||||
|
|
||||||
QTimer _absoluteLimitTimer; // for absolute up/down bw limiting
|
QTimer _absoluteLimitTimer; // for absolute up/down bw limiting
|
||||||
|
@ -68,6 +71,10 @@ private:
|
||||||
qint64 _currentUploadLimit;
|
qint64 _currentUploadLimit;
|
||||||
|
|
||||||
QLinkedList<GETFileJob*> _downloadJobList;
|
QLinkedList<GETFileJob*> _downloadJobList;
|
||||||
|
QTimer _relativeDownloadMeasuringTimer;
|
||||||
|
QTimer _relativeDownloadDelayTimer; // for relative bw limiting, we need to wait this amount before measuring again
|
||||||
|
GETFileJob *_relativeLimitCurrentMeasuredJob; // the device measured
|
||||||
|
qint64 _relativeDownloadLimitProgressAtMeasuringRestart; // for measuring how much progress we made at start
|
||||||
qint64 _currentDownloadLimit;
|
qint64 _currentDownloadLimit;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -389,7 +389,7 @@ void PropagateUploadFileQNAM::startNextChunk()
|
||||||
job->setTimeout(_propagator->httpTimeout() * 1000);
|
job->setTimeout(_propagator->httpTimeout() * 1000);
|
||||||
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
|
connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
|
||||||
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
|
connect(job, SIGNAL(uploadProgress(qint64,qint64)), this, SLOT(slotUploadProgress(qint64,qint64)));
|
||||||
connect(_job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
|
connect(job, SIGNAL(uploadProgress(qint64,qint64)), device, SLOT(slotJobUploadProgress(qint64,qint64)));
|
||||||
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
|
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
|
||||||
job->start();
|
job->start();
|
||||||
_propagator->_activeJobs++;
|
_propagator->_activeJobs++;
|
||||||
|
@ -638,6 +638,7 @@ GETFileJob::GETFileJob(Account* account, const QString& path, QFile *device,
|
||||||
_device(device), _headers(headers), _expectedEtagForResume(expectedEtagForResume),
|
_device(device), _headers(headers), _expectedEtagForResume(expectedEtagForResume),
|
||||||
_resumeStart(_resumeStart) , _errorStatus(SyncFileItem::NoStatus)
|
_resumeStart(_resumeStart) , _errorStatus(SyncFileItem::NoStatus)
|
||||||
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
|
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
|
||||||
|
, _hasEmittedFinishedSignal(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -648,6 +649,7 @@ GETFileJob::GETFileJob(Account* account, const QUrl& url, QFile *device,
|
||||||
_device(device), _headers(headers), _resumeStart(0),
|
_device(device), _headers(headers), _resumeStart(0),
|
||||||
_errorStatus(SyncFileItem::NoStatus), _directDownloadUrl(url)
|
_errorStatus(SyncFileItem::NoStatus), _directDownloadUrl(url)
|
||||||
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
|
, _bandwidthLimited(false), _bandwidthChoked(false), _bandwidthQuota(0), _bandwidthManager(0)
|
||||||
|
, _hasEmittedFinishedSignal(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -667,6 +669,7 @@ void GETFileJob::start() {
|
||||||
setupConnections(reply());
|
setupConnections(reply());
|
||||||
|
|
||||||
reply()->setReadBufferSize(16 * 1024); // keep low so we can easier limit the bandwidth
|
reply()->setReadBufferSize(16 * 1024); // keep low so we can easier limit the bandwidth
|
||||||
|
qDebug() << Q_FUNC_INFO << _bandwidthManager << _bandwidthChoked << _bandwidthLimited;
|
||||||
if (_bandwidthManager) {
|
if (_bandwidthManager) {
|
||||||
_bandwidthManager->registerDownloadJob(this);
|
_bandwidthManager->registerDownloadJob(this);
|
||||||
}
|
}
|
||||||
|
@ -768,13 +771,20 @@ void GETFileJob::giveBandwidthQuota(qint64 q)
|
||||||
QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection);
|
QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qint64 GETFileJob::currentDownloadPosition()
|
||||||
|
{
|
||||||
|
if (_device && _device->pos() > 0 && _device->pos() > _resumeStart) {
|
||||||
|
return _device->pos();
|
||||||
|
}
|
||||||
|
return _resumeStart;
|
||||||
|
}
|
||||||
|
|
||||||
void GETFileJob::slotReadyRead()
|
void GETFileJob::slotReadyRead()
|
||||||
{
|
{
|
||||||
int bufferSize = qMin(1024*8ll , reply()->bytesAvailable());
|
int bufferSize = qMin(1024*8ll , reply()->bytesAvailable());
|
||||||
QByteArray buffer(bufferSize, Qt::Uninitialized);
|
QByteArray buffer(bufferSize, Qt::Uninitialized);
|
||||||
|
|
||||||
qDebug() << Q_FUNC_INFO << reply()->bytesAvailable() << reply()->isOpen() << reply()->isFinished();
|
qDebug() << Q_FUNC_INFO << reply()->bytesAvailable() << reply()->isOpen() << reply()->isFinished();
|
||||||
//return;
|
|
||||||
|
|
||||||
while(reply()->bytesAvailable() > 0) {
|
while(reply()->bytesAvailable() > 0) {
|
||||||
if (_bandwidthChoked) {
|
if (_bandwidthChoked) {
|
||||||
|
@ -812,6 +822,7 @@ void GETFileJob::slotReadyRead()
|
||||||
}
|
}
|
||||||
resetTimeout();
|
resetTimeout();
|
||||||
|
|
||||||
|
qDebug() << Q_FUNC_INFO << "END" << reply()->isFinished() << reply()->bytesAvailable() << _hasEmittedFinishedSignal;
|
||||||
if (reply()->isFinished() && reply()->bytesAvailable() == 0) {
|
if (reply()->isFinished() && reply()->bytesAvailable() == 0) {
|
||||||
qDebug() << Q_FUNC_INFO << "Actually finished!";
|
qDebug() << Q_FUNC_INFO << "Actually finished!";
|
||||||
if (_bandwidthManager) {
|
if (_bandwidthManager) {
|
||||||
|
|
|
@ -167,6 +167,7 @@ public:
|
||||||
|
|
||||||
virtual void start();
|
virtual void start();
|
||||||
virtual bool finished() {
|
virtual bool finished() {
|
||||||
|
qDebug() << Q_FUNC_INFO << reply()->bytesAvailable() << _hasEmittedFinishedSignal;
|
||||||
if (reply()->bytesAvailable()) {
|
if (reply()->bytesAvailable()) {
|
||||||
qDebug() << Q_FUNC_INFO << "Not all read yet because of bandwidth limits";
|
qDebug() << Q_FUNC_INFO << "Not all read yet because of bandwidth limits";
|
||||||
return false;
|
return false;
|
||||||
|
@ -186,6 +187,7 @@ public:
|
||||||
void setChoked(bool c);
|
void setChoked(bool c);
|
||||||
void setBandwidthLimited(bool b);
|
void setBandwidthLimited(bool b);
|
||||||
void giveBandwidthQuota(qint64 q);
|
void giveBandwidthQuota(qint64 q);
|
||||||
|
qint64 currentDownloadPosition();
|
||||||
|
|
||||||
QString errorString() {
|
QString errorString() {
|
||||||
return _errorString.isEmpty() ? reply()->errorString() : _errorString;
|
return _errorString.isEmpty() ? reply()->errorString() : _errorString;
|
||||||
|
|
Loading…
Reference in a new issue