Move CSyncThread in the main thread, and just do the neon jobs in a thread

It is important because then we can do assynchronious job that needs
QNAM in the main thread
This commit is contained in:
Olivier Goffart 2014-02-05 20:18:03 +01:00
parent ee3df45fd8
commit 2d9c0b6c31
7 changed files with 125 additions and 99 deletions

View file

@ -65,11 +65,14 @@ CSyncThread::CSyncThread(CSYNC *csync, const QString &localPath, const QString &
_journal = journal;
qRegisterMetaType<SyncFileItem>("SyncFileItem");
qRegisterMetaType<SyncFileItem::Status>("SyncFileItem::Status");
_thread.start();
}
CSyncThread::~CSyncThread()
{
_thread.quit();
_thread.wait();
}
//Convert an error code from csync to a user readable string.
@ -406,7 +409,7 @@ void CSyncThread::handleSyncError(CSYNC *ctx, const char *state) {
csync_commit(_csync_ctx);
emit finished();
_syncMutex.unlock();
thread()->quit();
_thread.quit();
}
void CSyncThread::startSync()
@ -426,11 +429,9 @@ void CSyncThread::startSync()
_syncedItems.clear();
_needsUpdate = false;
_abortRequestedMutex.lock();
if (!_abortRequested.fetchAndAddRelease(0)) {
csync_resume(_csync_ctx);
}
_abortRequestedMutex.unlock();
if (!_journal->exists()) {
qDebug() << "=====sync looks new (no DB exists), activating recursive PROPFIND if csync supports it";
@ -450,7 +451,7 @@ void CSyncThread::startSync()
csync_commit(_csync_ctx);
emit finished();
_syncMutex.unlock();
thread()->quit();
_thread.quit();
return;
// database creation error!
@ -493,15 +494,22 @@ void CSyncThread::startSync()
_syncTime.start();
QElapsedTimer updateTime;
updateTime.start();
qDebug() << "#### Update start #################################################### >>";
if( csync_update(_csync_ctx) < 0 ) {
UpdateJob *job = new UpdateJob;
job->_csync_ctx = _csync_ctx;
job->moveToThread(&_thread);
connect(job, SIGNAL(finished(int)), this, SLOT(slotUpdateFinished(int)));
QMetaObject::invokeMethod(job, "start");
}
void CSyncThread::slotUpdateFinished(int updateResult)
{
if (updateResult < 0 ) {
handleSyncError(_csync_ctx, "csync_update");
return;
}
qDebug() << "<<#### Update end #################################################### " << updateTime.elapsed();
qDebug() << "<<#### Update end #################################################### " << _syncTime.elapsed();
if( csync_reconcile(_csync_ctx) < 0 ) {
handleSyncError(_csync_ctx, "csync_reconcile");
@ -551,7 +559,7 @@ void CSyncThread::startSync()
Q_ASSERT(session);
_propagator.reset(new OwncloudPropagator (session, _localPath, _remotePath,
_journal, &_abortRequested));
_journal, &_abortRequested, &_thread));
connect(_propagator.data(), SIGNAL(completed(SyncFileItem)),
this, SLOT(transferCompleted(SyncFileItem)), Qt::QueuedConnection);
connect(_propagator.data(), SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)),
@ -586,7 +594,6 @@ void CSyncThread::setNetworkLimits()
_propagator->_uploadLimit = uploadLimit;
qDebug() << " N------N Network Limits changed!";
}
void CSyncThread::transferCompleted(const SyncFileItem &item)
@ -625,7 +632,7 @@ void CSyncThread::slotFinished()
emit finished();
_propagator.reset(0);
_syncMutex.unlock();
thread()->quit();
_thread.quit();
}
void CSyncThread::progressProblem(Progress::Kind kind, const SyncFileItem& item)
@ -703,10 +710,8 @@ QString CSyncThread::adjustRenamedPath(const QString& original)
void CSyncThread::abort()
{
QMutexLocker locker(&_abortRequestedMutex);
csync_request_abort(_csync_ctx);
_abortRequested = true;
}
} // ns Mirall

