From 5cd2a0d11a463605f5bf8bd2fd2e26e579d0ec50 Mon Sep 17 00:00:00 2001 From: realaravinth Date: Sun, 6 Jun 2021 11:09:37 +0530 Subject: [PATCH] rename pocket to bucket --- src/{pocket.rs => bucket.rs} | 127 +++++++++++++++++++++-------------- src/lib.rs | 62 ++++++----------- src/mcaptcha.rs | 4 +- src/utils.rs | 32 ++++----- tests/test.py | 66 +++++++++++------- 5 files changed, 152 insertions(+), 139 deletions(-) rename src/{pocket.rs => bucket.rs} (67%) diff --git a/src/pocket.rs b/src/bucket.rs similarity index 67% rename from src/pocket.rs rename to src/bucket.rs index 0b1cbaa..0d906d8 100644 --- a/src/pocket.rs +++ b/src/bucket.rs @@ -52,16 +52,16 @@ impl Format { } #[derive(Debug, Clone, Deserialize, Serialize)] -pub struct Pocket { +pub struct Bucket { /// timer ID timer: u64, - /// instant(seconds from UNIX_EPOCH) at which time pocket begins decrement process - pocket_instant: u64, + /// instant(seconds from UNIX_EPOCH) at which time bucket begins decrement process + bucket_instant: u64, /// a list of captcha keys that should be decremented during clean up decrement: HashMap, } -impl Pocket { +impl Bucket { pub fn on_delete(ctx: &Context, event_type: NotifyEvent, event: &str, key_name: &str) { let msg = format!( "Received event: {:?} on key: {} via event: {}", @@ -69,44 +69,44 @@ impl Pocket { ); ctx.log_debug(msg.as_str()); - if !is_pocket_timer(key_name) { + if !is_bucket_timer(key_name) { return; } - let pocket_name = get_pocket_name_from_timer_name(key_name); - if pocket_name.is_none() { + let bucket_name = get_bucket_name_from_timer_name(key_name); + if bucket_name.is_none() { return; } - let pocket_name = pocket_name.unwrap(); + let bucket_name = bucket_name.unwrap(); - let pocket = ctx.open_key_writable(&pocket_name); - if pocket.key_type() == KeyType::Empty { - ctx.log_debug(&format!("Pocket doesn't exist: {}", &key_name)); + let bucket = ctx.open_key_writable(&bucket_name); + if bucket.key_type() == KeyType::Empty { + ctx.log_debug(&format!("Bucket doesn't exist: {}", &key_name)); return; } else { - Pocket::decrement_runner(ctx, &pocket); + Bucket::decrement_runner(ctx, &bucket); } } - /// creates new pocket and sets off timer to go off at `duration` + /// creates new bucket and sets off timer to go off at `duration` #[inline] pub fn new(ctx: &Context, duration: u64) -> CacheResult { let decrement = HashMap::with_capacity(HIT_PER_SECOND); - let pocket_instant = get_pocket_instant(duration)?; + let bucket_instant = get_bucket_instant(duration)?; let timer = ctx.create_timer( Duration::from_secs(duration), Self::decrement, - pocket_instant, + bucket_instant, ); - let pocket = Pocket { + let bucket = Bucket { timer, - pocket_instant, + bucket_instant, decrement, }; - Ok(pocket) + Ok(bucket) } /// increments count of key = captcha and registers for auto decrement @@ -132,29 +132,29 @@ impl Pocket { } } - let pocket_instant = get_pocket_instant(duration)?; - let pocket_name = get_pocket_name(pocket_instant); + let bucket_instant = get_bucket_instant(duration)?; + let bucket_name = get_bucket_name(bucket_instant); - ctx.log_debug(&format!("Pocket name: {}", &pocket_name)); + ctx.log_debug(&format!("Bucket name: {}", &bucket_name)); - // get pocket - let pocket = ctx.open_key_writable(&pocket_name); + // get bucket + let bucket = ctx.open_key_writable(&bucket_name); - match pocket.get_value::(&MCAPTCHA_POCKET_TYPE)? { - Some(pocket) => match pocket.decrement.get_mut(&captcha_name) { + match bucket.get_value::(&MCAPTCHA_BUCKET_TYPE)? { + Some(bucket) => match bucket.decrement.get_mut(&captcha_name) { Some(count) => *count += 1, None => { - pocket.decrement.insert(captcha_name, 1); + bucket.decrement.insert(captcha_name, 1); } }, None => { - let mut counter = Pocket::new(ctx, duration)?; + let mut counter = Bucket::new(ctx, duration)?; counter.decrement.insert(captcha_name, 1); - pocket.set_value(&MCAPTCHA_POCKET_TYPE, counter)?; - let timer = ctx.open_key_writable(&get_timer_name_from_pocket_name(&pocket_name)); + bucket.set_value(&MCAPTCHA_BUCKET_TYPE, counter)?; + let timer = ctx.open_key_writable(&get_timer_name_from_bucket_name(&bucket_name)); timer.write("1")?; - timer.set_expire(Duration::from_secs(duration + POCKET_EXPIRY_OFFSET))?; + timer.set_expire(Duration::from_secs(duration + BUCKET_EXPIRY_OFFSET))?; } }; @@ -165,11 +165,11 @@ impl Pocket { /// use [decrement] when you require auto cleanup. Internally, it calls this method. #[inline] fn decrement_runner(ctx: &Context, key: &RedisKeyWritable) { - let val = key.get_value::(&MCAPTCHA_POCKET_TYPE).unwrap(); + let val = key.get_value::(&MCAPTCHA_BUCKET_TYPE).unwrap(); match val { - Some(pocket) => { + Some(bucket) => { ctx.log_debug(&format!("entering loop hashmap ")); - for (captcha, count) in pocket.decrement.drain() { + for (captcha, count) in bucket.decrement.drain() { ctx.log_debug(&format!( "reading captcha: {} with decr count {}", &captcha, count @@ -196,34 +196,57 @@ impl Pocket { } } None => { - ctx.log_debug(&format!("pocket not found, can't decrement")); + ctx.log_debug(&format!("bucket not found, can't decrement")); } } } /// executes when timer goes off. Decrements all registered counts and cleans itself up - fn decrement(ctx: &Context, pocket_instant: u64) { - // get pocket - let pocket_name = get_pocket_name(pocket_instant); + fn decrement(ctx: &Context, bucket_instant: u64) { + // get bucket + let bucket_name = get_bucket_name(bucket_instant); - let timer = ctx.open_key_writable(&get_timer_name_from_pocket_name(&pocket_name)); + let timer = ctx.open_key_writable(&get_timer_name_from_bucket_name(&bucket_name)); let _ = timer.delete(); - ctx.log_debug(&format!("Pocket instant: {}", &pocket_instant)); + ctx.log_debug(&format!("Bucket instant: {}", &bucket_instant)); - let pocket = ctx.open_key_writable(&pocket_name); - Pocket::decrement_runner(ctx, &pocket); + let bucket = ctx.open_key_writable(&bucket_name); + Bucket::decrement_runner(ctx, &bucket); - match pocket.delete() { + match bucket.delete() { Err(e) => ctx.log_warning(&format!("enountered error while deleting hashmap: {:?}", e)), Ok(_) => (), } } + + pub fn get(ctx: &Context, args: Vec) -> RedisResult { + let mut args = args.into_iter().skip(1); + let key_name = args.next_string()?; + let key_name = utils::get_captcha_key(&key_name); + + let stored_captcha = ctx.open_key(&key_name); + if stored_captcha.key_type() == KeyType::Empty { + return errors::CacheError::new(format!("key {} not found", key_name)).into(); + } + + Ok(stored_captcha.read()?.unwrap().into()) + } + + pub fn counter_create(ctx: &Context, args: Vec) -> RedisResult { + let mut args = args.into_iter().skip(1); + // mcaptcha captcha key name + let key_name = args.next_string()?; + // expiry + let duration = args.next_u64()?; + bucket::Bucket::increment(ctx, duration, &key_name)?; + REDIS_OK + } } -pub static MCAPTCHA_POCKET_TYPE: RedisType = RedisType::new( +pub static MCAPTCHA_BUCKET_TYPE: RedisType = RedisType::new( "mcaptbuck", - REDIS_MCAPTCHA_POCKET_TYPE_VERSION, + REDIS_MCAPTCHA_BUCKET_TYPE_VERSION, raw::RedisModuleTypeMethods { version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64, rdb_load: Some(type_methods::rdb_load), @@ -256,29 +279,29 @@ pub mod type_methods { #[allow(non_snake_case, unused)] pub extern "C" fn rdb_load(rdb: *mut raw::RedisModuleIO, encver: c_int) -> *mut c_void { - let pocket = match encver { + let bucket = match encver { 0 => { let data = raw::load_string(rdb); let fmt = Format::JSON; - let pocket: Pocket = fmt.from_str(&data).unwrap(); - pocket + let bucket: Bucket = fmt.from_str(&data).unwrap(); + bucket } _ => panic!("Can't load old RedisJSON RDB"), }; - // if pocket. - Box::into_raw(Box::new(pocket)) as *mut c_void + // if bucket. + Box::into_raw(Box::new(bucket)) as *mut c_void } pub unsafe extern "C" fn free(value: *mut c_void) { - let val = value as *mut Pocket; + let val = value as *mut Bucket; Box::from_raw(val); } #[allow(non_snake_case, unused)] pub unsafe extern "C" fn rdb_save(rdb: *mut raw::RedisModuleIO, value: *mut c_void) { - let pocket = &*(value as *mut Pocket); - match &serde_json::to_string(pocket) { + let bucket = &*(value as *mut Bucket); + match &serde_json::to_string(bucket) { Ok(string) => raw::save_string(rdb, &string), Err(e) => eprintln!("error while rdb_save: {}", e), } diff --git a/src/lib.rs b/src/lib.rs index a28f645..2a486c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,38 +15,37 @@ * along with this program. If not, see . */ use lazy_static::lazy_static; -use redis_module::raw::KeyType; use redis_module::{redis_command, redis_event_handler, redis_module}; -use redis_module::{Context, NextArg, RedisResult, REDIS_OK}; +use redis_module::{NextArg, RedisResult, REDIS_OK}; +mod bucket; mod errors; #[allow(dead_code, unused_features, unused_variables)] mod mcaptcha; -mod pocket; mod utils; -use pocket::MCAPTCHA_POCKET_TYPE; +use bucket::MCAPTCHA_BUCKET_TYPE; -/// Initial allocation ammount of pocket[pocket::Pocket] +/// Initial allocation ammount of bucket[bucket::Bucket] pub const HIT_PER_SECOND: usize = 100; -/// Pocket[pocket::Pocket] type version -pub const REDIS_MCAPTCHA_POCKET_TYPE_VERSION: i32 = 1; +/// Bucket[bucket::Bucket] type version +pub const REDIS_MCAPTCHA_BUCKET_TYPE_VERSION: i32 = 1; pub const PKG_NAME: &str = "mcap"; pub const PKG_VERSION: usize = 1; -/// pocket timer key prefix -// PREFIX_POCKET_TIMER is used like this: -// PREFIX_POCKET_TIMER:PREFIX_POCKET:time(where time is variable) +/// bucket timer key prefix +// PREFIX_BUCKET_TIMER is used like this: +// PREFIX_BUCKET_TIMER:PREFIX_BUCKET:time(where time is variable) // It contains PKG_NAME and key hash tag for node pinning // so, I guess it's okay for us to just use timer and not enfore pinning // and PKG_NAME -pub const PREFIX_POCKET_TIMER: &str = "timer:"; +pub const PREFIX_BUCKET_TIMER: &str = "timer:"; -/// If pockets perform clean up at x instant, then pockets themselves will get cleaned -/// up at x + POCKET_EXPIRY_OFFSET(if they haven't already been cleaned up) -pub const POCKET_EXPIRY_OFFSET: u64 = 30; +/// If buckets perform clean up at x instant, then buckets themselves will get cleaned +/// up at x + BUCKET_EXPIRY_OFFSET(if they haven't already been cleaned up) +pub const BUCKET_EXPIRY_OFFSET: u64 = 30; lazy_static! { /// node unique identifier, useful when running in cluster mode @@ -57,42 +56,19 @@ lazy_static! { }; /// counter/captcha key prefix pub static ref PREFIX_COUNTER: String = format!("{}:captcha:{}:", PKG_NAME, *ID); - /// pocket key prefix - pub static ref PREFIX_POCKET: String = format!("{}:pocket:{{{}}}:", PKG_NAME, *ID); -} - -fn get(ctx: &Context, args: Vec) -> RedisResult { - let mut args = args.into_iter().skip(1); - let key_name = args.next_string()?; - let key_name = utils::get_captcha_key(&key_name); - - let stored_captcha = ctx.open_key(&key_name); - if stored_captcha.key_type() == KeyType::Empty { - return errors::CacheError::new(format!("key {} not found", key_name)).into(); - } - - Ok(stored_captcha.read()?.unwrap().into()) -} - -fn counter_create(ctx: &Context, args: Vec) -> RedisResult { - let mut args = args.into_iter().skip(1); - // mcaptcha captcha key name - let key_name = args.next_string()?; - // expiry - let duration = args.next_u64()?; - pocket::Pocket::increment(ctx, duration, &key_name)?; - REDIS_OK + /// bucket key prefix + pub static ref PREFIX_BUCKET: String = format!("{}:bucket:{{{}}}:", PKG_NAME, *ID); } redis_module! { name: "mcaptcha_cahce", version: PKG_VERSION, - data_types: [MCAPTCHA_POCKET_TYPE,], + data_types: [MCAPTCHA_BUCKET_TYPE,], commands: [ - ["mcaptcha_cache.count", counter_create, "write", 1, 1, 1], - ["mcaptcha_cache.get", get, "readonly", 1, 1, 1], + ["mcaptcha_cache.count", bucket::Bucket::counter_create, "write", 1, 1, 1], + ["mcaptcha_cache.get", bucket::Bucket::get, "readonly", 1, 1, 1], ], event_handlers: [ - [@EXPIRED @EVICTED: pocket::Pocket::on_delete], + [@EXPIRED @EVICTED: bucket::Bucket::on_delete], ] } diff --git a/src/mcaptcha.rs b/src/mcaptcha.rs index 3c457e5..6c64b1c 100644 --- a/src/mcaptcha.rs +++ b/src/mcaptcha.rs @@ -18,7 +18,7 @@ use redis_module::native_types::RedisType; use redis_module::raw; use serde::{Deserialize, Serialize}; -use crate::pocket::Format; +use crate::bucket::Format; const REDIS_MCPATCHA_MCAPTCHA_TYPE_VERSION: i32 = 1; @@ -59,7 +59,7 @@ impl MCaptcha { } } -pub static MCAPTCHA_POCKET_TYPE: RedisType = RedisType::new( +pub static MCAPTCHA_MCAPTCHA_TYPE: RedisType = RedisType::new( "mcaptmcapa", REDIS_MCPATCHA_MCAPTCHA_TYPE_VERSION, raw::RedisModuleTypeMethods { diff --git a/src/utils.rs b/src/utils.rs index 24cfc28..b16d78b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -21,27 +21,27 @@ use crate::*; #[inline] /// duration in seconds -pub fn get_pocket_name(pocket_instant: u64) -> String { - format!("{}{}", &*PREFIX_POCKET, pocket_instant) +pub fn get_bucket_name(bucket_instant: u64) -> String { + format!("{}{}", &*PREFIX_BUCKET, bucket_instant) } #[inline] /// duration in seconds -pub fn get_timer_name_from_pocket_name(pocket_name: &str) -> String { - format!("{}{}", &*PREFIX_POCKET_TIMER, pocket_name) +pub fn get_timer_name_from_bucket_name(bucket_name: &str) -> String { + format!("{}{}", &*PREFIX_BUCKET_TIMER, bucket_name) } #[inline] /// duration in seconds -pub fn get_pocket_name_from_timer_name(name: &str) -> Option<&str> { - // PREFIX_POCKET_TIMER doesn't have node unique crate::ID +pub fn get_bucket_name_from_timer_name(name: &str) -> Option<&str> { + // PREFIX_BUCKET_TIMER doesn't have node unique crate::ID // this way, even if we are loading keys of a different instance, well - // get POCKET keys from whatever TIMER is expiring - name.strip_prefix(&*PREFIX_POCKET_TIMER) + // get BUCKET keys from whatever TIMER is expiring + name.strip_prefix(&*PREFIX_BUCKET_TIMER) } #[inline] -pub fn get_pocket_instant(duration: u64) -> CacheResult { +pub fn get_bucket_instant(duration: u64) -> CacheResult { match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(val) => Ok(val.as_secs() + duration), Err(_) => Err(CacheError::new("SystemTime before UNIX EPOCH!".into())), @@ -54,8 +54,8 @@ pub fn get_captcha_key(name: &str) -> String { } #[inline] -pub fn is_pocket_timer(name: &str) -> bool { - name.contains(&*PREFIX_POCKET_TIMER) +pub fn is_bucket_timer(name: &str) -> bool { + name.contains(&*PREFIX_BUCKET_TIMER) } #[cfg(test)] @@ -64,13 +64,13 @@ mod tests { #[test] fn timer_name_works() { - const POCKET_INSTANT: u64 = 12345678; - let pocket_name: String = get_pocket_name(POCKET_INSTANT); + const BUCKET_INSTANT: u64 = 12345678; + let bucket_name: String = get_bucket_name(BUCKET_INSTANT); - let timer_name = get_timer_name_from_pocket_name(&pocket_name); + let timer_name = get_timer_name_from_bucket_name(&bucket_name); assert_eq!( - get_pocket_name_from_timer_name(&timer_name), - Some(pocket_name.as_str()) + get_bucket_name_from_timer_name(&timer_name), + Some(bucket_name.as_str()) ); } } diff --git a/tests/test.py b/tests/test.py index 2507ea3..d13aec1 100755 --- a/tests/test.py +++ b/tests/test.py @@ -1,6 +1,7 @@ #!/bin/env python3 from time import sleep +from threading import Thread from redis.client import Redis from redis import BlockingConnectionPool @@ -12,10 +13,6 @@ COMMANDS = { "GET" : "mcaptcha_cache.get", } - -KEY = "testing_module" -TIME = 20 - def connect(): r = Redis(connection_pool=BlockingConnectionPool(max_connections=2)) r.from_url(REDIS_URL) @@ -37,33 +34,50 @@ def get_count(key): except: return 0 -def race(count): - for _ in range(count): - incr(KEY, TIME) - -def assert_count(expect): - count = get_count(KEY) +def assert_count(expect, key): + count = get_count(key) assert count == expect +def incr_one_works(): + key = "incr_one" + time = 2 + initial_count = get_count(key) + # incriment + incr(key, time) + assert_count(initial_count + 1, key) + # wait till expiry + sleep(time + 2) + assert_count(initial_count, key) + print("Incr one works") + +def race_works(): + key = "race_works" + initial_count = get_count(key) + race_num = 200 + time = 3 + + for _ in range(race_num): + incr(key, time) + assert_count(initial_count + race_num, key) + # wait till expiry + sleep(time + 2) + assert_count(initial_count, key) + print("Race works") + + def main(): # check connectivity ping() - # get initial count(residual) - initial_count = get_count(KEY) - # incriment - incr(KEY, TIME) - assert_count(initial_count + 1) - # wait till expiry - sleep(TIME + 4) - assert_count(initial_count) - # increment by 200 - race_num = 200 - race(race_num) - assert_count(initial_count + race_num) - # wait till expiry - sleep(TIME + 4) - assert_count(initial_count) - print("All good") + + t1 = Thread(target=incr_one_works) + t2 = Thread(target=race_works) + + t1.start() + t2.start() + t1.join() + t2.join() + + print("All tests passed") if __name__ == "__main__":