From e0d6188853e089428ff9de64b3941bcebf679890 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Thu, 4 Jan 2024 23:24:36 +0530 Subject: [PATCH] fix: terminate demo user job cleanly --- src/demo.rs | 48 +++++++++++++++++++++++++++++++++++++----------- src/main.rs | 8 +++++--- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/demo.rs b/src/demo.rs index 54da1f3c..66a2f562 100644 --- a/src/demo.rs +++ b/src/demo.rs @@ -8,6 +8,7 @@ use std::time::Duration; use actix::clock::sleep; use actix::spawn; +use tokio::sync::oneshot::{channel, error::TryRecvError, Receiver, Sender}; use tokio::task::JoinHandle; use crate::api::v1::account::delete::runners::delete_user; @@ -23,20 +24,21 @@ pub const DEMO_USER: &str = "aaronsw"; pub const DEMO_PASSWORD: &str = "password"; pub struct DemoUser { - handle: JoinHandle<()>, + tx: Sender<()>, } impl DemoUser { - pub async fn spawn(data: AppData, duration: Duration) -> ServiceResult { - let handle = Self::run(data, duration).await?; - let d = Self { handle }; + pub async fn spawn(data: AppData, duration: u32) -> ServiceResult<(Self, JoinHandle<()>)> { + let (tx, rx) = channel(); + let handle = Self::run(data, duration, rx).await?; + let d = Self { tx }; - Ok(d) + Ok((d, handle)) } #[allow(dead_code)] - pub fn abort(&self) { - self.handle.abort(); + pub fn abort(mut self) { + self.tx.send(()); } /// register demo user runner @@ -71,19 +73,42 @@ impl DemoUser { pub async fn run( data: AppData, - duration: Duration, + duration: u32, + mut rx: Receiver<()>, ) -> ServiceResult> { Self::register_demo_user(&data).await?; + fn can_run(rx: &mut Receiver<()>) -> bool { + match rx.try_recv() { + Err(TryRecvError::Empty) => true, + _ => false, + } + } + + let mut exit = false; let fut = async move { loop { - sleep(duration).await; + if exit { + break; + } + for _ in 0..duration { + if can_run(&mut rx) { + sleep(Duration::new(1, 0)).await; + continue; + } else { + exit = true; + break; + } + } + if let Err(e) = Self::delete_demo_user(&data).await { log::error!("Error while deleting demo user: {:?}", e); } + if let Err(e) = Self::register_demo_user(&data).await { log::error!("Error while registering demo user: {:?}", e); } + } }; let handle = spawn(fut); @@ -133,7 +158,7 @@ mod tests { assert!(!username_exists(&payload, &data).await.unwrap().exists); // test the runner - let user = DemoUser::spawn(data, duration).await.unwrap(); + let user = DemoUser::spawn(data, DURATION as u32).await.unwrap(); let (_, signin_resp, token_key) = add_levels_util(data_inner, DEMO_USER, DEMO_PASSWORD).await; let cookies = get_cookie!(signin_resp); @@ -162,6 +187,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); let res_levels: Vec = test::read_body_json(resp).await; assert!(res_levels.is_empty()); - user.abort(); + user.0.abort(); + user.1.await.unwrap(); } } diff --git a/src/main.rs b/src/main.rs index c96b475c..ddc4aa2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use actix_web::{ error::InternalError, http::StatusCode, middleware as actix_middleware, web::JsonConfig, App, HttpServer, }; +use tokio::task::JoinHandle; use lazy_static::lazy_static; use log::info; @@ -110,11 +111,11 @@ async fn main() -> std::io::Result<()> { let data = Data::new(&settings, secrets.clone()).await; let data = actix_web::web::Data::new(data); - let mut demo_user: Option = None; + let mut demo_user: Option<(DemoUser, JoinHandle<()>)> = None; if settings.allow_demo && settings.allow_registration { demo_user = Some( - DemoUser::spawn(data.clone(), Duration::from_secs(60 * 30)) + DemoUser::spawn(data.clone(), 60 * 30) .await .unwrap(), ); @@ -156,7 +157,8 @@ async fn main() -> std::io::Result<()> { } if let Some(demo_user) = demo_user { - demo_user.abort(); + demo_user.0.abort(); + demo_user.1.await.unwrap(); } if let Some(survey_upload_handle) = survey_upload_handle {