Parallel download

This commit is contained in:
Olivier Goffart 2014-02-17 13:48:56 +01:00
parent 7b6269b4bf
commit b35e38f80f
4 changed files with 280 additions and 10 deletions

View file

@ -52,10 +52,6 @@
/* The maximum number of active job in parallel */
static const int maximumActiveJob = 6;
// We use some internals of csync:
extern "C" int c_utimes(const char *, const struct timeval *);
extern "C" void csync_win32_set_file_hidden( const char *file, bool h );
namespace Mirall {
void PropagateItemJob::done(SyncFileItem::Status status, const QString &errorString)
@ -153,7 +149,7 @@ void PropagateNeonJob::slotRestoreJobCompleted(const SyncFileItem& item )
// compare two files with given filename and return true if they have the same content
static bool fileEquals(const QString &fn1, const QString &fn2) {
bool fileEquals(const QString &fn1, const QString &fn2) {
QFile f1(fn1);
QFile f2(fn2);
if (!f1.open(QIODevice::ReadOnly) || !f2.open(QIODevice::ReadOnly)) {
@ -1039,7 +1035,7 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItem& item) {
// Should we set the mtime?
return 0;
}
if (item._dir != SyncFileItem::Up) return new PropagateDownloadFile(this, item);
if (item._dir != SyncFileItem::Up) return new PropagateDownloadFileQNAM(this, item);
else return new PropagateUploadFileQNAM(this, item);
case CSYNC_INSTRUCTION_RENAME:
if (item._dir == SyncFileItem::Up) {

View file

@ -21,8 +21,16 @@
#include <QFile>
#include <qdebug.h>
// We use some internals of csync:
extern "C" int c_utimes(const char *, const struct timeval *);
extern "C" void csync_win32_set_file_hidden( const char *file, bool h );
namespace Mirall {
/** compare two files with given filename and return true if they have the same content */
bool fileEquals(const QString &fn1, const QString &fn2);
/* Helper for QScopedPointer<>, to be used as the deleter.
* QScopePointer will call the right overload of cleanup for the pointer it holds
*/

View file

@ -19,6 +19,7 @@
#include "syncjournalfilerecord.h"
#include "utility.h"
#include <QNetworkAccessManager>
#include <QFileInfo>
#include <cmath>
namespace Mirall {
@ -27,7 +28,6 @@ void PUTFileJob::start() {
QNetworkRequest req;
for(QMap<QByteArray, QByteArray>::const_iterator it = _headers.begin(); it != _headers.end(); ++it) {
req.setRawHeader(it.key(), it.value());
qDebug() << it.key() << it.value();
}
setReply(davRequest("PUT", path(), req, _device));
@ -38,10 +38,10 @@ void PUTFileJob::start() {
qDebug() << "getting etag: request network error: " << reply()->errorString();
}
AbstractNetworkJob::start();
}
static const int CHUNKING_SIZE = (10*1024);
// FIXME: increase and make configurable
static const int CHUNKING_SIZE = (100*1024);
void PropagateUploadFileQNAM::start()
{
@ -106,6 +106,10 @@ struct ChunkDevice : QIODevice {
void PropagateUploadFileQNAM::startNextChunk()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
/*
* // If the source file has changed during upload, it is detected and the
* // variable _previousFileSize is set accordingly. The propagator waits a
@ -155,7 +159,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
QNetworkReply::NetworkError err = job->reply()->error();
if (err != QNetworkReply::NoError) {
// /* If the source file changed during submission, lets try again */
// /* If the source file changed during submission, lets try again */
// if( state == HBF_SOURCE_FILE_CHANGE ) {
// if( attempts++ < 5 ) { /* FIXME: How often do we want to try? */
// qDebug("SOURCE file has changed during upload, retry #%d in %d seconds!", attempts, 2*attempts);
@ -285,5 +289,222 @@ void PropagateUploadFileQNAM::abort()
_job->reply()->abort();
}
///////////////////////////////////////////////////////////////////////////////////////////////////
void GETFileJob::start() {
QNetworkRequest req;
for(QMap<QByteArray, QByteArray>::const_iterator it = _headers.begin(); it != _headers.end(); ++it) {
req.setRawHeader(it.key(), it.value());
}
setReply(davRequest("GET", path(), req));
setupConnections(reply());
if( reply()->error() != QNetworkReply::NoError ) {
qDebug() << "getting etag: request network error: " << reply()->errorString();
}
connect(reply(), SIGNAL(readyRead()), this, SLOT(slotReadyRead()));
AbstractNetworkJob::start();
}
void GETFileJob::slotReadyRead()
{
// FIXME: error handling (hard drive full, ....)
_device->write(reply()->readAll());
}
void PropagateDownloadFileQNAM::start()
{
if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
return;
qDebug() << Q_FUNC_INFO << _item._file << _propagator->_activeJobs;
emit progress(Progress::StartDownload, _item, 0, _item._size);
QString tmpFileName;
const SyncJournalDb::DownloadInfo progressInfo = _propagator->_journal->getDownloadInfo(_item._file);
if (progressInfo._valid) {
// if the etag has changed meanwhile, remove the already downloaded part.
if (progressInfo._etag != _item._etag) {
QFile::remove(_propagator->_localDir + progressInfo._tmpfile);
_propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo());
} else {
tmpFileName = progressInfo._tmpfile;
_expectedEtagForResume = progressInfo._etag;
}
}
if (tmpFileName.isEmpty()) {
tmpFileName = _item._file;
//add a dot at the begining of the filename to hide the file.
int slashPos = tmpFileName.lastIndexOf('/');
tmpFileName.insert(slashPos+1, '.');
//add the suffix
tmpFileName += ".~" + QString::number(uint(qrand()), 16);
}
_tmpFile.setFileName(_propagator->_localDir + tmpFileName);
if (!_tmpFile.open(QIODevice::Append | QIODevice::Unbuffered)) {
done(SyncFileItem::NormalError, _tmpFile.errorString());
return;
}
csync_win32_set_file_hidden(_tmpFile.fileName().toUtf8().constData(), true);
{
SyncJournalDb::DownloadInfo pi;
pi._etag = _item._etag;
pi._tmpfile = tmpFileName;
pi._valid = true;
_propagator->_journal->setDownloadInfo(_item._file, pi);
_propagator->_journal->commit("download file start");
}
QMap<QByteArray, QByteArray> headers;
/* Allow compressed content by setting the header */
//headers["Accept-Encoding"] = "gzip";
if (_tmpFile.size() > 0) {
quint64 done = _tmpFile.size();
if (done == _item._size) {
qDebug() << "File is already complete, no need to download";
downloadFinished();
return;
}
headers["Range"] = "bytes=" + QByteArray::number(done) +'-';
headers["Accept-Ranges"] = "bytes";
qDebug() << "Retry with range " << headers["Range"];
}
_job = new GETFileJob(AccountManager::instance()->account(), _item._file, &_tmpFile, headers);
connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotGetFinished()));
_propagator->_activeJobs ++;
_job->start();
emitReady();
}
void PropagateDownloadFileQNAM::slotGetFinished()
{
_propagator->_activeJobs--;
GETFileJob *job = qobject_cast<GETFileJob *>(sender());
Q_ASSERT(job);
qDebug() << Q_FUNC_INFO << job->reply()->request().url() << "FINISHED WITH STATUS" << job->reply()->error() << job->reply()->errorString();
QNetworkReply::NetworkError err = job->reply()->error();
if (err != QNetworkReply::NoError) {
if (_tmpFile.size() == 0) {
// don't keep the temporary file if it is empty.
_tmpFile.close();
_tmpFile.remove();
_propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo());
}
// FIXME!
_item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
_propagator->_activeJobs--;
done(SyncFileItem::NormalError, job->reply()->errorString());
return;
}
_item._etag = parseEtag(job->reply()->rawHeader("Etag"));
_tmpFile.close();
_tmpFile.flush();
downloadFinished();
}
void PropagateDownloadFileQNAM::downloadFinished()
{
QString fn = _propagator->_localDir + _item._file;
bool isConflict = _item._instruction == CSYNC_INSTRUCTION_CONFLICT
&& !fileEquals(fn, _tmpFile.fileName()); // compare the files to see if there was an actual conflict.
//In case of conflict, make a backup of the old file
if (isConflict) {
QFile f(fn);
QString conflictFileName(fn);
// Add _conflict-XXXX before the extention.
int dotLocation = conflictFileName.lastIndexOf('.');
// If no extention, add it at the end (take care of cases like foo/.hidden or foo.bar/file)
if (dotLocation <= conflictFileName.lastIndexOf('/') + 1) {
dotLocation = conflictFileName.size();
}
QString timeString = Utility::qDateTimeFromTime_t(_item._modtime).toString("yyyyMMdd-hhmmss");
conflictFileName.insert(dotLocation, "_conflict-" + timeString);
if (!f.rename(conflictFileName)) {
//If the rename fails, don't replace it.
done(SyncFileItem::NormalError, f.errorString());
return;
}
}
QFileInfo existingFile(fn);
if(existingFile.exists() && existingFile.permissions() != _tmpFile.permissions()) {
_tmpFile.setPermissions(existingFile.permissions());
}
csync_win32_set_file_hidden(_tmpFile.fileName().toUtf8().constData(), false);
//FIXME: duplicated code.
#ifndef Q_OS_WIN
bool success;
#if QT_VERSION < QT_VERSION_CHECK(5, 0, 0)
success = _tmpFile.fileEngine()->rename(fn);
// qDebug() << "Renaming " << tmpFile.fileName() << " to " << fn;
#else
// We want a rename that also overwite. QFile::rename does not overwite.
// Qt 5.1 has QSaveFile::renameOverwrite we cold use.
// ### FIXME
QFile::remove(fn);
success = _tmpFile.rename(fn);
#endif
// unixoids
if (!success) {
qDebug() << "FAIL: renaming temp file to final failed: " << _tmpFile.errorString();
done(SyncFileItem::NormalError, _tmpFile.errorString());
return;
}
#else //Q_OS_WIN
BOOL ok;
ok = MoveFileEx((wchar_t*)_tmpFile.fileName().utf16(),
(wchar_t*)QString(_propagator->_localDir + _item._file).utf16(),
MOVEFILE_REPLACE_EXISTING+MOVEFILE_COPY_ALLOWED+MOVEFILE_WRITE_THROUGH);
if (!ok) {
wchar_t *string = 0;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM,
NULL, ::GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPWSTR)&string, 0, NULL);
done(SyncFileItem::NormalError, QString::fromWCharArray(string));
LocalFree((HLOCAL)string);
return;
}
#endif
struct timeval times[2];
times[0].tv_sec = times[1].tv_sec = _item._modtime;
times[0].tv_usec = times[1].tv_usec = 0;
c_utimes(fn.toUtf8().data(), times);
_propagator->_journal->setFileRecord(SyncJournalFileRecord(_item, fn));
_propagator->_journal->setDownloadInfo(_item._file, SyncJournalDb::DownloadInfo());
_propagator->_journal->commit("download file start2");
emit progress(Progress::EndDownload, _item, _item._size, _item._size);
done(isConflict ? SyncFileItem::Conflict : SyncFileItem::Success);
}
void PropagateDownloadFileQNAM::abort()
{
if (_job && _job->reply())
_job->reply()->abort();
}
}

