Long Running Put: store into the database so they can be resumed at startup

This commit is contained in:
Olivier Goffart 2014-07-28 12:12:52 +02:00
parent 04cc513bbd
commit 7480d34742
7 changed files with 203 additions and 49 deletions

View file

@ -21,6 +21,7 @@
#include "propagator_legacy.h"
#include "mirall/mirallconfigfile.h"
#include "mirall/utility.h"
#include <json.h>
#ifdef Q_OS_WIN
#include <windef.h>
@ -440,4 +441,35 @@ void PropagateDirectory::slotSubJobReady()
}
}
void CleanupPollsJob::start()
{
if (_pollInfos.empty()) {
emit finished();
deleteLater();
return;
}
auto info = _pollInfos.takeFirst();
SyncFileItem item;
item._file = info._file;
item._modtime = info._modtime;
PollJob *job = new PollJob(_account, info._url, item, _journal, _localPath, this);
connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished()));
job->start();
}
void CleanupPollsJob::slotPollFinished()
{
PollJob *job = qobject_cast<PollJob *>(sender());
Q_ASSERT(job);
if (!job->_error.isEmpty()) {
qDebug() << "There was an error with file " << job->_item._file << job->_error ;
} else {
_journal->setFileRecord(SyncJournalFileRecord(job->_item, _localPath + job->_item._file));
}
// Continue with the next entry, or finish
start();
}
}

View file

@ -21,6 +21,7 @@
#include <qelapsedtimer.h>
#include "syncfileitem.h"
#include "syncjournaldb.h"
struct hbf_transfer_s;
struct ne_session_s;
@ -29,6 +30,8 @@ typedef struct ne_prop_result_set_s ne_prop_result_set;
namespace Mirall {
class Account;
class SyncJournalDb;
class OwncloudPropagator;
@ -253,6 +256,25 @@ signals:
};
// Job that wait for all the poll jobs to be completed
class CleanupPollsJob : public QObject {
Q_OBJECT
QVector< SyncJournalDb::PollInfo > _pollInfos;
Account *_account;
SyncJournalDb *_journal;
QString _localPath;
public:
explicit CleanupPollsJob(const QVector< SyncJournalDb::PollInfo > &pollInfos, Account *account,
SyncJournalDb *journal, const QString &localPath, QObject* parent = 0)
: QObject(parent), _pollInfos(pollInfos), _account(account), _journal(journal), _localPath(localPath) {}
void start();
signals:
void finished();
private slots:
void slotPollFinished();
};
}
#endif

View file

