fix: terminate demo user job cleanly

This commit is contained in:
Aravinth Manivannan 2024-01-04 23:24:36 +05:30
parent 13c3066b86
commit e0d6188853
No known key found for this signature in database
GPG key ID: F8F50389936984FF
2 changed files with 42 additions and 14 deletions

View file

@ -8,6 +8,7 @@ use std::time::Duration;
use actix::clock::sleep; use actix::clock::sleep;
use actix::spawn; use actix::spawn;
use tokio::sync::oneshot::{channel, error::TryRecvError, Receiver, Sender};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::api::v1::account::delete::runners::delete_user; use crate::api::v1::account::delete::runners::delete_user;
@ -23,20 +24,21 @@ pub const DEMO_USER: &str = "aaronsw";
pub const DEMO_PASSWORD: &str = "password"; pub const DEMO_PASSWORD: &str = "password";
pub struct DemoUser { pub struct DemoUser {
handle: JoinHandle<()>, tx: Sender<()>,
} }
impl DemoUser { impl DemoUser {
pub async fn spawn(data: AppData, duration: Duration) -> ServiceResult<Self> { pub async fn spawn(data: AppData, duration: u32) -> ServiceResult<(Self, JoinHandle<()>)> {
let handle = Self::run(data, duration).await?; let (tx, rx) = channel();
let d = Self { handle }; let handle = Self::run(data, duration, rx).await?;
let d = Self { tx };
Ok(d) Ok((d, handle))
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn abort(&self) { pub fn abort(mut self) {
self.handle.abort(); self.tx.send(());
} }
/// register demo user runner /// register demo user runner
@ -71,19 +73,42 @@ impl DemoUser {
pub async fn run( pub async fn run(
data: AppData, data: AppData,
duration: Duration, duration: u32,
mut rx: Receiver<()>,
) -> ServiceResult<JoinHandle<()>> { ) -> ServiceResult<JoinHandle<()>> {
Self::register_demo_user(&data).await?; Self::register_demo_user(&data).await?;
fn can_run(rx: &mut Receiver<()>) -> bool {
match rx.try_recv() {
Err(TryRecvError::Empty) => true,
_ => false,
}
}
let mut exit = false;
let fut = async move { let fut = async move {
loop { loop {
sleep(duration).await; if exit {
break;
}
for _ in 0..duration {
if can_run(&mut rx) {
sleep(Duration::new(1, 0)).await;
continue;
} else {
exit = true;
break;
}
}
if let Err(e) = Self::delete_demo_user(&data).await { if let Err(e) = Self::delete_demo_user(&data).await {
log::error!("Error while deleting demo user: {:?}", e); log::error!("Error while deleting demo user: {:?}", e);
} }
if let Err(e) = Self::register_demo_user(&data).await { if let Err(e) = Self::register_demo_user(&data).await {
log::error!("Error while registering demo user: {:?}", e); log::error!("Error while registering demo user: {:?}", e);
} }
} }
}; };
let handle = spawn(fut); let handle = spawn(fut);
@ -133,7 +158,7 @@ mod tests {
assert!(!username_exists(&payload, &data).await.unwrap().exists); assert!(!username_exists(&payload, &data).await.unwrap().exists);
// test the runner // test the runner
let user = DemoUser::spawn(data, duration).await.unwrap(); let user = DemoUser::spawn(data, DURATION as u32).await.unwrap();
let (_, signin_resp, token_key) = let (_, signin_resp, token_key) =
add_levels_util(data_inner, DEMO_USER, DEMO_PASSWORD).await; add_levels_util(data_inner, DEMO_USER, DEMO_PASSWORD).await;
let cookies = get_cookie!(signin_resp); let cookies = get_cookie!(signin_resp);
@ -162,6 +187,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
let res_levels: Vec<Level> = test::read_body_json(resp).await; let res_levels: Vec<Level> = test::read_body_json(resp).await;
assert!(res_levels.is_empty()); assert!(res_levels.is_empty());
user.abort(); user.0.abort();
user.1.await.unwrap();
} }
} }

View file

@ -12,6 +12,7 @@ use actix_web::{
error::InternalError, http::StatusCode, middleware as actix_middleware, error::InternalError, http::StatusCode, middleware as actix_middleware,
web::JsonConfig, App, HttpServer, web::JsonConfig, App, HttpServer,
}; };
use tokio::task::JoinHandle;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::info; use log::info;
@ -110,11 +111,11 @@ async fn main() -> std::io::Result<()> {
let data = Data::new(&settings, secrets.clone()).await; let data = Data::new(&settings, secrets.clone()).await;
let data = actix_web::web::Data::new(data); let data = actix_web::web::Data::new(data);
let mut demo_user: Option<DemoUser> = None; let mut demo_user: Option<(DemoUser, JoinHandle<()>)> = None;
if settings.allow_demo && settings.allow_registration { if settings.allow_demo && settings.allow_registration {
demo_user = Some( demo_user = Some(
DemoUser::spawn(data.clone(), Duration::from_secs(60 * 30)) DemoUser::spawn(data.clone(), 60 * 30)
.await .await
.unwrap(), .unwrap(),
); );
@ -156,7 +157,8 @@ async fn main() -> std::io::Result<()> {
} }
if let Some(demo_user) = demo_user { if let Some(demo_user) = demo_user {
demo_user.abort(); demo_user.0.abort();
demo_user.1.await.unwrap();
} }
if let Some(survey_upload_handle) = survey_upload_handle { if let Some(survey_upload_handle) = survey_upload_handle {