View file

@ -19,6 +19,7 @@
#include <stdint.h>
#include <QMutex>
#include <QThread>
#include <QString>
#include <qelapsedtimer.h>
@ -80,8 +81,11 @@ private slots:
void slotFinished();
void slotProgress(Progress::Kind kind, const SyncFileItem &item, quint64 curr = 0, quint64 total = 0);
void slotProgressChanged(qint64 change);
void slotUpdateFinished(int updateResult);
private:
void handleSyncError(CSYNC *ctx, const char *state);
void progressProblem(Progress::Kind kind, const SyncFileItem& item);
@ -102,6 +106,7 @@ private:
QElapsedTimer _syncTime;
QString _lastDeleted; // if the last item was a path and it has been deleted
QHash <QString, QString> _seenFiles;
QThread _thread;
// maps the origin and the target of the folders that have been renamed
@ -117,11 +122,25 @@ private:
qint64 _overallFileCount;
quint64 _lastOverallBytes;
QMutex _abortRequestedMutex; // avoid a race between csync_abort and csync_resume
QAtomicInt _abortRequested;
friend struct CSyncRunScopeHelper;
};
struct UpdateJob : public QObject {
Q_OBJECT
public:
CSYNC *_csync_ctx;
Q_INVOKABLE void start() {
emit finished(csync_update(_csync_ctx));
deleteLater();
}
signals:
void finished(int result);
};
}
#endif // CSYNCTHREAD_H

View file

