async_std::task is creating multiple tasks which the OS is unable to recognize

So we have this task at our school:

This requires us to use a single thread and a single process. We can see that the PID is the same, so we think it is showing multiple tasks under the same process: tasks created by async_std::task::spawn, as described in the Rust documentation.

Here is the snippet of code that does that:

    zero_path_buf: PathBuf,
    server_configs: Vec<ServerConfig>,
) -> Result<(), Box<dyn Error>> {
    let ports = match get_usize_unique_ports(&server_configs).await {
        Ok(ports) => ports,
        Err(e) => {
            eprintln!("ERROR: Failed to get ports: {}", e);
            return Err("Failed to get ports".into());
        }
    };

    let server_address = "0.0.0.0";

    for port in ports.clone() {
        let addr: SocketAddr = match format!("{}:{}", server_address, port).parse() {
            Ok(v) => v,
            Err(e) => {
                eprintln!("ERROR: Failed to parse 0.0.0.0:port into SocketAddr: {}", e);
                return Err("Failed to parse 0.0.0.0:port into SocketAddr".into());
            }
        };

        let listener = match TcpListener::bind(addr).await {
            Ok(v) => v,
            Err(e) => {
                eprintln!("ERROR: Failed to bind addr: {}", e);
                return Err("Failed to bind addr".into());
            }
        };

        append_to_file(&format!("addr {}", addr)).await;

        let zero_path_buf = zero_path_buf.clone();
        let server_configs = server_configs.clone();

        task::spawn(async move {
            let cookies_storage: Arc<Mutex<HashMap<String, Cookie>>> =
                Arc::new(Mutex::new(HashMap::new()));

            listener
                .incoming()
                .for_each_concurrent(None, |stream| async {
                    let mut stream = match stream {
                        Ok(v) => v,
                        Err(e) => {
                            eprintln!("ERROR: Failed to get stream: {}", e);
                            return;
                        }
                    };

                    let mut server = Server {
                        cookies: cookies_storage.clone(),
                        cookies_check_time: SystemTime::now() + Duration::from_secs(60),
                    };

                    let mut headers_buffer: Vec<u8> = Vec::new();
                    let mut body_buffer: Vec<u8> = Vec::new();
                    let mut global_error_string = Status::Ok.to_string();

                    let mut response: Response<Vec<u8>> = Response::new(Vec::new());
                    let timeout = Duration::from_millis(3000);

                    let choosen_server_config = read_with_timeout(
                        timeout,
                        &mut stream,
                        &mut headers_buffer,
                        &mut body_buffer,
                        &server_configs,
                        &mut global_error_string,
                    )
                    .await;

                    let mut request = Request::new(Vec::new());
                    if global_error_string == Status::Ok.to_string() {
                        parse_raw_request(
                            headers_buffer,
                            body_buffer,
                            &mut request,
                            &mut global_error_string,
                        )
                        .await;
                    }

                    server.check_expired_cookies().await;

                    let (cookie_value, cookie_is_ok) = server
                        .extract_cookies_from_request_or_provide_new(&request)
                        .await;

                    if !cookie_is_ok {
                        global_error_string = Status::Invalid_Cookie.to_string();
                    }

                    if global_error_string == Status::Ok.to_string() {
                        response = handle_request(
                            &request,
                            cookie_value.clone(),
                            &zero_path_buf,
                            choosen_server_config.clone(),
                            &mut global_error_string,
                        )
                        .await;
                    }

                    check_custom_errors(
                        global_error_string,
                        &request,
                        cookie_value.clone(),
                        &zero_path_buf,
                        choosen_server_config.clone(),
                        &mut response,
                    )
                    .await;

                    match write_response_into_stream(&mut stream, response).await {
                        Ok(_) => {}
                        Err(e) => {
                            eprintln!("ERROR: Failed to write response into stream: {}", e);
                            return;
                        }
                    };

                    match stream.flush().await {
                        Ok(_) => {}
                        Err(e) => {
                            eprintln!("ERROR: Failed to flush stream: {}", e);
                            return;
                        }
                    };

                    match stream.shutdown(std::net::Shutdown::Both) {
                        Ok(_) => {}
                        Err(e) => {
                            eprintln!("ERROR: Failed to shutdown stream: {}", e);
                        }
                    }
                })
                .await;
        });
    }
    println!("Server is listening at below http://ip:port pairs:");

    let all_listeners: Vec<ServerListeningAt> = get_server_listening_ats(&server_configs).await;
    for server_listening_at in all_listeners {
        println!(
            "http://{}:{}       {}",
            server_listening_at.server_address,
            server_listening_at.ports.join(", "),
            server_listening_at.server_name
        );
    }
    async_std::task::sleep(Duration::from_secs(u64::MAX)).await;
    Ok(())
}

Now you can see that we are spawning multiple tasks but not multiple threads.

However, there is a group of students who want to fail us on this. Should we fail?

Async tasks are definitely not counted as threads, but from the linked readme, I think the intention of this problem is to learn how to do IO with epoll or equivalent system calls, and it explicitly prohibits "crates that already implement server features like tokio or nix":

Personally, I would consider any async task scheduler/executor/runtime (tokio, smol, async-std, bastion, etc.) not in the spirit of the project.

So I think it should fail.


Pretty bold asking us to tell you how your instructor should grade you. Other students don't decide if you fail. If you're afraid your instructor isn't going to agree with you, it's a bad sign.

Anyway, you clearly didn't use only one thread or, more importantly, learn anything about non-blocking syscalls. Do you know how many times it's calling epoll per communication? You should be able to answer that just from having written the code, without even running it. This assignment seems like it's meant to teach you how you would write something like Tokio or async-std from scratch, not how you use them.


Thank you. We created epoll on Mac, as we are using Mac. We created the epoll implementation for async ourselves. This is not the full code, just a portion.

I want to thank both of you for replying promptly. That confirmed to my colleague that no multi-threading was there. Although I tried to convince him that this does not create new OS threads, he also lacked the knowledge base to know what async is, or how one can make their own polling solution given access to some system calls.

We are in a cohort/bootcamp named Gritlab.ax and here there are no teachers, so this had to be settled between students only. The real spirit of the tasks I believe is to learn from each other here.

That's not necessarily entirely true. An async task is not an OS thread; tasks run on top of the async runtime. But the async runtime itself can be multithreaded: it depends on how you use the API, and also on the runtime configuration.

For example, in async-std, if you use the default runtime (annotating main with the async_std::main attribute), the number of worker threads is determined at runtime by the environment variable ASYNC_STD_THREAD_COUNT, and defaults to the number of CPU cores when the environment variable is not set.

It also depends on the API you use. E.g. task::spawn() will spawn the future onto a global scheduler, while task::spawn_local() will spawn it onto the scheduler for the calling thread. You can fine-tune the worker threads using other APIs, such as task::block_on().

Spawning an async task doesn't create new OS threads, but "not creating new threads" is not the same as "no multi-threading was there".

Still, I read the problem description as saying you should write your own multiplexer/scheduler based on the low-level primitive (the epoll syscall) instead of just writing a socket server on top of a ready-made scheduler. So I don't think your solution should qualify.

That's really a good way to learn stuff. Kudos to all of you.


I should clarify that I'm not saying async Rust should be avoided; after all, async is just sugar for Future. My point is, you should build your own async scheduler (and preferably reactor) based on epoll or equivalent system calls, and spawn async tasks on your own runtime instead of async-std. By doing this, you'll gain a thorough understanding of how the async ecosystem is constructed.
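To make "build your own scheduler" a bit more concrete, here is a minimal sketch of a single-threaded, round-robin scheduler that uses only the standard library. It is not how async-std works internally, and the names Steps, NoopWaker, run_all and demo are made up for illustration. It also cheats: the waker does nothing and pending tasks are simply re-polled in a loop, whereas a real runtime would block in epoll/kqueue (the reactor) and only re-poll a task after its waker fires. The tasks are hand-written Future impls, which is roughly what an async block desugars to.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing. Assumption for this sketch only: the scheduler
/// below re-polls every pending task, so nobody needs to be notified.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical task: logs one entry per poll and yields (returns Pending)
/// until it has logged `total` entries.
struct Steps {
    name: &'static str,
    next: u32,
    total: u32,
    log: Rc<RefCell<Vec<String>>>,
}

impl Future for Steps {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.next == self.total {
            return Poll::Ready(());
        }
        let entry = format!("{}{}", self.name, self.next);
        self.log.borrow_mut().push(entry);
        self.next += 1;
        Poll::Pending
    }
}

type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Drive every task to completion on the calling thread, round-robin.
/// No OS threads are created anywhere in this function.
fn run_all(tasks: Vec<Task>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut queue: VecDeque<Task> = tasks.into();
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            // Not finished yet: put it at the back and give others a turn.
            queue.push_back(task);
        }
    }
}

/// Runs two tasks and returns the order in which they logged.
fn demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut tasks: Vec<Task> = Vec::new();
    for name in ["a", "b"] {
        tasks.push(Box::pin(Steps { name, next: 0, total: 2, log: Rc::clone(&log) }));
    }
    run_all(tasks);
    let entries = log.borrow().clone();
    entries
}

fn main() {
    // The two tasks interleave: a0, b0, a1, b1.
    for entry in demo() {
        println!("{}", entry);
    }
}
```

The interleaved output shows two tasks making progress on one thread. Swapping the busy re-poll loop for a blocking epoll/kqueue wait plus real wakers is the part the assignment seems to be about.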


I think the confusion is why the assignment says to only use one thread. That was put in there to ensure you actually write non-blocking code. To fulfill the other requirements, a multithreaded implementation may or may not be non-blocking, but a single-threaded one would have to be non-blocking.
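As a rough illustration of what "non-blocking" means at the socket level, here is a small sketch using only std::net. The helper names try_accept and demo are hypothetical, and the retry loops are a stand-in: in the assignment you would wait in epoll (or kqueue on a Mac) for readiness instead of spinning.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Try to accept one connection without blocking.
/// Returns Ok(None) when no client is waiting yet.
fn try_accept(listener: &TcpListener) -> io::Result<Option<TcpStream>> {
    match listener.accept() {
        Ok((stream, _peer)) => Ok(Some(stream)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Accepts and reads one 4-byte message on a single thread.
/// Returns (whether the first accept found nothing, the message received).
fn demo() -> io::Result<(bool, String)> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;

    // Nobody has connected yet, so accept reports WouldBlock instead of hanging.
    let idle = try_accept(&listener)?.is_none();

    // Client side, on the same thread: the kernel queues the connection.
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    client.write_all(b"ping")?;

    // Stand-in for an epoll/kqueue wait: retry until the connection shows up.
    let mut conn = loop {
        if let Some(stream) = try_accept(&listener)? {
            break stream;
        }
        std::thread::yield_now();
    };
    conn.set_nonblocking(true)?;

    // Non-blocking read: WouldBlock means "no data yet, try again later".
    let mut buf = [0u8; 4];
    let mut filled = 0;
    while filled < buf.len() {
        match conn.read(&mut buf[filled..]) {
            Ok(0) => break, // peer closed the connection
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => std::thread::yield_now(),
            Err(e) => return Err(e),
        }
    }
    Ok((idle, String::from_utf8_lossy(&buf[..filled]).into_owned()))
}

fn main() -> io::Result<()> {
    let (idle, msg) = demo()?;
    println!("first accept was idle: {}, received: {}", idle, msg);
    Ok(())
}
```

Because no call ever blocks, one thread can keep many listeners and streams in flight, which is presumably why the single-thread rule is there.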

This code doesn't actually need to spawn anything. You could put them all in a FuturesUnordered and await them at the end instead of the very long sleep.

If TcpListener and its corresponding stream type were written by yourself and don't use anything from async-std, then you've learned what was intended.

