From 84a40dcb59d9c6b02390003cdd7e62d901973405 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Mon, 28 Oct 2013 10:47:10 +0100 Subject: [PATCH] Refactor the new propagator in jobs This makes the code (IMHO) more easy to understand, and will allow even more easy parallelism --- src/mirall/csyncthread.cpp | 147 +----- src/mirall/csyncthread.h | 8 +- src/mirall/owncloudpropagator.cpp | 748 ++++++++++++++++-------------- src/mirall/owncloudpropagator.h | 167 +++++-- src/mirall/syncfileitem.h | 6 - src/mirall/syncjournaldb.cpp | 29 +- src/mirall/syncjournaldb.h | 2 +- src/owncloudcmd/owncloudcmd.cpp | 1 + 8 files changed, 550 insertions(+), 558 deletions(-) diff --git a/src/mirall/csyncthread.cpp b/src/mirall/csyncthread.cpp index 0b26f4eb0..1f6a06581 100644 --- a/src/mirall/csyncthread.cpp +++ b/src/mirall/csyncthread.cpp @@ -66,7 +66,7 @@ CSyncThread::CSyncThread(CSYNC *csync, const QString &localPath, const QString & _journal = journal; _mutex.unlock(); qRegisterMetaType("SyncFileItem"); - qRegisterMetaType("CSYNC_STATUS"); + qRegisterMetaType("SyncFileItem::Status"); } CSyncThread::~CSyncThread() @@ -410,8 +410,6 @@ void CSyncThread::startSync() it->_file = adjustRenamedPath(it->_file); } - qSort(_syncedItems); - if (!_hasFiles && !_syncedItems.isEmpty()) { qDebug() << Q_FUNC_INFO << "All the files are going to be removed, asking the user"; bool cancel = false; @@ -436,7 +434,7 @@ void CSyncThread::startSync() this, SLOT(transferCompleted(SyncFileItem)), Qt::QueuedConnection); connect(_propagator.data(), SIGNAL(progress(Progress::Kind,QString,quint64,quint64)), this, SLOT(slotProgress(Progress::Kind,QString,quint64,quint64))); - _iterator = 0; + connect(_propagator.data(), SIGNAL(finished()), this, SLOT(slotFinished())); int downloadLimit = 0; if (cfg.useDownloadLimit()) { @@ -454,8 +452,7 @@ void CSyncThread::startSync() _propagator->_uploadLimit = uploadLimit; slotProgress(Progress::StartSync, QString(), 0, 0); - - startNextTransfer(); + _propagator->start(_syncedItems); } void CSyncThread::transferCompleted(const SyncFileItem &item) @@ -463,107 +460,20 @@ void CSyncThread::transferCompleted(const SyncFileItem &item) qDebug() << Q_FUNC_INFO << item._file << item._status << item._errorString; /* Update the _syncedItems vector */ - - // Search for the item in the starting from _iterator because it should be a bit before it. - // This works because SyncFileItem::operator== only compare the file name; - int idx = _syncedItems.lastIndexOf(item, _iterator); + int idx = _syncedItems.indexOf(item); if (idx >= 0) { _syncedItems[idx]._instruction = item._instruction; _syncedItems[idx]._errorString = item._errorString; _syncedItems[idx]._status = item._status; } - /* Remember deleted directory */ - - if (item._isDirectory && item._instruction == CSYNC_INSTRUCTION_DELETED) { - _lastDeleted = item._file; - } else { - _lastDeleted.clear(); - } - - /* Update the database */ - - if (item._instruction == CSYNC_INSTRUCTION_DELETED) { - _journal->deleteFileRecord(item._originalFile); - if (!item._renameTarget.isEmpty()) { - SyncJournalFileRecord record(item, _localPath + item._renameTarget); - record._path = item._renameTarget; - _journal->setFileRecord(record); - } - } else if(item._instruction == CSYNC_INSTRUCTION_ERROR) { - // Don't update parents directories - _directoriesToUpdate.clear(); - } else if (item._isDirectory) { - // directory must not be saved to the db before we finished processing them. - SyncJournalFileRecord record(item, _localPath + item._file); - _directoriesToUpdate.push(record); - } else if(item._instruction == CSYNC_INSTRUCTION_UPDATED) { - SyncJournalFileRecord record(item, _localPath + item._file); - _journal->setFileRecord(record); - - slotProgress((item._dir != SyncFileItem::Up) ? Progress::EndDownload : Progress::EndUpload, - item._file, item._size, item._size); - _progressInfo.current_file_no++; - _progressInfo.overall_current_bytes += item._size; - - } - - /* Start the transfer of the next file or abort if there is an error */ - - if (item._status != SyncFileItem::FatalError) { - startNextTransfer(); - } else { - emit treeWalkResult(_syncedItems); + if (item._status == SyncFileItem::FatalError) { emit csyncError(item._errorString); - emit finished(); - csync_commit(_csync_ctx); - _syncMutex.unlock(); - thread()->quit(); } } -void CSyncThread::startNextTransfer() +void CSyncThread::slotFinished() { - while (_iterator < _syncedItems.size()) { - const SyncFileItem &item = _syncedItems.at(_iterator); - ++_iterator; - - while (!_directoriesToUpdate.isEmpty() && !item._file.startsWith(_directoriesToUpdate.last()._path)) { - // We are leaving a directory. Everything we needed to download from that directory is done. - // Update the directory etag in the database to the new one. - _journal->setFileRecord(_directoriesToUpdate.pop()); - } - - if (!_lastDeleted.isEmpty() && - item._file.startsWith(_lastDeleted) ) { - if( item._instruction != CSYNC_INSTRUCTION_REMOVE ) { - qDebug() << "WRN: Child of a deleted directory has different instruction than delete." - << item._file << _lastDeleted << item._instruction; - } else { - // If the item's name starts with the name of the previously deleted directory, we - // can assume this file was already destroyed by the previous call. - _journal->deleteFileRecord(item._file); - continue; - } - } - - if (item._instruction == CSYNC_INSTRUCTION_SYNC || item._instruction == CSYNC_INSTRUCTION_NEW - || item._instruction == CSYNC_INSTRUCTION_CONFLICT) { - slotProgress((item._dir != SyncFileItem::Up) ? Progress::StartDownload : Progress::StartUpload, - item._file, 0, item._size); - } - - _propagator->propagate(item); - return; //propagate is async. - } - - // We are finished !! - - while (!_directoriesToUpdate.isEmpty()) { - // Save the etag of directories to the database. - _journal->setFileRecord(_directoriesToUpdate.pop()); - } - // emit the treewalk results. emit treeWalkResult(_syncedItems); @@ -577,51 +487,6 @@ void CSyncThread::startNextTransfer() thread()->quit(); } -Progress::Kind CSyncThread::csyncToProgressKind( enum csync_notify_type_e kind ) -{ - Progress::Kind pKind = Progress::Invalid; - - switch(kind) { - case CSYNC_NOTIFY_INVALID: - pKind = Progress::Invalid; - break; - case CSYNC_NOTIFY_START_SYNC_SEQUENCE: - pKind = Progress::StartSync; - break; - case CSYNC_NOTIFY_START_DOWNLOAD: - pKind = Progress::StartDownload; - break; - case CSYNC_NOTIFY_START_UPLOAD: - pKind = Progress::StartUpload; - break; - case CSYNC_NOTIFY_PROGRESS: - pKind = Progress::Context; - break; - case CSYNC_NOTIFY_FINISHED_DOWNLOAD: - pKind = Progress::EndDownload; - break; - case CSYNC_NOTIFY_FINISHED_UPLOAD: - pKind = Progress::EndUpload; - break; - case CSYNC_NOTIFY_FINISHED_SYNC_SEQUENCE: - pKind = Progress::EndSync; - break; - case CSYNC_NOTIFY_START_DELETE: - pKind = Progress::StartDelete; - break; - case CSYNC_NOTIFY_END_DELETE: - pKind = Progress::EndDelete; - break; - case CSYNC_NOTIFY_ERROR: - pKind = Progress::Error; - break; - default: - pKind = Progress::Invalid; - break; - } - return pKind; -} - void CSyncThread::slotProgress(Progress::Kind kind, const QString &file, quint64 curr, quint64 total) { Progress::Info pInfo = _progressInfo; diff --git a/src/mirall/csyncthread.h b/src/mirall/csyncthread.h index 06ec21490..b46ed34ea 100644 --- a/src/mirall/csyncthread.h +++ b/src/mirall/csyncthread.h @@ -33,8 +33,6 @@ class QProcess; -Q_DECLARE_METATYPE(CSYNC_STATUS) - namespace Mirall { class SyncJournalFileRecord; @@ -80,7 +78,7 @@ signals: private slots: void transferCompleted(const SyncFileItem& item); - void startNextTransfer(); + void slotFinished(); void slotProgress(Progress::Kind kind, const QString& file, quint64, quint64); private: @@ -90,13 +88,9 @@ private: static int treewalkRemote( TREE_WALK_FILE*, void *); int treewalkFile( TREE_WALK_FILE*, bool ); - Progress::Kind csyncToProgressKind( enum csync_notify_type_e kind ); - static QMutex _mutex; static QMutex _syncMutex; SyncFileItemVector _syncedItems; - int _iterator; // index in _syncedItems for the next item to process. - QStack _directoriesToUpdate; CSYNC *_csync_ctx; bool _needsUpdate; diff --git a/src/mirall/owncloudpropagator.cpp b/src/mirall/owncloudpropagator.cpp index 0dcc2e325..cdc8b128f 100644 --- a/src/mirall/owncloudpropagator.cpp +++ b/src/mirall/owncloudpropagator.cpp @@ -15,6 +15,7 @@ #include "owncloudpropagator.h" #include "syncjournaldb.h" +#include "syncjournalfilerecord.h" #include #include #include @@ -27,6 +28,7 @@ #endif #include #include +#include #include #include @@ -55,68 +57,14 @@ struct ScopedPointerHelpers { // static inline void cleanup(ne_propfind_handler *pointer) { if (pointer) ne_propfind_destroy(pointer); } }; -void OwncloudPropagator::propagate(const SyncFileItem &item) -{ - _errorString.clear(); - _etag = item._etag; - _fileId = item._fileId; - - if (_abortRequested->fetchAndAddRelaxed(0)) { - SyncFileItem newItem = item; - newItem._instruction = CSYNC_INSTRUCTION_ERROR; - newItem._status = SyncFileItem::FatalError; - newItem._errorString = tr("Aborted"); - emit completed(newItem); - return; - } - - _status = SyncFileItem::NoStatus; - csync_instructions_e instruction; - - switch(item._instruction) { - case CSYNC_INSTRUCTION_REMOVE: - instruction = item._dir == SyncFileItem::Down ? localRemove(item) : remoteRemove(item); - break; - case CSYNC_INSTRUCTION_NEW: - if (item._isDirectory) { - instruction = item._dir == SyncFileItem::Down ? localMkdir(item) : remoteMkdir(item); - break; - } //fall trough - case CSYNC_INSTRUCTION_SYNC: - if (item._isDirectory) { - // Should we set the mtime? - instruction = CSYNC_INSTRUCTION_UPDATED; - break; - } - instruction = item._dir == SyncFileItem::Down ? downloadFile(item) : uploadFile(item); - break; - case CSYNC_INSTRUCTION_CONFLICT: - if (item._isDirectory) { - instruction = CSYNC_INSTRUCTION_UPDATED; - break; - } - instruction = downloadFile(item, true); - break; - case CSYNC_INSTRUCTION_RENAME: - instruction = remoteRename(item); - break; - case CSYNC_INSTRUCTION_IGNORE: - _status = SyncFileItem::FileIgnored; - instruction = CSYNC_INSTRUCTION_IGNORE; - break; - default: - instruction = item._instruction; - break; - } - SyncFileItem newItem = item; - newItem._instruction = instruction; - newItem._errorString = _errorString; - newItem._etag = _etag; - newItem._status = _status; - newItem._fileId = _fileId; - - emit completed(newItem); -} +#define DECLARE_JOB(NAME) \ +class NAME : public PropagateItemJob { \ + /* Q_OBJECT */ \ +public: \ + NAME(OwncloudPropagator* propagator,const SyncFileItem& item) \ + : PropagateItemJob(propagator, item) {} \ + void start(); \ +}; // compare two files with given filename and return true if they have the same content static bool fileEquals(const QString &fn1, const QString &fn2) { @@ -171,118 +119,145 @@ static bool removeRecursively(const QString &path) return success; } -csync_instructions_e OwncloudPropagator::localRemove(const SyncFileItem& item) +DECLARE_JOB(PropagateLocalRemove) + +void PropagateLocalRemove::start() { - QString filename = _localDir + item._file; - if (item._isDirectory) { - if (!QDir(filename).exists() || removeRecursively(filename)) { - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_DELETED; + QString filename = _propagator->_localDir + _item._file; + if (_item._isDirectory) { + if (QDir(filename).exists() && !removeRecursively(filename)) { + done(SyncFileItem::NormalError, tr("Could not remove directory %1").arg(filename)); + return; } - _errorString = tr("Could not remove directory %1").arg(filename); } else { QFile file(filename); - if (!file.exists() || file.remove()) { - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_DELETED; + if (file.exists() && !file.remove()) { + done(SyncFileItem::NormalError, file.errorString()); } - _errorString = file.errorString(); } - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + _propagator->_journal->deleteFileRecord(_item._originalFile); + _item._instruction = CSYNC_INSTRUCTION_DELETED; + done(SyncFileItem::Success); } -csync_instructions_e OwncloudPropagator::localMkdir(const SyncFileItem &item) +DECLARE_JOB(PropagateLocalMkdir) + +void PropagateLocalMkdir::start() { QDir d; - if (!d.mkpath(_localDir + item._file)) { - _errorString = "could not create directory " + _localDir + item._file; - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + if (!d.mkpath(_propagator->_localDir + _item._file)) { + done(SyncFileItem::NormalError, tr("could not create directory %1").arg(_propagator->_localDir + _item._file)); + return; } - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_UPDATED; + _item._instruction = CSYNC_INSTRUCTION_UPDATED; + done(SyncFileItem::Success); } -csync_instructions_e OwncloudPropagator::remoteRemove(const SyncFileItem &item) +DECLARE_JOB(PropagateRemoteRemove) + +void PropagateRemoteRemove::start() { - QScopedPointer uri(ne_path_escape((_remoteDir + item._file).toUtf8())); - int rc = ne_delete(_session, uri.data()); + QScopedPointer uri( + ne_path_escape((_propagator->_remoteDir + _item._file).toUtf8())); + int rc = ne_delete(_propagator->_session, uri.data()); if (updateErrorFromSession(rc)) { - return CSYNC_INSTRUCTION_ERROR; + return; } - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_DELETED; + _item._instruction = CSYNC_INSTRUCTION_DELETED; + _propagator->_journal->deleteFileRecord(_item._originalFile, _item._isDirectory); + done(SyncFileItem::Success); } -csync_instructions_e OwncloudPropagator::remoteMkdir(const SyncFileItem &item) +DECLARE_JOB(PropagateRemoteMkdir) + +void PropagateRemoteMkdir::start() { - QScopedPointer uri(ne_path_escape((_remoteDir + item._file).toUtf8())); - bool error = false; + QScopedPointer uri( + ne_path_escape((_propagator->_remoteDir + _item._file).toUtf8())); - int rc = ne_mkcol(_session, uri.data()); - int httpStatusCode = 0; - error = updateErrorFromSession( rc , 0, &httpStatusCode ); + int rc = ne_mkcol(_propagator->_session, uri.data()); - if( error ) { - /* Special for mkcol: it returns 405 if the directory already exists. - * Ignore that error */ - if (httpStatusCode != 405) { - return CSYNC_INSTRUCTION_ERROR; + /* Special for mkcol: it returns 405 if the directory already exists. + * Ignore that error */ + if( updateErrorFromSession( rc , 0, 405 ) ) { + return; + } + _item._instruction = CSYNC_INSTRUCTION_UPDATED; + done(SyncFileItem::Success); +} + +class PropagateUploadFile: public PropagateItemJob { +public: + explicit PropagateUploadFile(OwncloudPropagator* propagator,const SyncFileItem& item) + : PropagateItemJob(propagator, item) {} + void start(); +private: + // Log callback for httpbf + static void _log_callback(const char *func, const char *text, void*) + { + qDebug() << func << text; + } + + // abort callback for httpbf + static int _user_want_abort(void *userData) + { + return static_cast(userData)->_propagator->_abortRequested->fetchAndAddRelaxed(0); + } + + // callback from httpbf when a chunk is finished + static void chunk_finished_cb(hbf_transfer_s *trans, int chunk, void* userdata) + { + PropagateUploadFile *that = static_cast(userdata); + Q_ASSERT(that); + that->_chunked_done += trans->block_arr[chunk]->size; + if (trans->block_cnt > 1) { + SyncJournalDb::UploadInfo pi; + pi._valid = true; + pi._chunk = chunk + 1; // next chunk to start with + pi._transferid = trans->transfer_id; + pi._modtime = QDateTime::fromTime_t(trans->modtime); + that->_propagator->_journal->setUploadInfo(that->_item._file, pi); } } - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_UPDATED; -} -// Log callback for httpbf -static void _log_callback(const char *func, const char *text, void*) -{ - qDebug() << func << text; -} + static void notify_status_cb(void* userdata, ne_session_status status, + const ne_session_status_info* info) + { + PropagateUploadFile* that = reinterpret_cast(userdata); -// abort callback for httpbf -static int _user_want_abort(void *userData) -{ - return static_cast(userData)->_abortRequested->fetchAndAddRelaxed(0); -} + if (status == ne_status_sending && info->sr.total > 0) { + emit that->progress(Progress::Context, that->_item._file , + that->_chunked_done + info->sr.progress, + that->_chunked_total_size ? that->_chunked_total_size : info->sr.total ); -// callback from httpbf when a chunk is finished -void OwncloudPropagator::chunk_finished_cb(hbf_transfer_s *trans, int chunk, void* userdata) -{ - OwncloudPropagator *that = static_cast(userdata); - Q_ASSERT(that); - if (trans->block_cnt > 1) { - SyncJournalDb::UploadInfo pi; - pi._valid = true; - pi._chunk = chunk + 1; // next chunk to start with - pi._transferid = trans->transfer_id; - pi._modtime = QDateTime::fromTime_t(trans->modtime); - that->_journal->setUploadInfo(that->_currentFile, pi); + that->limitBandwidth(that->_chunked_done + info->sr.progress, that->_propagator->_uploadLimit); + } } -} + qint64 _chunked_done; // amount of bytes already sent with the previous chunks + qint64 _chunked_total_size; // total size of the whole file +}; -csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item) +void PropagateUploadFile::start() { - QFile file(_localDir + item._file); + emit progress(Progress::StartUpload, _item._file, 0, _item._size); + + QFile file(_propagator->_localDir + _item._file); if (!file.open(QIODevice::ReadOnly)) { - _errorString = file.errorString(); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + done(SyncFileItem::NormalError, file.errorString()); + return; } - QScopedPointer uri(ne_path_escape((_remoteDir + item._file).toUtf8())); + QScopedPointer uri( + ne_path_escape((_propagator->_remoteDir + _item._file).toUtf8())); - bool finished = true; - int attempts = 0; - _etag.clear(); + int attempts = 0; /* * do ten tries to upload the file chunked. Check the file size and mtime * before submitting a chunk and after having submitted the last one. * If the file has changed, retry. - */ + */ do { Hbf_State state = HBF_SUCCESS; QScopedPointer trans(hbf_init_transfer(uri.data())); @@ -290,45 +265,38 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item) hbf_set_log_callback(trans.data(), _log_callback); hbf_set_abort_callback(trans.data(), _user_want_abort); trans.data()->chunk_finished_cb = chunk_finished_cb; - finished = true; Q_ASSERT(trans); state = hbf_splitlist(trans.data(), file.handle()); - const SyncJournalDb::UploadInfo progressInfo = _journal->getUploadInfo(item._file); + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item._file); if (progressInfo._valid) { - if (progressInfo._modtime.toTime_t() == item._modtime) { + if (progressInfo._modtime.toTime_t() == _item._modtime) { trans->start_id = progressInfo._chunk; trans->transfer_id = progressInfo._transferid; } } - ne_set_notifier(_session, notify_status_cb, this); + ne_set_notifier(_propagator->_session, notify_status_cb, this); _lastTime.restart(); _lastProgress = 0; _chunked_done = 0; - _chunked_total_size = item._size; - _currentFile = item._file; - + _chunked_total_size = _item._size; if( state == HBF_SUCCESS ) { QByteArray previousEtag; - if (!item._etag.isEmpty() && item._etag != "empty_etag") { + if (!_item._etag.isEmpty() && _item._etag != "empty_etag") { // We add quotes because the owncloud server always add quotes around the etag, and // csync_owncloud.c's owncloud_file_id always strip the quotes. - previousEtag = '"' + item._etag + '"'; + previousEtag = '"' + _item._etag + '"'; trans->previous_etag = previousEtag.data(); } _chunked_total_size = trans->stat_size; - qDebug() << "About to upload " << item._file << " (" << previousEtag << item._size << ")"; + qDebug() << "About to upload " << _item._file << " (" << previousEtag << _item._size << ")"; /* Transfer all the chunks through the HTTP session using PUT. */ - state = hbf_transfer( _session, trans.data(), "PUT" ); + state = hbf_transfer( _propagator->_session, trans.data(), "PUT" ); } - if( trans->modtime_accepted ) { - _etag = QByteArray(hbf_transfer_etag( trans.data() )); - } - - _fileId = QString::fromUtf8( hbf_transfer_file_id( trans.data() )); + _item._fileId = QString::fromUtf8( hbf_transfer_file_id( trans.data() )); /* Handle errors. */ if ( state != HBF_SUCCESS ) { @@ -336,36 +304,37 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item) /* If the source file changed during submission, lets try again */ if( state == HBF_SOURCE_FILE_CHANGE ) { if( attempts++ < 30 ) { /* FIXME: How often do we want to try? */ - finished = false; /* make it try again from scratch. */ qDebug("SOURCE file has changed during upload, retry #%d in two seconds!", attempts); sleep(2); + continue; } } - - if( finished ) { - _errorString = hbf_error_string(trans.data(), state); - // FIXME: find out the error class. - //_httpStatusCode = hbf_fail_http_code(trans.data()); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; - } + // FIXME: find out the error class. + //_httpStatusCode = hbf_fail_http_code(trans.data()); + done(SyncFileItem::NormalError, hbf_error_string(trans.data(), state)); + return; } - } while( !finished ); - ne_set_notifier(_session, 0, 0); - if( _etag.isEmpty() ) { - updateMTimeAndETag(uri.data(), item._modtime); - } - // the file id should only be empty for new files up- or downloaded - if( _fileId.isEmpty() ) { - getFileId( uri.data() ); - } - _status = SyncFileItem::Success; + ne_set_notifier(_propagator->_session, 0, 0); - // Remove entries from the database - _journal->setUploadInfo(item._file, SyncJournalDb::UploadInfo()); + if( trans->modtime_accepted ) { + _item._etag = QByteArray(hbf_transfer_etag( trans.data() )); + } else { + updateMTimeAndETag(uri.data(), _item._modtime); + } + if( _item._fileId.isEmpty() ) { + getFileId( uri.data() ); + } - return CSYNC_INSTRUCTION_UPDATED; + _item._instruction = CSYNC_INSTRUCTION_UPDATED; + _propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, _propagator->_localDir + _item._file)); + // Remove from the progress database: + _propagator->_journal->setUploadInfo(_item._file, SyncJournalDb::UploadInfo()); + emit progress(Progress::EndUpload, _item._file, 0, _item._size); + done(SyncFileItem::Success); + return; + + } while( true ); } static QByteArray parseEtag(ne_request *req) { @@ -377,7 +346,7 @@ static QByteArray parseEtag(ne_request *req) { } } -void OwncloudPropagator::updateMTimeAndETag(const char* uri, time_t mtime) +void PropagateItemJob::updateMTimeAndETag(const char* uri, time_t mtime) { QByteArray modtime = QByteArray::number(qlonglong(mtime)); ne_propname pname; @@ -389,68 +358,92 @@ void OwncloudPropagator::updateMTimeAndETag(const char* uri, time_t mtime) ops[0].value = modtime.constData(); ops[1].name = NULL; - int rc = ne_proppatch( _session, uri, ops ); + int rc = ne_proppatch( _propagator->_session, uri, ops ); + /* FIXME: error handling bool error = updateErrorFromSession( rc ); if( error ) { // FIXME: We could not set the mtime. Error or not? qDebug() << "PROP-Patching of modified date failed."; - } + }*/ // get the etag - QScopedPointer req(ne_request_create(_session, "HEAD", uri)); + QScopedPointer req(ne_request_create(_propagator->_session, "HEAD", uri)); int neon_stat = ne_request_dispatch(req.data()); - - if( updateErrorFromSession(neon_stat, req.data()) ) { + const ne_status *status = ne_get_status(req.data()); + if( neon_stat != NE_OK || status->klass != 2 ) { // error happend - qDebug() << "Could not issue HEAD request for ETag."; + qDebug() << "Could not issue HEAD request for ETag." << ne_get_error(_propagator->_session); } else { - _etag = parseEtag(req.data()); + _item._etag = parseEtag(req.data()); } } -void OwncloudPropagator::getFileId( const char *uri ) { - +void PropagateItemJob::getFileId(const char* uri) +{ if( ! uri ) return; - QScopedPointer req(ne_request_create(_session, "HEAD", uri)); + QScopedPointer req(ne_request_create(_propagator->_session, "HEAD", uri)); int neon_stat = ne_request_dispatch(req.data()); - if( updateErrorFromSession(neon_stat, req.data()) ) { + if( neon_stat != NE_OK ) { // error happend qDebug() << "Could not issue HEAD request for FileID."; } else { const char *header = ne_get_response_header(req.data(), "X-OC-FileId"); if( header ) { - _fileId = QString::fromUtf8(header); + _item._fileId = QString::fromUtf8(header); } } } -class DownloadContext { +void PropagateItemJob::limitBandwidth(qint64 progress, qint64 bandwidth_limit) +{ + if (bandwidth_limit > 0) { + int64_t diff = _lastTime.nsecsElapsed() / 1000; + int64_t len = progress - _lastProgress; + if (len > 0 && diff > 0 && (1000000 * len / diff) > bandwidth_limit) { + int64_t wait_time = (1000000 * len / bandwidth_limit) - diff; + if (wait_time > 0) { + usleep(wait_time); + } + } + _lastProgress = progress; + _lastTime.start(); + } else if (bandwidth_limit < 0 && bandwidth_limit > -100) { + int64_t diff = _lastTime.nsecsElapsed() / 1000; + if (diff > 0) { + // -bandwidth_limit is the % of bandwidth + int64_t wait_time = -diff * (1 + 100.0 / bandwidth_limit); + if (wait_time > 0) { + usleep(wait_time); + } + } + _lastTime.start(); + } +} +class PropagateDownloadFile: public PropagateItemJob { public: + explicit PropagateDownloadFile(OwncloudPropagator* propagator,const SyncFileItem& item) + : PropagateItemJob(propagator, item), _file(0) {} + void start(); + +private: QIODevice *_file; QScopedPointer _decompress; - ne_session *_session; - QAtomicInt *_abortRequested; - - explicit DownloadContext(QIODevice *file, - ne_session *session, - QAtomicInt *abortRequested) - : _file(file), _session(session), _abortRequested(abortRequested) {} static int content_reader(void *userdata, const char *buf, size_t len) { - DownloadContext *writeCtx = static_cast(userdata); + PropagateDownloadFile *that = static_cast(userdata); size_t written = 0; - if (writeCtx->_abortRequested->fetchAndAddRelaxed(0)) { - ne_set_error(writeCtx->_session, "Aborted by user"); + if (that->_propagator->_abortRequested->fetchAndAddRelaxed(0)) { + ne_set_error(that->_propagator->_session, "Aborted by user"); return NE_ERROR; } if(buf) { - written = writeCtx->_file->write(buf, len); + written = that->_file->write(buf, len); if( len != written ) { qDebug() << "WRN: content_reader wrote wrong num of bytes:" << len << "," << written; } @@ -459,7 +452,6 @@ public: return NE_ERROR; } - /* * This hook is called after the response is here from the server, but before * the response body is parsed. It decides if the response is compressed and @@ -468,11 +460,11 @@ public: */ static void install_content_reader( ne_request *req, void *userdata, const ne_status *status ) { - DownloadContext *writeCtx = static_cast(userdata); + PropagateDownloadFile *that = static_cast(userdata); Q_UNUSED(status); - if( !writeCtx ) { + if( !that ) { qDebug("Error: install_content_reader called without valid write context!"); return; } @@ -482,26 +474,38 @@ public: status ? status->code : -1 ); if( enc == QLatin1String("gzip") ) { - writeCtx->_decompress.reset(ne_decompress_reader( req, ne_accept_2xx, + that->_decompress.reset(ne_decompress_reader( req, ne_accept_2xx, content_reader, /* reader callback */ - writeCtx )); /* userdata */ + that )); /* userdata */ } else { ne_add_response_body_reader( req, ne_accept_2xx, content_reader, - (void*) writeCtx ); + (void*) that ); + } + } + + static void notify_status_cb(void* userdata, ne_session_status status, + const ne_session_status_info* info) + { + PropagateDownloadFile* that = reinterpret_cast(userdata); + if (status == ne_status_recving && info->sr.total > 0) { + emit that->progress(Progress::Context, that->_item._file, info->sr.progress, info->sr.total ); + that->limitBandwidth(info->sr.progress, that->_propagator->_downloadLimit); } } }; -csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, bool isConflict) +void PropagateDownloadFile::start() { + emit progress(Progress::StartDownload, _item._file, 0, _item._size); + QString tmpFileName; - const SyncJournalDb::DownloadInfo progressInfo = _journal->getDownloadInfo(item._file); + const SyncJournalDb::DownloadInfo progressInfo = _propagator->_journal->getDownloadInfo(_item._file); if (progressInfo._valid) { // if the etag has changed meanwhile, remove the already downloaded part. - if (progressInfo._etag != item._etag) { - QFile::remove(_localDir + progressInfo._tmpfile); - _journal->setDownloadInfo(item._file, SyncJournalDb::DownloadInfo()); + if (progressInfo._etag != _item._etag) { + QFile::remove(_propagator->_localDir + progressInfo._tmpfile); + _propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo()); } else { tmpFileName = progressInfo._tmpfile; } @@ -509,7 +513,7 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, } if (tmpFileName.isEmpty()) { - tmpFileName = item._file; + tmpFileName = _item._file; //add a dot at the begining of the filename to hide the file. int slashPos = tmpFileName.lastIndexOf('/'); tmpFileName.insert(slashPos+1, '.'); @@ -517,31 +521,31 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, tmpFileName += ".~" + QString::number(uint(qrand()), 16); } - QFile tmpFile(_localDir + tmpFileName); + QFile tmpFile(_propagator->_localDir + tmpFileName); + _file = &tmpFile; if (!tmpFile.open(QIODevice::Append)) { - _errorString = tmpFile.errorString(); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + done(SyncFileItem::NormalError, tmpFile.errorString()); + return; } csync_win32_set_file_hidden(tmpFileName.toUtf8().constData(), true); { SyncJournalDb::DownloadInfo pi; - pi._etag = item._etag; + pi._etag = _item._etag; pi._tmpfile = tmpFileName; pi._valid = true; - _journal->setDownloadInfo(item._file, pi); + _propagator->_journal->setDownloadInfo(_item._file, pi); } /* actually do the request */ int retry = 0; - QScopedPointer uri(ne_path_escape((_remoteDir + item._file).toUtf8())); - DownloadContext writeCtx(&tmpFile, _session , _abortRequested); + QScopedPointer uri( + ne_path_escape((_propagator->_remoteDir + _item._file).toUtf8())); do { - QScopedPointer req(ne_request_create(_session, "GET", uri.data())); + QScopedPointer req(ne_request_create(_propagator->_session, "GET", uri.data())); /* Allow compressed content by setting the header */ ne_add_request_header( req.data(), "Accept-Encoding", "gzip" ); @@ -557,53 +561,51 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, /* hook called before the content is parsed to set the correct reader, * either the compressed- or uncompressed reader. */ - ne_hook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx); - ne_set_notifier(_session, notify_status_cb, this); + ne_hook_post_headers( _propagator->_session, install_content_reader, this); + ne_set_notifier(_propagator->_session, notify_status_cb, this); _lastProgress = 0; _lastTime.start(); - _chunked_done = _chunked_total_size = 0; - _currentFile = item._file; int neon_stat = ne_request_dispatch(req.data()); + /* delete the hook again, otherwise they get chained as they are with the session */ + ne_unhook_post_headers( _propagator->_session, install_content_reader, this ); + ne_set_notifier(_propagator->_session, 0, 0); + if (neon_stat == NE_TIMEOUT && (++retry) < 3) { continue; } - /* delete the hook again, otherwise they get chained as they are with the session */ - ne_unhook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx ); - ne_set_notifier(_session, 0, 0); - _chunked_done = _chunked_total_size = 0; - if( updateErrorFromSession(neon_stat, req.data() ) ) { qDebug("Error GET: Neon: %d", neon_stat); if (tmpFile.size() == 0) { // don't keep the temporary file if it is empty. tmpFile.close(); tmpFile.remove(); - _journal->setDownloadInfo(item._file, SyncJournalDb::DownloadInfo()); + _propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo()); } - return CSYNC_INSTRUCTION_ERROR; + return; } - - _etag = parseEtag(req.data()); + _item._etag = parseEtag(req.data()); break; } while (1); - tmpFile.close(); tmpFile.flush(); + QString fn = _propagator->_localDir + _item._file; //In case of conflict, make a backup of the old file + bool isConflict = _item._instruction == CSYNC_INSTRUCTION_CONFLICT; if (isConflict) { - QString fn = _localDir + item._file; - // compare the files to see if there was an actual conflict. if (fileEquals(fn, tmpFile.fileName())) { tmpFile.remove(); - _journal->setDownloadInfo(item._file, SyncJournalDb::DownloadInfo()); - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_UPDATED; + _item._instruction = CSYNC_INSTRUCTION_UPDATED; + _propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo()); + _propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, fn)); + emit progress(Progress::EndDownload, _item._file, 0, _item._size); + done(SyncFileItem::Success); + return; } QFile f(fn); @@ -613,12 +615,11 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, if (dotLocation <= fn.lastIndexOf('/') + 1) { dotLocation = fn.size(); } - fn.insert(dotLocation, "_conflict-" + QDateTime::fromTime_t(item._modtime).toString("yyyyMMdd-hhmmss")); + fn.insert(dotLocation, "_conflict-" + QDateTime::fromTime_t(_item._modtime).toString("yyyyMMdd-hhmmss")); if (!f.rename(fn)) { //If the rename fails, don't replace it. - _errorString = f.errorString(); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + done(SyncFileItem::NormalError, f.errorString()); + return; } } @@ -627,20 +628,18 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, #ifndef QT_OS_WIN bool success; #if QT_VERSION < QT_VERSION_CHECK(5, 0, 0) - success = tmpFile.fileEngine()->rename(_localDir + item._file); + success = tmpFile.fileEngine()->rename(fn); #else // We want a rename that also overwite. QFile::rename does not overwite. // Qt 5.1 has QSaveFile::renameOverwrite we cold use. // ### FIXME - QString newName(_localDir + item._file); - QFile::remove(newName); - success = tmpFile.rename(newName); + QFile::remove(fn); + success = tmpFile.rename(fn); #endif // unixoids if (!success) { - _errorString = tmpFile.errorString(); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + done(SyncFileItem::NormalError, tmpFile.errorString()); + return; } #else //QT_OS_WIN if (::MoveFileEx((wchar_t*)tmpFile.fileName().utf16(), @@ -650,175 +649,216 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM, NULL, ::GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPWSTR)&string, 0, NULL); - _errorString = QString::fromWCharArray(string); + + done(SyncFileItem::NormalError, QString::fromWCharArray(string)); LocalFree((HLOCAL)string); - _status = SyncFileItem::NormalError; - return CSYNC_INSTRUCTION_ERROR; + return; } #endif - - _journal->setDownloadInfo(item._file, SyncJournalDb::DownloadInfo()); - struct timeval times[2]; - times[0].tv_sec = times[1].tv_sec = item._modtime; + times[0].tv_sec = times[1].tv_sec = _item._modtime; times[0].tv_usec = times[1].tv_usec = 0; - c_utimes((_localDir + item._file).toUtf8().data(), times); + c_utimes(fn.toUtf8().data(), times); - _status = isConflict ? SyncFileItem::Conflict : SyncFileItem::Success; - return CSYNC_INSTRUCTION_UPDATED; + _item._instruction = CSYNC_INSTRUCTION_UPDATED; + _propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, fn)); + _propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo()); + done(isConflict ? SyncFileItem::Conflict : SyncFileItem::Success); } -csync_instructions_e OwncloudPropagator::remoteRename(const SyncFileItem &item) +DECLARE_JOB(PropagateRemoteRename) + +void PropagateRemoteRename::start() { - if (item._file == item._renameTarget) { - if (!item._isDirectory) { + if (_item._file == _item._renameTarget) { + if (!_item._isDirectory) { // The parents has been renamed already so there is nothing more to do. // But we still need to fetch the new ETAG // FIXME maybe do a recusrsive propfind after having moced the parent. // Note: we also update the mtime because the server do not keep the mtime when moving files - QScopedPointer uri2(ne_path_escape((_remoteDir + item._renameTarget).toUtf8())); - updateMTimeAndETag(uri2.data(), item._modtime); + QScopedPointer uri2( + ne_path_escape((_propagator->_remoteDir + _item._renameTarget).toUtf8())); + updateMTimeAndETag(uri2.data(), _item._modtime); } - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_DELETED; - } - - // Check if it is the toplevel Shared folder and do not propagate it. - if (item._file == QLatin1String("Shared") ) { - if( QFile::rename( _localDir + item._renameTarget, _localDir + QLatin1String("Shared")) ) { - _errorString = tr("This folder must not be renamed. It is renamed back to its original name."); + } else if (_item._file == QLatin1String("Shared") ) { + // Check if it is the toplevel Shared folder and do not propagate it. + if( QFile::rename( _propagator->_localDir + _item._renameTarget, _propagator->_localDir + QLatin1String("Shared")) ) { + done(SyncFileItem::NormalError, tr("This folder must not be renamed. It is renamed back to its original name.")); } else { - _errorString = tr("This folder must not be renamed. Please name it back to Shared."); + done(SyncFileItem::NormalError, tr("This folder must not be renamed. Please name it back to Shared.")); } - return CSYNC_INSTRUCTION_ERROR; + return; + } else { + + QScopedPointer uri1(ne_path_escape((_propagator->_remoteDir + _item._file).toUtf8())); + QScopedPointer uri2(ne_path_escape((_propagator->_remoteDir + _item._renameTarget).toUtf8())); + + int rc = ne_move(_propagator->_session, 1, uri1.data(), uri2.data()); + if (updateErrorFromSession(rc)) { + return; + } + + updateMTimeAndETag(uri2.data(), _item._modtime); } - QScopedPointer uri1(ne_path_escape((_remoteDir + item._file).toUtf8())); - QScopedPointer uri2(ne_path_escape((_remoteDir + item._renameTarget).toUtf8())); - - int rc = ne_move(_session, 1, uri1.data(), uri2.data()); - if (updateErrorFromSession(rc)) { - return CSYNC_INSTRUCTION_ERROR; - } - - updateMTimeAndETag(uri2.data(), item._modtime); - - _status = SyncFileItem::Success; - return CSYNC_INSTRUCTION_DELETED; + _item._instruction = CSYNC_INSTRUCTION_DELETED; + _propagator->_journal->deleteFileRecord(_item._originalFile); + SyncJournalFileRecord record(_item, _propagator->_localDir + _item._renameTarget); + record._path = _item._renameTarget; + _propagator->_journal->setFileRecord(record); + emit progress(Progress::EndDownload, _item._file, 0, _item._size); + done(SyncFileItem::Success); } - -// returns true in case there was an error -bool OwncloudPropagator::updateErrorFromSession(int neon_code, ne_request *req, int *httpStatusCode) +bool PropagateItemJob::updateErrorFromSession(int neon_code, ne_request* req, int ignoreHttpCode) { if( neon_code != NE_OK ) { qDebug("Neon error code was %d", neon_code); } + QString errorString; + int httpStatusCode = 0; + switch(neon_code) { case NE_OK: /* Success, but still the possiblity of problems */ if( req ) { const ne_status *status = ne_get_status(req); if (status) { - if ( status->klass == 2) { + if ( status->klass == 2 || status->code == ignoreHttpCode) { // Everything is ok, no error. return false; } - if (httpStatusCode) { - *httpStatusCode = status->code; - } - _errorString = QString::fromUtf8( status->reason_phrase ); + errorString = QString::fromUtf8( status->reason_phrase ); + httpStatusCode = status->code; } } else { - QString errorString = QString::fromUtf8(ne_get_error( _session )); - int code = errorString.mid(0, errorString.indexOf(QChar(' '))).toInt(); - if (code >= 200 && code < 300) { + errorString = QString::fromUtf8(ne_get_error(_propagator->_session)); + int httpStatusCode = errorString.mid(0, errorString.indexOf(QChar(' '))).toInt(); + if ((httpStatusCode >= 200 && httpStatusCode < 300) + || (httpStatusCode != 0 && httpStatusCode == ignoreHttpCode)) { // No error return false; } - if (httpStatusCode) { - *httpStatusCode = code; - } - _errorString = errorString; } // FIXME: classify the error - _status = SyncFileItem::NormalError; + done (SyncFileItem::NormalError, errorString); return true; case NE_ERROR: /* Generic error; use ne_get_error(session) for message */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - _status = SyncFileItem::NormalError; + done(SyncFileItem::NormalError, QString::fromUtf8(ne_get_error(_propagator->_session))); return true; case NE_LOOKUP: /* Server or proxy hostname lookup failed */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - break; case NE_AUTH: /* User authentication failed on server */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - break; case NE_PROXYAUTH: /* User authentication failed on proxy */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - break; case NE_CONNECT: /* Could not connect to server */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - break; case NE_TIMEOUT: /* Connection timed out */ - _errorString = QString::fromUtf8( ne_get_error(_session) ); - break; + done(SyncFileItem::FatalError, QString::fromUtf8(ne_get_error(_propagator->_session))); + return true; case NE_FAILED: /* The precondition failed */ case NE_RETRY: /* Retry request (ne_end_request ONLY) */ case NE_REDIRECT: /* See ne_redirect.h */ default: - _errorString = QString::fromUtf8( ne_get_error(_session) ); - _status = SyncFileItem::SoftError; + done(SyncFileItem::SoftError, QString::fromUtf8(ne_get_error(_propagator->_session))); return true; } - _status = SyncFileItem::FatalError; - return true; + return false; } -void OwncloudPropagator::notify_status_cb(void* userdata, ne_session_status status, - const ne_session_status_info* info) +PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItem& item) { + switch(item._instruction) { + case CSYNC_INSTRUCTION_REMOVE: + if (item._dir == SyncFileItem::Down) return new PropagateLocalRemove(this, item); + else return new PropagateRemoteRemove(this, item); + case CSYNC_INSTRUCTION_NEW: + if (item._isDirectory) { + if (item._dir == SyncFileItem::Down) return new PropagateLocalMkdir(this, item); + else return new PropagateRemoteMkdir(this, item); + } //fall trough + case CSYNC_INSTRUCTION_SYNC: + case CSYNC_INSTRUCTION_CONFLICT: + if (item._isDirectory) { + // Should we set the mtime? + return 0; + } + if (item._dir == SyncFileItem::Down) return new PropagateDownloadFile(this, item); + else return new PropagateUploadFile(this, item); + case CSYNC_INSTRUCTION_RENAME: + Q_ASSERT(item._dir == SyncFileItem::Up); // only supported for remote + return new PropagateRemoteRename(this, item); + case CSYNC_INSTRUCTION_IGNORE: + return new PropagateIgnoreJob(this, item); + default: + return 0; + } + return 0; +} + +void OwncloudPropagator::start(const SyncFileItemVector& _syncedItems) { - OwncloudPropagator* this_ = reinterpret_cast(userdata); - - if ((status == ne_status_sending || status == ne_status_recving)) { - if (info->sr.total > 0) { - emit this_->progress(Progress::Context, this_->_currentFile, - this_->_chunked_done + info->sr.progress, - this_->_chunked_total_size ? this_->_chunked_total_size : info->sr.total ); + SyncFileItemVector items = _syncedItems; + std::sort(items.begin(), items.end()); + _rootJob.reset(new PropagateDirectory(this)); + QStack > directories; + directories.push(qMakePair(QString(), _rootJob.data())); + QVector directoriesToRemove; + QString removedDirectory; + foreach(const SyncFileItem &item, items) { + if (item._instruction == CSYNC_INSTRUCTION_REMOVE + && !removedDirectory.isEmpty() && item._file.startsWith(removedDirectory)) { + //already taken care of. (by the removal of the parent directory) + continue; } - if (this_->_chunked_total_size && info->sr.total > 0 && info->sr.total == info->sr.progress) { - this_->_chunked_done += info->sr.total; + + while (!item._file.startsWith(directories.top().first)) { + directories.pop(); + } + + if (item._isDirectory) { + PropagateDirectory *dir = new PropagateDirectory(this, item); + dir->_firstJob.reset(createJob(item)); + if (item._instruction == CSYNC_INSTRUCTION_REMOVE) { + //We do the removal of directories at the end + directoriesToRemove.append(dir); + removedDirectory = item._file + "/"; + } else { + directories.top().second->append(dir); + } + directories.push(qMakePair(item._file + "/" , dir)); + } else if (PropagateItemJob* current = createJob(item)) { + directories.top().second->append(current); } } - /* throttle connection */ - int bandwidth_limit = 0; - if (status == ne_status_sending) bandwidth_limit = this_->_uploadLimit; - if (status == ne_status_recving) bandwidth_limit = this_->_downloadLimit; - if (bandwidth_limit > 0) { - int64_t diff = this_->_lastTime.nsecsElapsed() / 1000; - int64_t len = info->sr.progress - this_->_lastProgress; - if (len > 0 && diff > 0 && (1000000 * len / diff) > (int64_t)bandwidth_limit) { - int64_t wait_time = (1000000 * len / bandwidth_limit) - diff; - if (wait_time > 0) { - usleep(wait_time); - } + foreach(PropagatorJob* it, directoriesToRemove) { + _rootJob->append(it); + } + + _rootJob->start(); + connect(_rootJob.data(), SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem))); + connect(_rootJob.data(), SIGNAL(progress(Progress::Kind,QString,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,QString,quint64,quint64))); + connect(_rootJob.data(), SIGNAL(finished(SyncFileItem::Status)), this, SIGNAL(finished())); +} + +void PropagateDirectory::proceedNext(SyncFileItem::Status status) +{ + if (status == SyncFileItem::FatalError) { + emit finished(status); + return; + } else if (status == SyncFileItem::NormalError) { + _hasError = true; + } + + _current ++; + if (_current < _subJobs.size()) { + PropagatorJob *next = _subJobs.at(_current); + startJob(next); + } else { + if (!_item.isEmpty() && !_hasError) { + SyncJournalFileRecord record(_item, _propagator->_localDir + _item._file); + _propagator->_journal->setFileRecord(record); } - this_->_lastProgress = info->sr.progress; - this_->_lastTime.start(); - } else if (bandwidth_limit < 0 && bandwidth_limit > -100) { - int64_t diff = this_->_lastTime.nsecsElapsed() / 1000; - if (diff > 0) { - // -bandwidth_limit is the % of bandwidth - int64_t wait_time = -diff * (1 + 100.0 / bandwidth_limit); - if (wait_time > 0) { - usleep(wait_time); - } - } - this_->_lastTime.start(); + emit finished(_hasError ? SyncFileItem::NormalError : SyncFileItem::Success); } } - } diff --git a/src/mirall/owncloudpropagator.h b/src/mirall/owncloudpropagator.h index e8bd1a6ab..59b8d665c 100644 --- a/src/mirall/owncloudpropagator.h +++ b/src/mirall/owncloudpropagator.h @@ -30,51 +30,139 @@ struct ne_decompress_s; namespace Mirall { class SyncJournalDb; +class OwncloudPropagator; + +class PropagatorJob : public QObject { + Q_OBJECT +protected: + OwncloudPropagator *_propagator; +public: + explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator) {} +public slots: + virtual void start() = 0; +signals: + void finished(SyncFileItem::Status); + void completed(const SyncFileItem &); + void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total); +}; + +/* + * Propagate a directory, and all its sub entries. + */ +class PropagateDirectory : public PropagatorJob { + Q_OBJECT +public: + // e.g: create the directory + QScopedPointer_firstJob; + + // all the sub files or sub directories. + //TODO: in the future, all sub job can be run in parallel + QVector _subJobs; + + SyncFileItem _item; + + int _current; // index of the current running job + bool _hasError; // weather there was an error + + + explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItem &item = SyncFileItem()) + : PropagatorJob(propagator) + , _firstJob(0), _item(item), _current(-1), _hasError(false) { } + + virtual ~PropagateDirectory() { + qDeleteAll(_subJobs); + } + + void append(PropagatorJob *subJob) { + _subJobs.append(subJob); + } + + virtual void start() { + _current = -1; + _hasError = false; + if (!_firstJob) { + proceedNext(SyncFileItem::Success); + } else { + startJob(_firstJob.data()); + } + } + +private slots: + void startJob(PropagatorJob *next) { + connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(proceedNext(SyncFileItem::Status)), Qt::QueuedConnection); + connect(next, SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem))); + connect(next, SIGNAL(progress(Progress::Kind,QString,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,QString,quint64,quint64))); + next->start(); + } + + void proceedNext(SyncFileItem::Status status); +}; + + +/* + * Abstract class to propagate a single item + */ +class PropagateItemJob : public PropagatorJob { + Q_OBJECT +protected: + SyncFileItem _item; + void done(SyncFileItem::Status status, const QString &errorString = QString()) { + if (status == SyncFileItem::FatalError || status == SyncFileItem::NormalError) { + _item._instruction = CSYNC_INSTRUCTION_ERROR; + } + _item._errorString = errorString; + _item._status = status; + emit completed(_item); + emit finished(status); + } + + void updateMTimeAndETag(const char *uri, time_t); + void getFileId( const char *uri ); + + /* fetch the error code and string from the session + in case of error, calls done with the error and returns true. + + If the HTTP error code is ignoreHTTPError, the error is ignored + */ + bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int ignoreHTTPError = 0); + + /* + * to be called by the progress callback and will wait the amount of time needed. + */ + void limitBandwidth(qint64 progress, qint64 limit); + QElapsedTimer _lastTime; + qint64 _lastProgress; + + +public: + PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItem &item) + : PropagatorJob(propagator), _item(item), _lastProgress(0) {} +}; + +// Dummy job that just mark it as completed and ignored. +class PropagateIgnoreJob : public PropagateItemJob { + Q_OBJECT +public: + PropagateIgnoreJob(OwncloudPropagator* propagator,const SyncFileItem& item) + : PropagateItemJob(propagator, item) {} + void start() { + done(SyncFileItem::FileIgnored); + } +}; + class OwncloudPropagator : public QObject { Q_OBJECT + PropagateItemJob *createJob(const SyncFileItem& item); + QScopedPointer _rootJob; + +public: ne_session_s *_session; QString _localDir; // absolute path to the local directory. ends with '/' QString _remoteDir; // path to the root of the remote. ends with '/' SyncJournalDb *_journal; - QString _errorString; - SyncFileItem::Status _status; - - bool check_neon_session(); - - - csync_instructions_e localRemove(const SyncFileItem &); - csync_instructions_e localMkdir(const SyncFileItem &); - csync_instructions_e remoteRemove(const SyncFileItem &); - csync_instructions_e remoteMkdir(const SyncFileItem &); - csync_instructions_e downloadFile(const SyncFileItem &, bool isConflict = false); - csync_instructions_e uploadFile(const SyncFileItem &); - csync_instructions_e remoteRename(const SyncFileItem &); - - void updateMTimeAndETag(const char *uri, time_t); - - void getFileId( const char *uri ); - - /* fetch the error code and string from the session - * updates _status, _httpStatusCode and _errorString. and httpStatusCode - * Returns true if there was an error. - */ - bool updateErrorFromSession(int neon_code = 0, ne_request *req = 0, int *httpStatusCode = 0); - - QElapsedTimer _lastTime; - quint64 _lastProgress; - quint64 _chunked_total_size; - quint64 _chunked_done; - QString _currentFile; - - - static void notify_status_cb (void *userdata, ne_session_status status, - const ne_session_status_info *info); - - static void chunk_finished_cb(hbf_transfer_s *,int, void *userdata); - public: OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir, SyncJournalDb *progressDb, QAtomicInt *abortRequested) @@ -87,9 +175,8 @@ public: if (!localDir.endsWith(QChar('/'))) _localDir+='/'; if (!remoteDir.endsWith(QChar('/'))) _remoteDir+='/'; } - void propagate(const SyncFileItem &); - QByteArray _etag; - QString _fileId; + + void start(const SyncFileItemVector &_syncedItems); int _downloadLimit; int _uploadLimit; @@ -99,7 +186,7 @@ public: signals: void completed(const SyncFileItem &); void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total); - + void finished(); }; } diff --git a/src/mirall/syncfileitem.h b/src/mirall/syncfileitem.h index 018dcb5a1..4404925c5 100644 --- a/src/mirall/syncfileitem.h +++ b/src/mirall/syncfileitem.h @@ -56,12 +56,6 @@ public: } friend bool operator<(const SyncFileItem& item1, const SyncFileItem& item2) { - // Delete at the end: - if (item1._instruction == CSYNC_INSTRUCTION_REMOVE && item2._instruction != CSYNC_INSTRUCTION_REMOVE) - return false; - if (item1._instruction != CSYNC_INSTRUCTION_REMOVE && item2._instruction == CSYNC_INSTRUCTION_REMOVE) - return true; - // Sort by destination return item1.destination() < item2.destination(); } diff --git a/src/mirall/syncjournaldb.cpp b/src/mirall/syncjournaldb.cpp index e80bc95ac..62b033bc6 100644 --- a/src/mirall/syncjournaldb.cpp +++ b/src/mirall/syncjournaldb.cpp @@ -226,22 +226,33 @@ bool SyncJournalDb::setFileRecord( const SyncJournalFileRecord& record ) } } -bool SyncJournalDb::deleteFileRecord(const QString& filename) +bool SyncJournalDb::deleteFileRecord(const QString& filename, bool recursively) { QMutexLocker locker(&_mutex); - qlonglong phash = getPHash(filename); if( checkConnect() ) { + if (recursively) { + qlonglong phash = getPHash(filename); + QSqlQuery query( "DELETE FROM metadata WHERE phash=?", _db ); + query.bindValue( 0, QString::number(phash) ); - QSqlQuery query( "DELETE FROM metadata WHERE phash=?" ); - query.bindValue( 0, QString::number(phash) ); + if( !query.exec() ) { + qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text(); + return false; + } + qDebug() << query.executedQuery() << phash << filename; + return true; + } else { + QSqlQuery query( "DELETE FROM metadata WHERE path LIKE(?||'/%')", _db ); + query.bindValue( 0, filename ); - if( !query.exec() ) { - qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text(); - return false; + if( !query.exec() ) { + qWarning() << "Exec error of SQL statement: " << query.lastQuery() << " : " << query.lastError().text(); + return false; + } + qDebug() << query.executedQuery() << filename; + return true; } - qDebug() << query.executedQuery() << phash << filename; - return true; } else { qDebug() << "Failed to connect database."; return false; // checkConnect failed. diff --git a/src/mirall/syncjournaldb.h b/src/mirall/syncjournaldb.h index 943f7b2e6..93257964b 100644 --- a/src/mirall/syncjournaldb.h +++ b/src/mirall/syncjournaldb.h @@ -29,7 +29,7 @@ public: explicit SyncJournalDb(const QString& path, QObject *parent = 0); SyncJournalFileRecord getFileRecord( const QString& filename ); bool setFileRecord( const SyncJournalFileRecord& record ); - bool deleteFileRecord( const QString& filename ); + bool deleteFileRecord( const QString& filename, bool recursively = false ); int getFileRecordCount(); bool exists(); QStringList tableColumns( const QString& table ); diff --git a/src/owncloudcmd/owncloudcmd.cpp b/src/owncloudcmd/owncloudcmd.cpp index 8b8f5a7a9..1113c87a6 100644 --- a/src/owncloudcmd/owncloudcmd.cpp +++ b/src/owncloudcmd/owncloudcmd.cpp @@ -120,5 +120,6 @@ int main(int argc, char **argv) { app.exec(); csync_destroy(_csync_ctx); + return 0; }