@ -28,9 +28,6 @@
#include "creds/abstractcredentials.h"
#include <QThread>
extern "C" {
enum csync_exclude_type_e {
@ -62,7 +59,6 @@ Folder::Folder(const QString &alias, const QString &path, const QString& secondP
, _remotePath(secondPath)
, _alias(alias)
, _enabled(true)
, _thread(0)
, _csync(0)
, _csyncError(false)
, _csyncUnavail(false)
@ -135,11 +131,10 @@ bool Folder::init()
Folder::~Folder()
{
if( _thread ) {
if( _csync ) {
_csync->abort();
_thread->wait();
delete _csync;
}
delete _csync;
// Destroy csync here.
csync_destroy(_csync_ctx);
}
@ -196,7 +191,7 @@ QString Folder::path() const
bool Folder::isBusy() const
{
return ( _thread && _thread->isRunning() );
return _csync;
}
QString Folder::remotePath() const
@ -420,7 +415,7 @@ void Folder::slotLocalPathChanged( const QString& dir )
if( notifiedDir.absolutePath() == localPath.absolutePath() ) {
if( !localPath.exists() ) {
qDebug() << "XXXXXXX The sync folder root was removed!!";
if( _thread && _thread->isRunning() ) {
if( isBusy() ) {
qDebug() << "CSync currently running, set wipe flag!!";
} else {
qDebug() << "CSync not running, wipe it now!!";
@ -451,7 +446,7 @@ void Folder::slotTerminateSync(bool block)
{
qDebug() << "folder " << alias() << " Terminating!";
if( _thread && _csync ) {
if( _csync ) {
_csync->abort();
// Do not display an error message, user knows his own actions.
@ -462,10 +457,7 @@ void Folder::slotTerminateSync(bool block)
return;
}
_thread->wait();
_csync->deleteLater();
delete _thread;
_thread = 0;
delete _csync;
slotCSyncFinished();
}
setSyncEnabled(false);
@ -544,14 +536,11 @@ void Folder::startSync(const QStringList &pathList)
setProxyDirty(false);
}
if (_thread && _thread->isRunning()) {
if (isBusy()) {
qCritical() << "* ERROR csync is still running and new sync requested.";
return;
}
if (_thread)
_thread->quit();
delete _csync;
delete _thread;
_errors.clear();
_csyncError = false;
_csyncUnavail = false;
@ -562,10 +551,8 @@ void Folder::startSync(const QStringList &pathList)
qDebug() << "*** Start syncing";
_thread = new QThread(this);
setIgnoredFiles();
_csync = new CSyncThread( _csync_ctx, path(), remoteUrl().path(), &_journal);
_csync->moveToThread(_thread);
qRegisterMetaType<SyncFileItemVector>("SyncFileItemVector");
qRegisterMetaType<SyncFileItem::Direction>("SyncFileItem::Direction");
@ -584,9 +571,6 @@ void Folder::startSync(const QStringList &pathList)
connect(_csync, SIGNAL(transmissionProgress(Progress::Info)), this, SLOT(slotTransmissionProgress(Progress::Info)));
connect(_csync, SIGNAL(transmissionProblem(Progress::SyncProblem)), this, SLOT(slotTransmissionProblem(Progress::SyncProblem)));
_thread->start();
_thread->setPriority(QThread::LowPriority);
QMetaObject::invokeMethod(_csync, "startSync", Qt::QueuedConnection);
// disable events until syncing is done
@ -623,6 +607,8 @@ void Folder::slotCsyncUnavailable()
void Folder::slotCSyncFinished()
{
qDebug() << "-> CSync Finished slot with error " << _csyncError << "warn count" << _syncResult.warnCount();
delete _csync;
_csync = 0;
// _watcher->setEventsEnabledDelayed(2000);
_pollTimer.start();
_timeSinceLastSync.restart();
@ -643,9 +629,6 @@ void Folder::slotCSyncFinished()
_syncResult.setStatus(SyncResult::Success);
}
if( _thread && _thread->isRunning() ) {
_thread->quit();
}
emit syncStateChange();
emit syncFinished( _syncResult );
}

View file

@ -205,7 +205,6 @@ private:
QFileSystemWatcher *_pathWatcher;
bool _enabled;
SyncResult _syncResult;
QThread *_thread;
CSyncThread *_csync;
QStringList _errors;
bool _csyncError;

View file

@ -100,7 +100,8 @@ void PropagateItemJob::done(SyncFileItem::Status status, const QString &errorStr
}
bool PropagateItemJob::checkForProblemsWithShared()
bool PropagateNeonJob::checkForProblemsWithShared()
{
QString errorString = QString::fromUtf8(ne_get_error(_propagator->_session));
int httpStatusCode = errorString.mid(0, errorString.indexOf(QChar(' '))).toInt();
@ -114,13 +115,13 @@ bool PropagateItemJob::checkForProblemsWithShared()
_restoreJob.reset(new PropagateDownloadFile(_propagator, downloadItem));
connect(_restoreJob.data(), SIGNAL(completed(SyncFileItem)),
this, SLOT(slotRestoreJobCompleted(SyncFileItem)));
_restoreJob->start();
QMetaObject::invokeMethod(_restoreJob.data(), "start");
return true;
}
return false;
}
void PropagateItemJob::slotRestoreJobCompleted(const SyncFileItem& item )
void PropagateNeonJob::slotRestoreJobCompleted(const SyncFileItem& item )
{
if( item._status == SyncFileItem::Success ) {
done( SyncFileItem::SoftError, tr("The file was removed from a read only share. The file has been restored."));
@ -456,8 +457,7 @@ void PropagateUploadFile::notify_status_cb(void* userdata, ne_session_status sta
that->_chunked_done + info->sr.progress,
that->_chunked_total_size ? that->_chunked_total_size : info->sr.total );
QCoreApplication::processEvents();
that->limitBandwidth(that->_chunked_done + info->sr.progress, that->_propagator->_uploadLimit);
that->limitBandwidth(that->_chunked_done + info->sr.progress, that->_propagator->_uploadLimit.fetchAndAddAcquire(0));
}
}
@ -473,7 +473,7 @@ static QString parseFileId(ne_request *req) {
return fileId;
}
void PropagateItemJob::updateMTimeAndETag(const char* uri, time_t mtime)
void PropagateNeonJob::updateMTimeAndETag(const char* uri, time_t mtime)
{
QByteArray modtime = QByteArray::number(qlonglong(mtime));
ne_propname pname;
@ -519,7 +519,7 @@ void PropagateItemJob::updateMTimeAndETag(const char* uri, time_t mtime)
}
}
void PropagateItemJob::limitBandwidth(qint64 progress, qint64 bandwidth_limit)
void PropagateNeonJob::limitBandwidth(qint64 progress, qint64 bandwidth_limit)
{
if (bandwidth_limit > 0) {
int64_t diff = _lastTime.nsecsElapsed() / 1000;
@ -649,8 +649,7 @@ void PropagateDownloadFile::notify_status_cb(void* userdata, ne_session_status s
if (status == ne_status_recving && info->sr.total > 0) {
emit that->progress(Progress::Context, that->_item, info->sr.progress, info->sr.total );
QCoreApplication::processEvents();
that->limitBandwidth(info->sr.progress, that->_propagator->_downloadLimit);
that->limitBandwidth(info->sr.progress, that->_propagator->_downloadLimit.fetchAndAddAcquire(0));
}
}
@ -925,7 +924,7 @@ void PropagateRemoteRename::start()
done(SyncFileItem::Success);
}
bool PropagateItemJob::updateErrorFromSession(int neon_code, ne_request* req, int ignoreHttpCode)
bool PropagateNeonJob::updateErrorFromSession(int neon_code, ne_request* req, int ignoreHttpCode)
{
if( neon_code != NE_OK ) {
qDebug("Neon error code was %d", neon_code);
@ -1070,7 +1069,7 @@ void OwncloudPropagator::start(const SyncFileItemVector& _syncedItems)
SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)));
connect(_rootJob.data(), SIGNAL(finished(SyncFileItem::Status)), this, SIGNAL(finished()));
_rootJob->start();
QMetaObject::invokeMethod(_rootJob.data(), "start");
}
void OwncloudPropagator::overallTransmissionSizeChanged(qint64 change)

View file

@ -86,7 +86,7 @@ private slots:
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(proceedNext(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem)));
connect(next, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)));
next->start();
QMetaObject::invokeMethod(next, "start");
}
void proceedNext(SyncFileItem::Status status);
@ -95,42 +95,21 @@ private slots:
/*
* Abstract class to propagate a single item
* (Only used for neon job)
*/
class PropagateItemJob : public PropagatorJob {
Q_OBJECT
protected:
void done(SyncFileItem::Status status, const QString &errorString = QString());
void updateMTimeAndETag(const char *uri, time_t);
/* fetch the error code and string from the session
in case of error, calls done with the error and returns true.
If the HTTP error code is ignoreHTTPError, the error is ignored
*/
bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int ignoreHTTPError = 0);
/*
* to be called by the progress callback and will wait the amount of time needed.
*/
void limitBandwidth(qint64 progress, qint64 limit);
bool checkForProblemsWithShared();
QElapsedTimer _lastTime;
qint64 _lastProgress;
int _httpStatusCode;
SyncFileItem _item;
protected slots:
void slotRestoreJobCompleted(const SyncFileItem& );
private:
QScopedPointer<PropagateItemJob> _restoreJob;
public:
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItem &item)
: PropagatorJob(propagator), _lastProgress(0), _httpStatusCode(0), _item(item) {}
: PropagatorJob(propagator), _item(item) {}
};
@ -153,28 +132,27 @@ class OwncloudPropagator : public QObject {
QScopedPointer<PropagateDirectory> _rootJob;
public:
QThread* _neonThread;
ne_session_s *_session;
QString _localDir; // absolute path to the local directory. ends with '/'
QString _remoteDir; // path to the root of the remote. ends with '/'
const QString _localDir; // absolute path to the local directory. ends with '/'
const QString _remoteDir; // path to the root of the remote. ends with '/'
SyncJournalDb *_journal;
public:
OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir,
SyncJournalDb *progressDb, QAtomicInt *abortRequested)
: _session(session)
, _localDir(localDir)
, _remoteDir(remoteDir)
SyncJournalDb *progressDb, QAtomicInt *abortRequested, QThread *neonThread)
: _neonThread(neonThread)
, _session(session)
, _localDir((localDir.endsWith(QChar('/'))) ? localDir : localDir+'/' )
, _remoteDir((remoteDir.endsWith(QChar('/'))) ? remoteDir : remoteDir+'/' )
, _journal(progressDb)
, _abortRequested(abortRequested)
{
if (!localDir.endsWith(QChar('/'))) _localDir+='/';
if (!remoteDir.endsWith(QChar('/'))) _remoteDir+='/';
}
{ }
void start(const SyncFileItemVector &_syncedItems);
int _downloadLimit;
int _uploadLimit;
QAtomicInt _downloadLimit;
QAtomicInt _uploadLimit;
QAtomicInt *_abortRequested; // boolean set by the main thread to abort.

