Progress database:

Remember about errors, and don't retry if there is more than 3 errors

Conflicts:
	src/csync_propagate.c
This commit is contained in:
Olivier Goffart 2013-02-27 12:34:42 +01:00
parent 2e73ce5605
commit 53ad4a910f
5 changed files with 208 additions and 6 deletions

View file

@ -738,6 +738,12 @@ int csync_destroy(CSYNC *ctx) {
}
#endif
while (ctx->progress) {
csync_progressinfo_t *next = ctx->progress->next;
csync_statedb_free_progressinfo(ctx->progress);
ctx->progress = next;
}
/* destroy the rbtrees */
if (c_rbtree_size(ctx->local.tree) > 0) {
c_rbtree_destroy(ctx->local.tree, _tree_destructor);

View file

@ -143,6 +143,8 @@ struct csync_s {
uid_t euid;
} pwd;
struct csync_progressinfo_s *progress;
/* replica we are currently walking */
enum csync_replica_e current;

View file

@ -32,6 +32,7 @@
#include "csync_private.h"
#include "csync_propagate.h"
#include "csync_statedb.h"
#include "vio/csync_vio.h"
#include "c_jhash.h"
@ -69,6 +70,27 @@ static void _store_id_update(CSYNC *ctx, csync_file_stat_t *st) {
}
}
/* Record the error in the ctx->progress
pi may be a previous csync_progressinfo_t from the database.
If pi is NULL, a new one is created, else it is re-used
*/
static void _csync_record_error(CSYNC *ctx, csync_file_stat_t *st, csync_progressinfo_t *pi) {
if (pi) {
pi->error++;
} else {
pi = c_malloc(sizeof(csync_progressinfo_t));
pi->chunk = 0;
pi->tmpfile = NULL;
pi->md5 = st->md5 ? c_strdup(st->md5) : NULL;
pi->modtime = st->modtime;
pi->phash = st->phash;
pi->error = 1;
}
pi->next = ctx->progress;
ctx->progress = pi;
}
static bool _push_to_tmp_first(CSYNC *ctx)
{
if( !ctx ) return true;
@ -131,6 +153,13 @@ static int _csync_push_file(CSYNC *ctx, csync_file_stat_t *st) {
int count = 0;
int flags = 0;
csync_progressinfo_t *pi = NULL;
pi = csync_statedb_get_progressinfo(ctx, st->phash, st->modtime, st->md5);
if (pi && pi->error > 3) {
rc = 1;
goto out;
}
rep_bak = ctx->replica;
switch (ctx->current) {
@ -533,8 +562,12 @@ out:
csync_vio_unlink(ctx, turi);
}
}
_csync_record_error(ctx, st, pi);
pi = NULL;
}
csync_statedb_free_progressinfo(pi);
SAFE_FREE(prev_tdir);
SAFE_FREE(suri);
SAFE_FREE(duri);
@ -687,6 +720,13 @@ static int _csync_rename_file(CSYNC *ctx, csync_file_stat_t *st) {
const char *tmd5 = NULL;
c_rbnode_t *node = NULL;
csync_progressinfo_t *pi = NULL;
pi = csync_statedb_get_progressinfo(ctx, st->phash, st->modtime, st->md5);
if (pi && pi->error > 3) {
rc = 1;
goto out;
}
switch (ctx->current) {
case REMOTE_REPLICA:
if( !(st->path && st->destpath) ) {
@ -774,8 +814,12 @@ out:
/* set instruction for the statedb merger */
if (rc != 0) {
st->instruction = CSYNC_INSTRUCTION_NONE;
_csync_record_error(ctx, st, pi);
pi = NULL;
}
csync_statedb_free_progressinfo(pi);
return rc;
}
@ -805,6 +849,14 @@ static int _csync_remove_file(CSYNC *ctx, csync_file_stat_t *st) {
char *uri = NULL;
int rc = -1;
csync_progressinfo_t *pi = NULL;
pi = csync_statedb_get_progressinfo(ctx, st->phash, st->modtime, st->md5);
if (pi && pi->error > 3) {
rc = 1;
goto out;
}
switch (ctx->current) {
case LOCAL_REPLICA:
if (asprintf(&uri, "%s/%s", ctx->local.uri, st->path) < 0) {
@ -855,8 +907,11 @@ out:
if (rc != 0) {
/* Write file to statedb, to try to sync again on the next run. */
st->instruction = CSYNC_INSTRUCTION_NONE;
_csync_record_error(ctx, st, pi);
pi = NULL;
}
csync_statedb_free_progressinfo(pi);
return rc;
}
@ -868,6 +923,13 @@ static int _csync_new_dir(CSYNC *ctx, csync_file_stat_t *st) {
struct timeval times[2];
int rc = -1;
csync_progressinfo_t *pi = NULL;
pi = csync_statedb_get_progressinfo(ctx, st->phash, st->modtime, st->md5);
if (pi && pi->error > 3) {
rc = 1;
goto out;
}
replica_bak = ctx->replica;
switch (ctx->current) {
@ -952,8 +1014,11 @@ out:
/* set instruction for the statedb merger */
if (rc != 0) {
st->instruction = CSYNC_INSTRUCTION_ERROR;
_csync_record_error(ctx, st, pi);
pi = NULL;
}
csync_statedb_free_progressinfo(pi);
return rc;
}
@ -965,6 +1030,13 @@ static int _csync_sync_dir(CSYNC *ctx, csync_file_stat_t *st) {
struct timeval times[2];
int rc = -1;
csync_progressinfo_t *pi = NULL;
pi = csync_statedb_get_progressinfo(ctx, st->phash, st->modtime, st->md5);
if (pi && pi->error > 3) {
rc = 1;
goto out;
}
replica_bak = ctx->replica;
switch (ctx->current) {
@ -1032,8 +1104,11 @@ out:
/* set instruction for the statedb merger */
if (rc != 0) {
st->instruction = CSYNC_INSTRUCTION_ERROR;
_csync_record_error(ctx, st, pi);
pi = NULL;
}
csync_statedb_free_progressinfo(pi);
return rc;
}
@ -1347,6 +1422,7 @@ static int _csync_propagation_cleanup(CSYNC *ctx) {
static int _csync_propagation_file_visitor(void *obj, void *data) {
csync_file_stat_t *st = NULL;
CSYNC *ctx = NULL;
int rc = 0;
st = (csync_file_stat_t *) obj;
ctx = (CSYNC *) data;
@ -1357,32 +1433,32 @@ static int _csync_propagation_file_visitor(void *obj, void *data) {
case CSYNC_FTW_TYPE_FILE:
switch (st->instruction) {
case CSYNC_INSTRUCTION_NEW:
if (_csync_new_file(ctx, st) < 0) {
if ((rc = _csync_new_file(ctx, st)) < 0) {
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE,"FAIL NEW: %s",st->path);
goto err;
}
break;
case CSYNC_INSTRUCTION_RENAME:
if (_csync_rename_file(ctx, st) < 0) {
if ((rc = _csync_rename_file(ctx, st)) < 0) {
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE,"FAIL RENAME: %s",st->path);
goto err;
}
break;
case CSYNC_INSTRUCTION_SYNC:
if (_csync_sync_file(ctx, st) < 0) {
if ((rc = _csync_sync_file(ctx, st)) < 0) {
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE,"FAIL SYNC: %s",st->path);
goto err;
}
break;
case CSYNC_INSTRUCTION_REMOVE:
if (_csync_remove_file(ctx, st) < 0) {
if ((rc = _csync_remove_file(ctx, st)) < 0) {
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE,"FAIL REMOVE: %s",st->path);
goto err;
}
break;
case CSYNC_INSTRUCTION_CONFLICT:
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE,"case CSYNC_INSTRUCTION_CONFLICT: %s",st->path);
if (_csync_conflict_file(ctx, st) < 0) {
if ((rc = _csync_conflict_file(ctx, st)) < 0) {
goto err;
}
break;
@ -1401,7 +1477,7 @@ static int _csync_propagation_file_visitor(void *obj, void *data) {
break;
}
return 0;
return rc;
err:
return -1;
}

View file

@ -251,6 +251,11 @@ int csync_statedb_write(CSYNC *ctx) {
return -1;
}
/* progress info */
if (csync_statedb_write_progressinfo(ctx, ctx->progress) < 0) {
return -1;
}
return 0;
}
@ -360,6 +365,23 @@ int csync_statedb_create_tables(CSYNC *ctx) {
return -1;
}
c_strlist_destroy(result);
result = csync_statedb_query(ctx,
"CREATE TABLE IF NOT EXISTS progress("
"phash INTEGER(8),"
"modtime INTEGER(8),"
"md5 VARCHAR(32),"
"chunk INTEGER(8),"
"error_count INTEGER(8),"
"tmpfile VARCHAR(4096),"
"PRIMARY KEY(phash)"
");"
);
if (result == NULL) {
return -1;
}
c_strlist_destroy(result);
/* write the version table. */
stmt = sqlite3_mprintf( "INSERT INTO version (major, minor, patch) VALUES (%d, %d, %d);",
@ -372,6 +394,7 @@ int csync_statedb_create_tables(CSYNC *ctx) {
return -1;
}
sqlite3_free(stmt);
return 0;
}
@ -386,6 +409,14 @@ int csync_statedb_drop_tables(CSYNC *ctx) {
return -1;
}
c_strlist_destroy(result);
result = csync_statedb_query(ctx,
"DROP TABLE IF EXISTS progress;"
);
if (result == NULL) {
return -1;
}
c_strlist_destroy(result);
return 0;
}
@ -844,4 +875,76 @@ int csync_statedb_insert(CSYNC *ctx, const char *statement) {
return sqlite3_last_insert_rowid(ctx->statedb.db);
}
csync_progressinfo_t* csync_statedb_get_progressinfo(CSYNC *ctx, uint64_t phash, uint64_t modtime, const char* md5) {
char *stmt = NULL;
csync_progressinfo_t *ret = NULL;
c_strlist_t *result = NULL;
if( ! csync_get_statedb_exists(ctx)) return ret;
stmt = sqlite3_mprintf("SELECT error_count, chunk, tmpfile FROM progress WHERE phash='%llu' AND modtime='%lld' AND md5='%q'",
(long long unsigned int) phash, (long long signed int) modtime, md5);
if (!stmt) return ret;
result = csync_statedb_query(ctx, stmt);
sqlite3_free(stmt);
if (result == NULL) {
return NULL;
}
if (result->count == 3) {
ret = c_malloc(sizeof(csync_progressinfo_t));
if (!ret) goto out;
ret->next = NULL;
ret->chunk = atoi(result->vector[1]);
ret->error = atoi(result->vector[0]);
ret->tmpfile = c_strdup(result->vector[2]);
ret->md5 = md5 ? c_strdup(md5) : NULL;
ret->modtime = modtime;
ret->phash = phash;
}
out:
c_strlist_destroy(result);
return ret;
}
void csync_statedb_free_progressinfo(csync_progressinfo_t* pi)
{
if (!pi) return;
SAFE_FREE(pi->md5);
SAFE_FREE(pi->tmpfile);
SAFE_FREE(pi);
}
int csync_statedb_write_progressinfo(CSYNC* ctx, csync_progressinfo_t* pi)
{
int rc = 0;
char *stmt = NULL;
while (rc > -1 && pi) {
stmt = sqlite3_mprintf("INSERT INTO progress "
"(phash, modtime, md5, chunk, error_count, tmpfile) VALUES"
"(%llu, %lld, '%q', %d, %d, '%q');",
(long long signed int) pi->phash,
(long long int) pi->modtime,
pi->md5,
pi->chunk,
pi->error,
pi->tmpfile);
if (stmt == NULL) {
return -1;
}
CSYNC_LOG(CSYNC_LOG_PRIORITY_TRACE, "%s" , stmt);
rc = csync_statedb_insert(ctx, stmt);
sqlite3_free(stmt);
pi = pi->next;
}
return 0;
}
/* vim: set ts=8 sw=2 et cindent: */

View file

@ -110,6 +110,21 @@ int csync_statedb_drop_tables(CSYNC *ctx);
int csync_statedb_insert_metadata(CSYNC *ctx);
typedef struct csync_progressinfo_s {
struct csync_progressinfo_s *next;
uint64_t phash;
uint64_t modtime;
char *md5;
int error;
int chunk;
char *tmpfile;
} csync_progressinfo_t;
csync_progressinfo_t *csync_statedb_get_progressinfo(CSYNC *ctx, uint64_t phash, uint64_t modtime, const char *md5);
void csync_statedb_free_progressinfo(csync_progressinfo_t *pi);
int csync_statedb_write_progressinfo(CSYNC *ctx, csync_progressinfo_t *pi);
/**
* }@
*/