@ -86,11 +86,50 @@ void PUTFileJob::slotTimeout() {
void PollJob::start()
{
setTimeout(30 * 1000);
setReply(davRequest("GET", path()));
connect(reply(), SIGNAL(downloadProgress(qint64,qint64)), this, SLOT(resetTimeout()));
AbstractNetworkJob::start();
}
bool PollJob::finished()
{
QNetworkReply::NetworkError err = reply()->error();
if (err != QNetworkReply::NoError) {
return false;
}
bool ok = false;
QVariantMap status = QtJson::parse(QString::fromUtf8(reply()->readAll()), ok).toMap();
if (!ok || status.isEmpty()) {
qDebug() << "Invalid json reply from the poll URL";
emit finishedSignal();
// FIXME: retry?
return true;
}
// the following code only happens after all chunks were uploaded.
// the file id should only be empty for new files up- or downloaded
QByteArray fid = status["fileid"].toByteArray();
if( !fid.isEmpty() ) {
if( !_item._fileId.isEmpty() && _item._fileId != fid ) {
qDebug() << "WARN: File ID changed!" << _item._fileId << fid;
}
_item._fileId = fid;
}
_item._etag = status["etag"].toByteArray();
_item._responseTimeStamp = responseTimestamp();
SyncJournalDb::PollInfo info;
info._file = _item._file;
// no info._url removes it from the database
_journal->setPollInfo(info);
emit finishedSignal();
return true;
}
void PropagateUploadFileQNAM::start()
{
@ -408,48 +447,27 @@ void PropagateUploadFileQNAM::slotUploadProgress(qint64 sent, qint64)
void PropagateUploadFileQNAM::startPollJob(const QString& path)
{
PollJob* job = new PollJob(AccountManager::instance()->account(), path, this);
job->setTimeout(_propagator->httpTimeout() * 10000);
connect(job, SIGNAL(finishedSignal(bool)), SLOT(slotPollFinished(bool)));
PollJob* job = new PollJob(AccountManager::instance()->account(), path, _item,
_propagator->_journal, _propagator->_localDir, this);
connect(job, SIGNAL(finishedSignal()), SLOT(slotPollFinished()));
SyncJournalDb::PollInfo info;
info._file = _item._file;
info._url = path;
info._modtime = _item._modtime;
_propagator->_journal->setPollInfo(info);
}
void PropagateUploadFileQNAM::slotPollFinished(bool success)
void PropagateUploadFileQNAM::slotPollFinished()
{
PollJob *job = qobject_cast<PollJob *>(sender());
Q_ASSERT(job);
qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS"
<< job->reply()->error()
<< (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString())
<< job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute)
<< job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute);
QNetworkReply::NetworkError err = job->reply()->error();
if (!success || err != QNetworkReply::NoError) {
startPollJob(job->path());
if (!job->_error.isEmpty()) {
done(SyncFileItem::NormalError, job->_error);
return;
}
bool ok = false;
QVariantMap status = QtJson::parse(QString::fromUtf8(job->reply()->readAll()), ok).toMap();
if (!ok || status.isEmpty()) {
_propagator->_activeJobs--;
done(SyncFileItem::NormalError, tr("Invalid json reply from the poll URL"));
// FIXME: retry?
return;
}
// the following code only happens after all chunks were uploaded.
// the file id should only be empty for new files up- or downloaded
QByteArray fid = status["fileid"].toByteArray();
if( !fid.isEmpty() ) {
if( !_item._fileId.isEmpty() && _item._fileId != fid ) {
qDebug() << "WARN: File ID changed!" << _item._fileId << fid;
}
_item._fileId = fid;
}
_item._etag = status["etag"].toByteArray();
_item._responseTimeStamp = job->responseTimestamp();
finalize(_item);
finalize(job->_item);
}
void PropagateUploadFileQNAM::abort()

View file

@ -82,26 +82,26 @@ signals:
class PollJob : public AbstractNetworkJob {
Q_OBJECT
SyncJournalDb *_journal;
QString _localPath;
public:
SyncFileItem _item;
const QString _error;
// Takes ownership of the device
explicit PollJob(Account* account, const QString &path, QObject *parent)
: AbstractNetworkJob(account, path, parent) {}
explicit PollJob(Account* account, const QString &path, SyncFileItem item,
SyncJournalDb *journal, const QString &localPath, QObject *parent)
: AbstractNetworkJob(account, path, parent), _journal(journal), _localPath(localPath), _item(item) {}
virtual void start();
virtual bool finished() {
emit finishedSignal(true);
return true;
}
virtual void slotTimeout() {
void start() Q_DECL_OVERRIDE;
bool finished() Q_DECL_OVERRIDE;
void slotTimeout() Q_DECL_OVERRIDE {
// emit finishedSignal(false);
// deleteLater();
reply()->abort();
}
signals:
void finishedSignal(bool success);
void finishedSignal();
};
@ -120,7 +120,7 @@ public:
void start();
private slots:
void slotPutFinished();
void slotPollFinished(bool success);
void slotPollFinished();
void slotUploadProgress(qint64,qint64);
void abort();
void startNextChunk();
@ -193,8 +193,6 @@ private slots:
void abort();
void downloadFinished();
void slotDownloadProgress(qint64,qint64);
};

View file

@ -438,6 +438,18 @@ void SyncEngine::handleSyncError(CSYNC *ctx, const char *state) {
void SyncEngine::startSync()
{
if (_journal->exists()) {
QVector< SyncJournalDb::PollInfo > pollInfos = _journal->getPollInfos();
if (!pollInfos.isEmpty()) {
qDebug() << "Finish Poll jobs before starting a sync";
CleanupPollsJob *job = new CleanupPollsJob(pollInfos, AccountManager::instance()->account(),
_journal, _localPath, this);
connect(job, SIGNAL(finished()), this, SLOT(startSync()));
job->start();
return;
}
}
Q_ASSERT(!_syncRunning);
_syncRunning = true;

View file

@ -195,8 +195,16 @@ bool SyncJournalDb::checkConnect()
return sqlFail("Create table blacklist", createQuery);
}
createQuery.prepare("CREATE TABLE IF NOT EXISTS poll("
"path VARCHAR(4096),"
"modtime INTEGER(8),"
"pollpath VARCHAR(4096));");
if (!createQuery.exec()) {
return sqlFail("Create table poll", createQuery);
}
createQuery.prepare("CREATE TABLE IF NOT EXISTS version("
"major INTEGER(8),"
"major VARCHAR(4096),"
"minor INTEGER(8),"
"patch INTEGER(8),"
"custom VARCHAR(256)"
@ -820,6 +828,62 @@ void SyncJournalDb::updateBlacklistEntry( const SyncJournalBlacklistRecord& item
}
QVector< SyncJournalDb::PollInfo > SyncJournalDb::getPollInfos()
{
QMutexLocker locker(&_mutex);
QVector< SyncJournalDb::PollInfo > res;
if( checkConnect() )
return res;
QSqlQuery query("SELECT path, mtime, pollpath FROM poll",_db);
if (!query.exec()) {
QString err = query.lastError().text();
qDebug() << "Database error :" << query.lastQuery() << ", Error:" << err;
return res;
}
while( query.next() ) {
PollInfo info;
info._file = query.value(0).toString();
info._modtime = query.value(1).toLongLong();
info._url = query.value(2).toString();
res.append(info);
}
query.finish();
return res;
}
void SyncJournalDb::setPollInfo(const SyncJournalDb::PollInfo& info)
{
QMutexLocker locker(&_mutex);
if( !checkConnect() ) {
return;
}
if (info._file.isEmpty()) {
QSqlQuery query("DELETE FROM poll WHERE path=?", _db);
query.bindValue(0, info._file);
if( !query.exec() ) {
qDebug() << "SQL error in setPollInfo: "<< query.lastError().text();
} else {
qDebug() << query.executedQuery() << info._file;
}
} else {
QSqlQuery query("INSERT OR REPLACE INTO poll (path, mtime, pollpath) VALUES( ? , ? , ? )", _db);
query.bindValue(0, info._file);
query.bindValue(1, QString::number(info._modtime));
query.bindValue(2, info._url);
if( !query.exec() ) {
qDebug() << "SQL error in setPollInfo: "<< query.lastError().text();
} else {
qDebug() << query.executedQuery() << info._file << info._url;
}
}
}
void SyncJournalDb::avoidRenamesOnNextSync(const QString& path)
{
QMutexLocker locker(&_mutex);

View file

@ -66,12 +66,20 @@ public:
bool _valid;
};
struct PollInfo {
QString _file;
QString _url;
time_t _modtime;
};
DownloadInfo getDownloadInfo(const QString &file);
void setDownloadInfo(const QString &file, const DownloadInfo &i);
UploadInfo getUploadInfo(const QString &file);
void setUploadInfo(const QString &file, const UploadInfo &i);
SyncJournalBlacklistRecord blacklistEntry( const QString& );
void avoidRenamesOnNextSync(const QString &path);
void setPollInfo(const PollInfo &);
QVector<PollInfo> getPollInfos();
/**
* Make sure that on the next sync, filName is not read from the DB but use the PROPFIND to