Refactor the new propagator in jobs

This makes the code (IMHO) more easy to understand, and will allow
even more easy parallelism
This commit is contained in:
Olivier Goffart 2013-10-28 10:47:10 +01:00
parent 8e90782107
commit 84a40dcb59
8 changed files with 550 additions and 558 deletions

View file

@ -66,7 +66,7 @@ CSyncThread::CSyncThread(CSYNC *csync, const QString &localPath, const QString &
_journal = journal;
_mutex.unlock();
qRegisterMetaType<SyncFileItem>("SyncFileItem");
qRegisterMetaType<CSYNC_STATUS>("CSYNC_STATUS");
qRegisterMetaType<SyncFileItem::Status>("SyncFileItem::Status");
}
CSyncThread::~CSyncThread()
@ -410,8 +410,6 @@ void CSyncThread::startSync()
it->_file = adjustRenamedPath(it->_file);
}
qSort(_syncedItems);
if (!_hasFiles && !_syncedItems.isEmpty()) {
qDebug() << Q_FUNC_INFO << "All the files are going to be removed, asking the user";
bool cancel = false;
@ -436,7 +434,7 @@ void CSyncThread::startSync()
this, SLOT(transferCompleted(SyncFileItem)), Qt::QueuedConnection);
connect(_propagator.data(), SIGNAL(progress(Progress::Kind,QString,quint64,quint64)),
this, SLOT(slotProgress(Progress::Kind,QString,quint64,quint64)));
_iterator = 0;
connect(_propagator.data(), SIGNAL(finished()), this, SLOT(slotFinished()));
int downloadLimit = 0;
if (cfg.useDownloadLimit()) {
@ -454,8 +452,7 @@ void CSyncThread::startSync()
_propagator->_uploadLimit = uploadLimit;
slotProgress(Progress::StartSync, QString(), 0, 0);
startNextTransfer();
_propagator->start(_syncedItems);
}
void CSyncThread::transferCompleted(const SyncFileItem &item)
@ -463,107 +460,20 @@ void CSyncThread::transferCompleted(const SyncFileItem &item)
qDebug() << Q_FUNC_INFO << item._file << item._status << item._errorString;
/* Update the _syncedItems vector */
// Search for the item in the starting from _iterator because it should be a bit before it.
// This works because SyncFileItem::operator== only compare the file name;
int idx = _syncedItems.lastIndexOf(item, _iterator);
int idx = _syncedItems.indexOf(item);
if (idx >= 0) {
_syncedItems[idx]._instruction = item._instruction;
_syncedItems[idx]._errorString = item._errorString;
_syncedItems[idx]._status = item._status;
}
/* Remember deleted directory */
if (item._isDirectory && item._instruction == CSYNC_INSTRUCTION_DELETED) {
_lastDeleted = item._file;
} else {
_lastDeleted.clear();
}
/* Update the database */
if (item._instruction == CSYNC_INSTRUCTION_DELETED) {
_journal->deleteFileRecord(item._originalFile);
if (!item._renameTarget.isEmpty()) {
SyncJournalFileRecord record(item, _localPath + item._renameTarget);
record._path = item._renameTarget;
_journal->setFileRecord(record);
}
} else if(item._instruction == CSYNC_INSTRUCTION_ERROR) {
// Don't update parents directories
_directoriesToUpdate.clear();
} else if (item._isDirectory) {
// directory must not be saved to the db before we finished processing them.
SyncJournalFileRecord record(item, _localPath + item._file);
_directoriesToUpdate.push(record);
} else if(item._instruction == CSYNC_INSTRUCTION_UPDATED) {
SyncJournalFileRecord record(item, _localPath + item._file);
_journal->setFileRecord(record);
slotProgress((item._dir != SyncFileItem::Up) ? Progress::EndDownload : Progress::EndUpload,
item._file, item._size, item._size);
_progressInfo.current_file_no++;
_progressInfo.overall_current_bytes += item._size;
}
/* Start the transfer of the next file or abort if there is an error */
if (item._status != SyncFileItem::FatalError) {
startNextTransfer();
} else {
emit treeWalkResult(_syncedItems);
if (item._status == SyncFileItem::FatalError) {
emit csyncError(item._errorString);
emit finished();
csync_commit(_csync_ctx);
_syncMutex.unlock();
thread()->quit();
}
}
void CSyncThread::startNextTransfer()
void CSyncThread::slotFinished()
{
while (_iterator < _syncedItems.size()) {
const SyncFileItem &item = _syncedItems.at(_iterator);
++_iterator;
while (!_directoriesToUpdate.isEmpty() && !item._file.startsWith(_directoriesToUpdate.last()._path)) {
// We are leaving a directory. Everything we needed to download from that directory is done.
// Update the directory etag in the database to the new one.
_journal->setFileRecord(_directoriesToUpdate.pop());
}
if (!_lastDeleted.isEmpty() &&
item._file.startsWith(_lastDeleted) ) {
if( item._instruction != CSYNC_INSTRUCTION_REMOVE ) {
qDebug() << "WRN: Child of a deleted directory has different instruction than delete."
<< item._file << _lastDeleted << item._instruction;
} else {
// If the item's name starts with the name of the previously deleted directory, we
// can assume this file was already destroyed by the previous call.
_journal->deleteFileRecord(item._file);
continue;
}
}
if (item._instruction == CSYNC_INSTRUCTION_SYNC || item._instruction == CSYNC_INSTRUCTION_NEW
|| item._instruction == CSYNC_INSTRUCTION_CONFLICT) {
slotProgress((item._dir != SyncFileItem::Up) ? Progress::StartDownload : Progress::StartUpload,
item._file, 0, item._size);
}
_propagator->propagate(item);
return; //propagate is async.
}
// We are finished !!
while (!_directoriesToUpdate.isEmpty()) {
// Save the etag of directories to the database.
_journal->setFileRecord(_directoriesToUpdate.pop());
}
// emit the treewalk results.
emit treeWalkResult(_syncedItems);
@ -577,51 +487,6 @@ void CSyncThread::startNextTransfer()
thread()->quit();
}
Progress::Kind CSyncThread::csyncToProgressKind( enum csync_notify_type_e kind )
{
Progress::Kind pKind = Progress::Invalid;
switch(kind) {
case CSYNC_NOTIFY_INVALID:
pKind = Progress::Invalid;
break;
case CSYNC_NOTIFY_START_SYNC_SEQUENCE:
pKind = Progress::StartSync;
break;
case CSYNC_NOTIFY_START_DOWNLOAD:
pKind = Progress::StartDownload;
break;
case CSYNC_NOTIFY_START_UPLOAD:
pKind = Progress::StartUpload;
break;
case CSYNC_NOTIFY_PROGRESS:
pKind = Progress::Context;
break;
case CSYNC_NOTIFY_FINISHED_DOWNLOAD:
pKind = Progress::EndDownload;
break;
case CSYNC_NOTIFY_FINISHED_UPLOAD:
pKind = Progress::EndUpload;
break;
case CSYNC_NOTIFY_FINISHED_SYNC_SEQUENCE:
pKind = Progress::EndSync;
break;
case CSYNC_NOTIFY_START_DELETE:
pKind = Progress::StartDelete;
break;
case CSYNC_NOTIFY_END_DELETE:
pKind = Progress::EndDelete;
break;
case CSYNC_NOTIFY_ERROR:
pKind = Progress::Error;
break;
default:
pKind = Progress::Invalid;
break;
}
return pKind;
}
void CSyncThread::slotProgress(Progress::Kind kind, const QString &file, quint64 curr, quint64 total)
{
Progress::Info pInfo = _progressInfo;

View file

@ -33,8 +33,6 @@
class QProcess;
Q_DECLARE_METATYPE(CSYNC_STATUS)
namespace Mirall {
class SyncJournalFileRecord;
@ -80,7 +78,7 @@ signals:
private slots:
void transferCompleted(const SyncFileItem& item);
void startNextTransfer();
void slotFinished();
void slotProgress(Progress::Kind kind, const QString& file, quint64, quint64);
private:
@ -90,13 +88,9 @@ private:
static int treewalkRemote( TREE_WALK_FILE*, void *);
int treewalkFile( TREE_WALK_FILE*, bool );
Progress::Kind csyncToProgressKind( enum csync_notify_type_e kind );
static QMutex _mutex;
static QMutex _syncMutex;
SyncFileItemVector _syncedItems;
int _iterator; // index in _syncedItems for the next item to process.
QStack<SyncJournalFileRecord> _directoriesToUpdate;
CSYNC *_csync_ctx;
bool _needsUpdate;

File diff suppressed because it is too large Load diff

View file

@ -30,51 +30,139 @@ struct ne_decompress_s;
namespace Mirall {
class SyncJournalDb;
class OwncloudPropagator;
class PropagatorJob : public QObject {
Q_OBJECT
protected:
OwncloudPropagator *_propagator;
public:
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator) {}
public slots:
virtual void start() = 0;
signals:
void finished(SyncFileItem::Status);
void completed(const SyncFileItem &);
void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total);
};
/*
* Propagate a directory, and all its sub entries.
*/
class PropagateDirectory : public PropagatorJob {
Q_OBJECT
public:
// e.g: create the directory
QScopedPointer<PropagatorJob>_firstJob;
// all the sub files or sub directories.
//TODO: in the future, all sub job can be run in parallel
QVector<PropagatorJob *> _subJobs;
SyncFileItem _item;
int _current; // index of the current running job
bool _hasError; // weather there was an error
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItem &item = SyncFileItem())
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _current(-1), _hasError(false) { }
virtual ~PropagateDirectory() {
qDeleteAll(_subJobs);
}
void append(PropagatorJob *subJob) {
_subJobs.append(subJob);
}
virtual void start() {
_current = -1;
_hasError = false;
if (!_firstJob) {
proceedNext(SyncFileItem::Success);
} else {
startJob(_firstJob.data());
}
}
private slots:
void startJob(PropagatorJob *next) {
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(proceedNext(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem)));
connect(next, SIGNAL(progress(Progress::Kind,QString,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,QString,quint64,quint64)));
next->start();
}
void proceedNext(SyncFileItem::Status status);
};
/*
* Abstract class to propagate a single item
*/
class PropagateItemJob : public PropagatorJob {
Q_OBJECT
protected:
SyncFileItem _item;
void done(SyncFileItem::Status status, const QString &errorString = QString()) {
if (status == SyncFileItem::FatalError || status == SyncFileItem::NormalError) {
_item._instruction = CSYNC_INSTRUCTION_ERROR;
}
_item._errorString = errorString;
_item._status = status;
emit completed(_item);
emit finished(status);
}
void updateMTimeAndETag(const char *uri, time_t);
void getFileId( const char *uri );
/* fetch the error code and string from the session
in case of error, calls done with the error and returns true.
If the HTTP error code is ignoreHTTPError, the error is ignored
*/
bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int ignoreHTTPError = 0);
/*
* to be called by the progress callback and will wait the amount of time needed.
*/
void limitBandwidth(qint64 progress, qint64 limit);
QElapsedTimer _lastTime;
qint64 _lastProgress;
public:
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItem &item)
: PropagatorJob(propagator), _item(item), _lastProgress(0) {}
};
// Dummy job that just mark it as completed and ignored.
class PropagateIgnoreJob : public PropagateItemJob {
Q_OBJECT
public:
PropagateIgnoreJob(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item) {}
void start() {
done(SyncFileItem::FileIgnored);
}
};
class OwncloudPropagator : public QObject {
Q_OBJECT
PropagateItemJob *createJob(const SyncFileItem& item);
QScopedPointer<PropagateDirectory> _rootJob;
public:
ne_session_s *_session;
QString _localDir; // absolute path to the local directory. ends with '/'
QString _remoteDir; // path to the root of the remote. ends with '/'
SyncJournalDb *_journal;
QString _errorString;
SyncFileItem::Status _status;
bool check_neon_session();
csync_instructions_e localRemove(const SyncFileItem &);
csync_instructions_e localMkdir(const SyncFileItem &);
csync_instructions_e remoteRemove(const SyncFileItem &);
csync_instructions_e remoteMkdir(const SyncFileItem &);
csync_instructions_e downloadFile(const SyncFileItem &, bool isConflict = false);
csync_instructions_e uploadFile(const SyncFileItem &);
csync_instructions_e remoteRename(const SyncFileItem &);
void updateMTimeAndETag(const char *uri, time_t);
void getFileId( const char *uri );
/* fetch the error code and string from the session
* updates _status, _httpStatusCode and _errorString. and httpStatusCode
* Returns true if there was an error.
*/
bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int *httpStatusCode = 0);
QElapsedTimer _lastTime;
quint64 _lastProgress;
quint64 _chunked_total_size;
quint64 _chunked_done;
QString _currentFile;
static void notify_status_cb (void *userdata, ne_session_status status,
const ne_session_status_info *info);
static void chunk_finished_cb(hbf_transfer_s *,int, void *userdata);
public:
OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir,
SyncJournalDb *progressDb, QAtomicInt *abortRequested)
@ -87,9 +175,8 @@ public:
if (!localDir.endsWith(QChar('/'))) _localDir+='/';
if (!remoteDir.endsWith(QChar('/'))) _remoteDir+='/';
}
void propagate(const SyncFileItem &);
QByteArray _etag;
QString _fileId;
void start(const SyncFileItemVector &_syncedItems);
int _downloadLimit;
int _uploadLimit;
@ -99,7 +186,7 @@ public:
signals:
void completed(const SyncFileItem &);
void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total);
void finished();
};
}

