Only starts 6 jobs in parallel

This commit is contained in:
Olivier Goffart 2014-02-12 11:07:34 +01:00
parent 6db8daabf7
commit f35b1f8a2b
4 changed files with 67 additions and 14 deletions

View file

@ -709,7 +709,7 @@ QString CSyncThread::adjustRenamedPath(const QString& original)
void CSyncThread::abort()
{
csync_request_abort(_csync_ctx);
if(_propagator);
if(_propagator)
_propagator->abort();
}

View file

@ -49,6 +49,9 @@
#include <time.h>
/* The maximum number of active job in parallel */
static const int maximumActiveJob = 6;
// We use some internals of csync:
extern "C" int c_utimes(const char *, const struct timeval *);
extern "C" void csync_win32_set_file_hidden( const char *file, bool h );
@ -1110,7 +1113,7 @@ void PropagateDirectory::start()
_current = -1;
_hasError = SyncFileItem::NoStatus;
if (!_firstJob) {
slotSubJobFinished(SyncFileItem::Success);
slotSubJobReady();
} else {
startJob(_firstJob.data());
}
@ -1125,18 +1128,32 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
_hasError = status;
}
_runningNow--;
slotSubJobReady();
}
if (_current == -1) {
// Start all the jobs
foreach( PropagatorJob *next , _subJobs ) {
startJob(next);
}
void PropagateDirectory::slotSubJobReady()
{
qDebug() << Q_FUNC_INFO << _runningNow << _propagator->_activeJobs;
if (_runningNow && _current == -1)
return; // Ignore the case when the _fistJob is ready and not yet finished
if (_runningNow && _current >= 0 && _current < _subJobs.count()) {
// there is a job running and the current one is not ready yet, we can't start new job
qDebug() << _subJobs[_current]->_readySent << maximumActiveJob << _subJobs[_current];
if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= maximumActiveJob)
return;
}
_current ++;
if (_current >= _subJobs.size()) {
// We finished to process all the jobs
_current++;
if (_current < _subJobs.size() && !_propagator->_abortRequested.fetchAndAddRelaxed(0)) {
PropagatorJob *next = _subJobs.at(_current);
startJob(next);
return;
}
// We finished to processing all the jobs
emitReady();
if (!_runningNow) {
if (!_item.isEmpty() && _hasError == SyncFileItem::NoStatus) {
if( !_item._renameTarget.isEmpty() ) {
_item._file = _item._renameTarget;

View file

@ -36,16 +36,39 @@ class PropagatorJob : public QObject {
Q_OBJECT
protected:
OwncloudPropagator *_propagator;
void emitReady() {
bool wasReady = _readySent;
_readySent = true;
if (!wasReady)
emit ready();
};
public:
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator) {}
bool _readySent;
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator), _readySent(false) {}
public slots:
virtual void start() = 0;
virtual void abort() {}
signals:
/**
* Emitted when the job is fully finished
*/
void finished(SyncFileItem::Status);
/**
* Emitted when one item has been completed within a job.
*/
void completed(const SyncFileItem &);
/**
* Emitted when all the sub-jobs have been scheduled and
* we are ready and more jobs might be started
* This signal is not always emitted.
*/
void ready();
void progress(Progress::Kind, const SyncFileItem& item, quint64 bytes, quint64 total);
};
/*
@ -64,12 +87,13 @@ public:
SyncFileItem _item;
int _current; // index of the current running job
int _runningNow; // number of subJob running now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItem &item = SyncFileItem())
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _current(-1), _hasError(SyncFileItem::NoStatus) { }
, _firstJob(0), _item(item), _current(-1), _runningNow(0), _hasError(SyncFileItem::NoStatus) { }
virtual ~PropagateDirectory() {
qDeleteAll(_subJobs);
@ -92,10 +116,13 @@ private slots:
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem)));
connect(next, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)));
connect(next, SIGNAL(ready()), this, SLOT(slotSubJobReady()));
_runningNow++;
QMetaObject::invokeMethod(next, "start");
}
void slotSubJobFinished(SyncFileItem::Status status);
void slotSubJobReady();
};
@ -152,6 +179,7 @@ public:
, _localDir((localDir.endsWith(QChar('/'))) ? localDir : localDir+'/' )
, _remoteDir((remoteDir.endsWith(QChar('/'))) ? remoteDir : remoteDir+'/' )
, _journal(progressDb)
, _activeJobs(0)
{ }
void start(const SyncFileItemVector &_syncedItems);
@ -161,9 +189,14 @@ public:
QAtomicInt _abortRequested; // boolean set by the main thread to abort.
/* The number of currently active jobs */
int _activeJobs;
void overallTransmissionSizeChanged( qint64 change );
bool isInSharedDirectory(const QString& file);
void abort() {
_abortRequested.fetchAndStoreOrdered(true);
if (_rootJob)

View file

@ -23,7 +23,6 @@ namespace Mirall {
void PUTFileJob::start() {
QNetworkRequest req;
qDebug() << _headers;
for(QMap<QByteArray, QByteArray>::const_iterator it = _headers.begin(); it != _headers.end(); ++it) {
req.setRawHeader(it.key(), it.value());
qDebug() << it.key() << it.value();
@ -217,10 +216,14 @@ void PropagateUploadFileQNAM::start()
emit progress(Progress::StartUpload, _item, 0, file->size());
job->start();
_propagator->_activeJobs++;
emitReady();
}
void PropagateUploadFileQNAM::slotPutFinished()
{
_propagator->_activeJobs--;
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job);