Chunking-Ng: Resume

This commit is contained in:
Olivier Goffart 2016-08-02 17:14:44 +02:00
parent a1558100b8
commit fad387b6b8
4 changed files with 102 additions and 42 deletions

View file

@ -270,6 +270,11 @@ LsColJob::LsColJob(AccountPtr account, const QString &path, QObject *parent)
{
}
LsColJob::LsColJob(AccountPtr account, const QUrl &url, QObject *parent)
: AbstractNetworkJob(account, QString(), parent), _url(url)
{
}
void LsColJob::setProperties(QList<QByteArray> properties)
{
_properties = properties;
@ -313,7 +318,8 @@ void LsColJob::start()
QBuffer *buf = new QBuffer(this);
buf->setData(xml);
buf->open(QIODevice::ReadOnly);
QNetworkReply *reply = davRequest("PROPFIND", path(), req, buf);
QNetworkReply *reply = _url.isValid() ? davRequest("PROPFIND", _url, req, buf)
: davRequest("PROPFIND", path(), req, buf);
buf->setParent(reply);
setReply(reply);
setupConnections(reply);

View file

@ -62,6 +62,7 @@ class OWNCLOUDSYNC_EXPORT LsColJob : public AbstractNetworkJob {
Q_OBJECT
public:
explicit LsColJob(AccountPtr account, const QString &path, QObject *parent = 0);
explicit LsColJob(AccountPtr account, const QUrl &url, QObject *parent = 0);
void start() Q_DECL_OVERRIDE;
QHash<QString, qint64> _sizes;
@ -87,6 +88,7 @@ private slots:
private:
QList<QByteArray> _properties;
QUrl _url; // Used instead of path() if the url is specified in the constructor
};
/**

View file

@ -286,7 +286,8 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon {
private:
quint64 _sent; /// amount of data that was already sent
uint _transferId; /// transfer id (part of the url)
int _currentChunk;
int _currentChunk; /// Id of the next chunk that will be sent
QMap<int, quint64> _serverChunks; // Map chunk number with its size from the PROPFIND on resume
quint64 chunkSize() const { return _propagator->chunkSize(); }
/**
@ -300,10 +301,13 @@ public:
PropagateUploadFileCommon(propagator,item) {}
void doStartUpload() Q_DECL_OVERRIDE;
private slots:
void slotMkColFinished(QNetworkReply::NetworkError);
private:
void startNewUpload();
void startNextChunk();
private slots:
void slotPropfindFinished();
void slotPropfindFinishedWithError();
void slotMkColFinished(QNetworkReply::NetworkError);
void slotPutFinished();
void slotMoveJobFinished();
void slotUploadProgress(qint64,qint64);

View file

@ -50,14 +50,17 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk)
*----> doStartUpload()
Check the db: is there an entry?
/ \
no yes
/ \
MKCOL PROPFIND (TODO)
|
slotMkColFinished() :
| :
+-----+---------------------+
/ \
no yes
/ \
/ PROPFIND
startNewUpload() <-+ +----------------------------\
| | | \
MKCOL + slotPropfindFinishedWithError() slotPropfindFinished()
| |
slotMkColFinished() |
| |
+-----+-------------------------------------------------------+
|
+----> startNextChunk() ---finished? --+
^ | |
@ -74,27 +77,87 @@ QUrl PropagateUploadFileNG::chunkUrl(int chunk)
void PropagateUploadFileNG::doStartUpload()
{
_transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file);
#if FUTURE
const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file);
if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) {
_transferId = progressInfo._transferid;
// TODO: make a PROPFIND call to check what the size on the server is
//_startChunk = progressInfo._chunk;
//qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk;
}
#endif
_duration.start();
_propagator->_activeJobList.append(this);
const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file);
if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) {
_transferId = progressInfo._transferid;
auto url = chunkUrl();
auto job = new LsColJob(_propagator->account(), url, this);
_jobs.append(job);
job->setProperties(QList<QByteArray>() << "resourcetype" << "getcontentlength");
connect(job, SIGNAL(finishedWithoutError()), this, SLOT(slotPropfindFinished()));
connect(job, SIGNAL(finishedWithError(QNetworkReply*)),
this, SLOT(slotPropfindFinishedWithError()));
connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*)));
//TODO: port to Qt4
connect(job, &LsColJob::directoryListingIterated,
[this, url](const QString &name, const QMap<QString,QString> &properties) mutable {
if (name == url.path()) {
return; // skip the info about the path itself
}
bool ok = false;
auto chunkId = name.midRef(name.lastIndexOf('/')+1).toUInt(&ok);
if (ok) {
this->_serverChunks[chunkId] = properties["getcontentlength"].toULongLong();
}
});
job->start();
return;
}
startNewUpload();
}
void PropagateUploadFileNG::slotPropfindFinished()
{
auto job = qobject_cast<LsColJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
_propagator->_activeJobList.removeOne(this);
_currentChunk = 0;
_sent = 0;
while (_serverChunks.contains(_currentChunk)) {
_sent += _serverChunks[_currentChunk];
++_currentChunk;
}
startNextChunk();
}
void PropagateUploadFileNG::slotPropfindFinishedWithError()
{
auto job = qobject_cast<LsColJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
QNetworkReply::NetworkError err = job->reply()->error();
auto httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
auto status = classifyError(err, httpErrorCode, &_propagator->_anotherSyncNeeded);
if (status == SyncFileItem::FatalError) {
_propagator->_activeJobList.removeOne(this);
QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll());
abortWithError(status, errorString);
return;
}
startNewUpload();
}
void PropagateUploadFileNG::startNewUpload()
{
Q_ASSERT(_propagator->_activeJobList.count(this) == 1);
_transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file);
_sent = 0;
_currentChunk = 0;
_duration.start();
emit progress(*_item, 0);
SyncJournalDb::UploadInfo pi;
pi._valid = true;
pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime);
_propagator->_journal->setUploadInfo(_item->_file, pi);
_propagator->_journal->commit("Upload info");
auto job = new MkColJob(_propagator->account(),
chunkUrl(),
this);
@ -273,21 +336,6 @@ void PropagateUploadFileNG::slotPutFinished()
_propagator->_journal->wipeErrorBlacklistEntry(_item->_file);
_item->_hasBlacklistEntry = false;
}
SyncJournalDb::UploadInfo pi;
pi._valid = true;
auto currentChunk = job->_chunk;
foreach (auto *job, _jobs) {
// Take the minimum finished one
if (auto putJob = qobject_cast<PUTFileJob*>(job)) {
currentChunk = qMin(currentChunk, putJob->_chunk - 1);
}
}
pi._chunk = currentChunk; // FIXME
pi._transferid = _transferId;
pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime);
_propagator->_journal->setUploadInfo(_item->_file, pi);
_propagator->_journal->commit("Upload info");
}
startNextChunk();
}