Runtime dropped the dispatch task

Hi,

I am trying to run my unit tests (9 of them, via cargo test) in Rust. One test fails intermittently. However, if I run the failed test individually, it succeeds.

Any thoughts?

---- aws_utils::tests::test_copy_s3_to_localfolder stdout ----
thread 'aws_utils::tests::test_copy_s3_to_localfolder' panicked at 'couldnt get list form aws; bucket: bucket keyjunits-rust/aws_utils/d/: DispatchFailure(DispatchFailure { source: ConnectorError { kind: User, source: hyper::Error(User(DispatchGone), "runtime dropped the dispatch task") } })', src/aws_utils.rs:65:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

It looks like the runtime is shutting down before the API call to S3 has completed. I double-checked whether I had missed any .await in the code path, but couldn't find any. I am using the Tokio framework.

Code:

    #[tokio::test]
    async fn test_copy_s3_to_localfolder() {
        before_all().await;
        let path = PathBuf::from("/tmp/rust_test/aws_utils/d/1.txt");
        let path = path.parent().unwrap();
        std::fs::create_dir_all(path).unwrap();

        std::fs::write(&PathBuf::from("/tmp/rust_test/aws_utils/d/1.txt"), "Hello").unwrap();
        std::fs::write(&PathBuf::from("/tmp/rust_test/aws_utils/d/2.txt"), "Hello").unwrap();
       
        let _ = aws_utils_arc
            .get()
            .unwrap()
            .copy_folder_to_s3(
                "/tmp/rust_test/aws_utils/d/",
                "s3://bucket/junits-rust/aws_utils/d/",
            )
            .await;

        aws_utils_arc
            .get()
            .unwrap()
            .copy_s3_prefix_to_local_folder(
                "s3://bucket/junits-rust/aws_utils/d/",
                "/tmp/rust_test/aws_utils/dd/",
            )
            .await
            .unwrap();
        let str = fs::read_to_string("/tmp/rust_test/aws_utils/dd/1.txt").unwrap();
        println!("XXX XXXX downloaded string.. {}", str);
        assert!(str.len() > 1, "the message should be downloaded to local");
    }


lazy_static! {
    pub static ref initialized: OnceCell<bool> = OnceCell::new();
    pub static ref aws_utils: OnceCell<AwsUtilsSt> = OnceCell::new();
    pub static ref aws_utils_arc: OnceCell<Arc<&'static AwsUtilsSt>> = OnceCell::new();
}


    pub async fn copy_s3_prefix_to_local_folder(
        self: &Arc<&Self>,
        s3_prefix: &str,
        mut local_folder: &str,
    ) -> Result<(), Error> {
        let s3_files = self.list(s3_prefix.to_string()).await.unwrap();
        let mut handles = vec![];
        let (bucket, _) = self.get_bucket_and_path(s3_prefix).unwrap();
        /* spawn a Tokio task per file */
        for mut s3_file in s3_files {
            s3_file = format!("s3://{}/{}", bucket, s3_file);
            let mut s3_key = s3_file.strip_prefix(s3_prefix).unwrap().to_string();
            s3_key = s3_key
                .strip_prefix("/")
                .unwrap_or(&s3_key)
                .strip_suffix("/")
                .unwrap_or(&s3_key)
                .to_string();
            local_folder = local_folder.strip_suffix("/").unwrap_or(local_folder);
            let local_key = format!("{}/{}", local_folder, s3_key);

            let handle = tokio::spawn(async move {
                aws_utils_arc
                    .get()
                    .unwrap()
                    .copy_s3_file_to_local(s3_file.clone(), local_key.clone())
                    .await
                    .expect(&format!(
                        "couldnt copy from src:{} to dest:{}",
                        s3_file.clone(),
                        local_key.clone()
                    ));
            });
            handles.push(handle);
        }
        /* wait for all tasks to complete */
        for handle in handles {
            handle.await;
        }
        return Ok(());
    }


When I initialize the AWS S3 client in every method, things work fine. It seems strange that the AWS client has to be initialized for every S3 call.

    let shared_config = aws_config::load_from_env().await;
    let a = AwsUtilsSt {
        client: S3Client::new(&shared_config),
    };

Now I do this for every operation and then use the client.

Clients for APIs are generally tied to the runtime they're created on. Since each test has its own runtime, that causes issues.
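
As a rough illustration of why a cached client stops working once its runtime is gone, here is a toy model using only the standard library. It is not how hyper or the AWS SDK is implemented; `ToyClient`, `ToyRuntime`, `connect` and `simulate_two_tests` are made-up names. The assumption being modeled is that the client only holds a channel to a background dispatch task owned by the runtime, so once that runtime is dropped, later requests have nowhere to go.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a runtime: it owns the receiving end that a
/// background "dispatch task" would read requests from.
struct ToyRuntime {
    _dispatch: Receiver<String>,
}

/// Hypothetical stand-in for an HTTP client cached in a global: it only holds
/// the sending end of the channel.
struct ToyClient {
    to_dispatch: Sender<String>,
}

/// Creates a client together with the runtime its dispatch task lives on.
fn connect() -> (ToyClient, ToyRuntime) {
    let (tx, rx) = channel();
    (ToyClient { to_dispatch: tx }, ToyRuntime { _dispatch: rx })
}

impl ToyClient {
    /// Queues a request; fails once the owning runtime has been dropped.
    fn request(&self, key: &str) -> Result<(), String> {
        self.to_dispatch
            .send(key.to_string())
            .map_err(|_| "runtime dropped the dispatch task".to_string())
    }
}

/// Mimics two tests sharing one client: the first "test" owns the runtime and
/// drops it when it finishes, the second reuses the now-orphaned client.
fn simulate_two_tests() -> (Result<(), String>, Result<(), String>) {
    let (client, first_test_runtime) = connect();
    let during_first_test = client.request("junits-rust/aws_utils/d/1.txt");
    drop(first_test_runtime); // the first test ends, taking its runtime with it
    let during_second_test = client.request("junits-rust/aws_utils/d/2.txt");
    (during_first_test, during_second_test)
}
```

In this model the first request succeeds and the second one fails, which mirrors a test passing on its own but failing when it runs after another test that created the shared client.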

So, do you mean that I can use one AWS client in a multi-threaded environment and it will still work well?

Also, is there any way to let all unit tests use the same runtime?

I would expect that you can use a single AWS client in a multi-threaded runtime.

Also, as you said, if each unit test uses a new runtime, shouldn't we have many AWS clients, and shouldn't the tests run well?

My point is: after I started creating a new AWS client per copy, all unit tests worked perfectly. This makes me think that all unit tests probably share one runtime, and that AWS clients may not be shareable across threads.

Sharing across threads is fine. Sharing across runtimes is not.

You would have to do it manually. You can define a global that contains a runtime and have each test call block_on on that shared runtime.

Hi,

I updated my test cases with the code below (in each test case):

    async fn test_get_s3_file_as_map() {
        let rt = Handle::current();
        rt.block_on(async {
            ...
            ...
        });
    }

But this gives me the error:

---- aws_utils::tests::test9 stdout ----
thread '...' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', src/aws_utils.rs:372:12

Any thoughts?

I have tests that run for 10-15 seconds without issues. Even a test that just sleeps the thread for 60 seconds runs fine. I only have issues with tests involving AWS S3. Please note that my S3 client is defined as:

lazy_static! {
    static ref INIT_MU: Mutex<()> = Mutex::new(());
    pub static ref S3: OnceCell<s3::Client> = OnceCell::new();
}

You can't use block_on from async code. Start a normal non-async test, then use block_on in there.

Hi,

I am getting the same issue with other test cases as well. Individually, the tests work fine. But when I run them together (by clicking Run Tests in VS Code above mod tests), some tests fail with the error "runtime dropped the dispatch task".

I think the Tokio test is not waiting for all the tests to complete. I tried to find a configuration option at the global or mod test level, but had no luck. Below is the new code.

By the way, I didn't know how to use block_on; I am new to Rust.

mod test {
    use crate::{
        aws_utils::AwsUtilsSt,
        constants::fsconstants::FsConstantsSt,
        inits,
        services::{absorber::AbsorberSt, emitter::EmitterSt, emittermanager::EmitterManagerSt},
    };

    use super::*;

    async fn before_all() {
        inits::init_loggers();
        FsConstantsSt::init().await;
    }

    #[tokio::test]
    pub async fn test_read_write() {
        before_all().await;
        let ctx = Ctx::new_from_path("/opt/deployment/ctx_arr.json".to_string()).await.get(0).unwrap().to_owned();
        let sch = SchedulerSt::new().await;
        sch.write_objects(
            &SchedulerRequest {

                objects: &vec!["abc".to_string()],
                rack: "r1".to_string(),
            },
            ctx.clone(),
        )
        .await
        .unwrap();
        let result = sch.read_objects(2, 2, ctx).await.unwrap();
        info!("test read write result:: {:?}", result);
        assert!(result.contains(&"abc".to_string()), "abc should be found");
    }

    #[tokio::test]
    pub async fn test_read_write_racks() {
        before_all().await;
        let ctx = Ctx::new_from_path("/opt/deployment/ctx_arr.json".to_string()).await.get(0).unwrap().to_owned();
        SchedulerSt::write_rack("r1".to_string(), &ctx).await;
        let result = SchedulerSt::get_racks(&ctx).await;

        info!("test read write racks result --> :: {:?}", result);
        assert!(result.contains(&"abc".to_string()), "abc should be found");
    }
}

However, the code below works without any issues. If I run all the tests in this file, all of them pass. So I am not sure whether adding async ... may result in the error "runtime dropped the dispatch task".


mod test {
    use std::{thread::sleep, time::Duration};

    #[tokio::test]
    async fn it_works1() {
        sleep(Duration::from_secs(10));
        assert_eq!(4, 4);
    }
    #[tokio::test]
    async fn it_works2() {
        sleep(Duration::from_secs(10));
        assert_eq!(4, 4);
    }
    #[tokio::test]
    async fn it_works3() {
        sleep(Duration::from_secs(10));
        assert_eq!(4, 4);
    }
    #[tokio::test]
    async fn it_works4() {
        sleep(Duration::from_secs(10));
        assert_eq!(4, 4);
    }
}

Of course: since the tests are independent, each creates its own runtime, which doesn't use any shared resources and therefore doesn't interfere with other runtimes.

In that case, why doesn't Rust wait until all such runtimes are fully done? Had it waited, I would not have gotten this error.