View file

@ -53,6 +53,7 @@ class PUTFileJob : public AbstractNetworkJob {
QMap<QByteArray, QByteArray> _headers;
public:
// Takes ownership of the device
explicit PUTFileJob(Account* account, const QString& path, QIODevice *device,
const QMap<QByteArray, QByteArray> &headers, QObject* parent = 0)
: AbstractNetworkJob(account, path, parent), _device(device), _headers(headers) {}
@ -67,6 +68,7 @@ signals:
void finishedSignal();
};
class PropagateUploadFileQNAM : public PropagateItemJob {
Q_OBJECT
QPointer<PUTFileJob> _job;
@ -86,4 +88,47 @@ private slots:
};
class GETFileJob : public AbstractNetworkJob {
Q_OBJECT
QIODevice* _device;
QMap<QByteArray, QByteArray> _headers;
public:
// DOES NOT take owncership of the device.
explicit GETFileJob(Account* account, const QString& path, QIODevice *device,
const QMap<QByteArray, QByteArray> &headers, QObject* parent = 0)
: AbstractNetworkJob(account, path, parent), _device(device), _headers(headers) {}
virtual void start();
virtual void finished() {
emit finishedSignal();
}
signals:
void finishedSignal();
private slots:
void slotReadyRead();
};
class PropagateDownloadFileQNAM : public PropagateItemJob {
Q_OBJECT
QPointer<GETFileJob> _job;
QByteArray _expectedEtagForResume;
// QFile *_file;
QFile _tmpFile;
public:
PropagateDownloadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)
: PropagateItemJob(propagator, item) {}
void start();
private slots:
void slotGetFinished();
void abort();
void downloadFinished();
};
}