View file

@ -56,12 +56,6 @@ public:
}
friend bool operator<(const SyncFileItem& item1, const SyncFileItem& item2) {
// Delete at the end:
if (item1._instruction == CSYNC_INSTRUCTION_REMOVE && item2._instruction != CSYNC_INSTRUCTION_REMOVE)
return false;
if (item1._instruction != CSYNC_INSTRUCTION_REMOVE && item2._instruction == CSYNC_INSTRUCTION_REMOVE)
return true;
// Sort by destination
return item1.destination() < item2.destination();
}

View file

@ -226,22 +226,33 @@ bool SyncJournalDb::setFileRecord( const SyncJournalFileRecord& record )
}
}
bool SyncJournalDb::deleteFileRecord(const QString& filename)
bool SyncJournalDb::deleteFileRecord(const QString& filename, bool recursively)
{
QMutexLocker locker(&_mutex);
qlonglong phash = getPHash(filename);
if( checkConnect() ) {
if (recursively) {
qlonglong phash = getPHash(filename);
QSqlQuery query( "DELETE FROM metadata WHERE phash=?", _db );
query.bindValue( 0, QString::number(phash) );
QSqlQuery query( "DELETE FROM metadata WHERE phash=?" );
query.bindValue( 0, QString::number(phash) );
if( !query.exec() ) {
qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text();
return false;
}
qDebug() << query.executedQuery() << phash << filename;
return true;
} else {
QSqlQuery query( "DELETE FROM metadata WHERE path LIKE(?||'/%')", _db );
query.bindValue( 0, filename );
if( !query.exec() ) {
qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text();
return false;
if( !query.exec() ) {
qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text();
return false;
}
qDebug() << query.executedQuery() << filename;
return true;
}
qDebug() << query.executedQuery() << phash << filename;
return true;
} else {
qDebug() << "Failed to connect database.";
return false; // checkConnect failed.

View file

@ -29,7 +29,7 @@ public:
explicit SyncJournalDb(const QString& path, QObject *parent = 0);
SyncJournalFileRecord getFileRecord( const QString& filename );
bool setFileRecord( const SyncJournalFileRecord& record );
bool deleteFileRecord( const QString& filename );
bool deleteFileRecord( const QString& filename, bool recursively = false );
int getFileRecordCount();
bool exists();
QStringList tableColumns( const QString& table );

View file

@ -120,5 +120,6 @@ int main(int argc, char **argv) {
app.exec();
csync_destroy(_csync_ctx);
return 0;
}