View file

@ -34,6 +34,49 @@ struct ScopedPointerHelpers {
};
/*
* Abstract class for neon job. Lives in the neon thread
*/
class PropagateNeonJob : public PropagateItemJob {
Q_OBJECT
protected:
void updateMTimeAndETag(const char *uri, time_t);
/* fetch the error code and string from the session
in case of error, calls done with the error and returns true.
If the HTTP error code is ignoreHTTPError, the error is ignored
*/
bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int ignoreHTTPError = 0);
/*
* to be called by the progress callback and will wait the amount of time needed.
*/
void limitBandwidth(qint64 progress, qint64 limit);
bool checkForProblemsWithShared();
QElapsedTimer _lastTime;
qint64 _lastProgress;
int _httpStatusCode;
protected slots:
void slotRestoreJobCompleted(const SyncFileItem& );
private:
QScopedPointer<PropagateItemJob> _restoreJob;
public:
PropagateNeonJob(OwncloudPropagator* propagator, const SyncFileItem &item)
: PropagateItemJob(propagator, item), _lastProgress(0), _httpStatusCode(0) {
moveToThread(propagator->_neonThread);
}
};
class PropagateLocalRemove : public PropagateItemJob {
Q_OBJECT
public:
@ -46,16 +89,16 @@ public:
PropagateLocalMkdir (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
void start();
};
class PropagateRemoteRemove : public PropagateItemJob {
class PropagateRemoteRemove : public PropagateNeonJob {
Q_OBJECT
public:
PropagateRemoteRemove (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
PropagateRemoteRemove (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateNeonJob(propagator, item) {}
void start();
};
class PropagateRemoteMkdir : public PropagateItemJob {
class PropagateRemoteMkdir : public PropagateNeonJob {
Q_OBJECT
public:
PropagateRemoteMkdir (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
PropagateRemoteMkdir (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateNeonJob(propagator, item) {}
void start();
};
class PropagateLocalRename : public PropagateItemJob {
@ -64,18 +107,18 @@ public:
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
void start();
};
class PropagateRemoteRename : public PropagateItemJob {
class PropagateRemoteRename : public PropagateNeonJob {
Q_OBJECT
public:
PropagateRemoteRename (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
PropagateRemoteRename (OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateNeonJob(propagator, item) {}
void start();
};
class PropagateUploadFile: public PropagateItemJob {
class PropagateUploadFile: public PropagateNeonJob {
Q_OBJECT
public:
explicit PropagateUploadFile(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item), _previousFileSize(0) {}
: PropagateNeonJob(propagator, item), _previousFileSize(0) {}
void start();
private:
// Log callback for httpbf
@ -100,11 +143,11 @@ private:
qint64 _previousFileSize; // In case the file size has changed during upload, this is the previous one.
};
class PropagateDownloadFile: public PropagateItemJob {
class PropagateDownloadFile: public PropagateNeonJob {
Q_OBJECT
public:
explicit PropagateDownloadFile(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item), _file(0) {}
: PropagateNeonJob(propagator, item), _file(0) {}
void start();
private:
QFile *_file;