New with async: how to structure application?

Hiya all,

I'm pretty new to async but wanted to give it a spin now that it is stabilized and the crates are getting there. The possibilities are still a bit overwhelming, so I'd like some pointers on how to structure my app.
Here's its job:

  • core function is to manage selected (e.g., but not limited to, systemd) services
    • start, kill & restart
    • get current status and log output
  • users (managers) connect over the network to call these functions via jsonrpc
    • later I'll write a nice web GUI for this, but that's out of scope for this app

The app should keep the current state of the system up to date on its own (for systemd I think dbus can notify me).

Now, in a non-async world I would start with the HTTP server, let it create a thread for each client, and have a central thread that takes requests from them (via channels, probably). The central thread then has a loop to answer requests. Then probably another thread per service to handle updating its state in the central data structure.
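Roughly this shape, as an untested sketch (all names made up, just to show what I mean):

use std::sync::mpsc;
use std::thread;

// Hypothetical request type each client thread would send to the central thread.
enum Request {
    Start { service: String },
    Kill { service: String },
    Status { service: String, reply: mpsc::Sender<String> },
}

fn main() {
    let (tx, rx) = mpsc::channel::<Request>();

    // One thread per connected client would get a clone of `tx`...
    let client_tx = tx.clone();
    thread::spawn(move || {
        let (reply_tx, reply_rx) = mpsc::channel();
        let _ = client_tx.send(Request::Status { service: "nginx".into(), reply: reply_tx });
        println!("{}", reply_rx.recv().unwrap());
    });

    // ...and the central thread owns the state and loops over requests
    // forever, like a daemon would.
    for request in rx {
        match request {
            Request::Start { service } => println!("would start {}", service),
            Request::Kill { service } => println!("would kill {}", service),
            Request::Status { service, reply } => {
                let _ = reply.send(format!("{}: running (pretend)", service));
            }
        }
    }
}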

In the async world I am stuck a bit. The http/jsonrpc part looks easy, just a hyper server which takes care of everything once I spawn it.

But how do I structure the central "thread" that actually runs everything? Is that another task? I need some objects that represent each service and have methods like start. But how do I call them from the server's RPC handler functions? It seems to me that everything will need to be read-only in an Arc<X> or behind a Mutex, which doesn't sound very async?

I'll be glad for any ideas and pointers to existing similar apps in structure... Thanks much!

One thing to keep in mind with async is that if your central task is CPU-bound (i.e., very little waiting on IO), it can starve the hyper server tasks, because the executor gives the central task all the time it wants to run to completion.
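For example, a contrived sketch of such a task:

// Contrived: this task never hits an .await while it crunches numbers,
// so a single-threaded executor can run nothing else (e.g. accepting
// HTTP connections) until it returns.
async fn cpu_bound_task() -> u64 {
    let mut total: u64 = 0;
    for i in 0..1_000_000_000u64 {
        total = total.wrapping_add(i); // no .await anywhere in this loop
    }
    total
}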

I ran into this issue when I built a small full-text search engine with futures 0.1 and an HTTP front-end. I found the HTTP server side was ignored while a search task was busy; each search took roughly 4 seconds, long enough that I needed separate threads to run the searches while connections were accepted asynchronously.

I used two channels to send values from an HTTP request from my async thread to my search thread and back. The nice thing about channels is that the async thread can await the receiver side for the search results (see the futures::channel module on docs.rs). You're effectively moving values across threads via channels in this case.
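A minimal sketch of that shape (the types and names here are made up, not my actual code):

use futures::channel::oneshot;
use std::sync::mpsc;
use std::thread;

// Hypothetical request type: the search thread sends the result back
// through the oneshot sender.
struct SearchRequest {
    query: String,
    respond_to: oneshot::Sender<Vec<String>>,
}

fn run_search(_query: &str) -> Vec<String> {
    Vec::new() // stand-in for the real CPU-bound search
}

fn spawn_search_thread() -> mpsc::Sender<SearchRequest> {
    let (work_tx, work_rx) = mpsc::channel::<SearchRequest>();
    thread::spawn(move || {
        // Blocking loop on the dedicated search thread.
        while let Ok(req) = work_rx.recv() {
            let results = run_search(&req.query);
            let _ = req.respond_to.send(results);
        }
    });
    work_tx
}

// On the async side, inside a task:
async fn handle_request(work_tx: mpsc::Sender<SearchRequest>, query: String) -> Vec<String> {
    let (tx, rx) = oneshot::channel();
    let _ = work_tx.send(SearchRequest { query, respond_to: tx });
    // Awaiting the oneshot receiver suspends this task without blocking the thread.
    rx.await.unwrap_or_default()
}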

If you're sharing data across threads, then you can use either global statics or an Arc<X>. And if you need to mutate data, then you now have futures::lock::Mutex for use between async tasks, to complicate things further.
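For example, a small sketch of mutating shared state with Arc plus futures::lock::Mutex (hypothetical types):

use futures::lock::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical shared state: service name -> last known status.
type Shared = Arc<Mutex<HashMap<String, String>>>;

async fn update_status(shared: Shared, name: String, status: String) {
    // lock() is itself an await point, so other tasks keep running
    // while this one waits its turn for the mutex.
    let mut map = shared.lock().await;
    map.insert(name, status);
}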

Thanks for the reply!

I thought the tokio runtime had multiple threads to run futures/tasks (which is the correct terminology?) on? Then only one of them should be busy with the CPU-bound one...

Ok, so using channels is still a good thing...

Tokio supports both single- and multi-threaded executors; see the tokio::runtime module docs.

Certainly. But if that CPU-bound task is receiving work from the HTTP tasks, those HTTP tasks will pile up waiting for the CPU-bound task to compute and return their results. In my case, I had one thread for handling HTTP requests (accept and send to the search task), and the search task would run on a multi-threaded pool so the CPU-bound work completed as quickly and concurrently as possible. Food for thought.

Another note: I kept shared data in a registry of sorts (e.g., HashMap<&String, Mutex<Arc<SearchDomain>>>), so that when a search request came in from a client, I could send an Arc<SearchDomain> clone to the search task to perform its work. If my update task refreshed the info in the registry, it didn't interfere with a search that was already in progress, and Arc kept memory allocations to a minimum. In my case, a SearchDomain was expected to hold ~3-5 MB of text. If your client data is much smaller, you might not be as concerned with memory allocations affecting performance.
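A simplified sketch of that idea (with one lock around the whole registry rather than one per entry, and a made-up SearchDomain):

use futures::lock::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for the real type, which held a few MB of text.
struct SearchDomain {
    documents: Vec<String>,
}

type Registry = Arc<Mutex<HashMap<String, Arc<SearchDomain>>>>;

async fn snapshot_for(registry: &Registry, name: &str) -> Option<Arc<SearchDomain>> {
    // Cloning the Arc is cheap; the search task keeps this snapshot even
    // if the updater replaces the entry while the search is running.
    registry.lock().await.get(name).cloned()
}

async fn refresh(registry: &Registry, name: String, fresh: SearchDomain) {
    // Swapping in a new Arc does not disturb searches still holding
    // the old snapshot.
    registry.lock().await.insert(name, Arc::new(fresh));
}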

Here's my best attempt at a very basic outline for organizing your tasks. It's based on what I've done, so tweak it as you see fit. It's not the only way to design an async app. Hope this helps.

Update: Rearrange http task to spawn sooner and avoid blocking on send_to_central.send().

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut http_runtime = tokio::runtime::Builder::new()
        .basic_scheduler()
        .enable_all()
        .build()?;

    let mut central_runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()?;

    let (send_to_central, mut recv_from_http) = tokio::sync::mpsc::channel(4);
        
    std::thread::spawn(move || {
        central_runtime.block_on(async move {
            while let Some((respond_to_http, data)) = recv_from_http.recv().await {
                let result = process_central_task(data);
                // Ignore the error case where the requesting task has gone away.
                let _ = respond_to_http.send(result);
            }
        });
    });
    
    http_runtime.block_on(async move {
        // Setup http stream.
        
        while let Some((request, socket)) = http_stream.next().await {
            // May need to clone sender for each spawn task below.
            let mut send_to_central = send_to_central.clone();

            // Spawn task to submit, wait for, and handle response so http_runtime
            // can accept more connections while this task is idle.
            tokio::spawn(async move {
                // Create new channel for central task.
                let (response_to_http, response_from_central) =
                    tokio::sync::oneshot::channel();
            
                let data = /* gather data needed by central task. */;
            
                // Send the work to the central task, along with a channel to
                // relay the result back.  Awaits if the channel is full.
                send_to_central
                    .send((response_to_http, data))
                    .await
                    .expect("central task stopped");

                // Wait for response from central.
                let response = response_from_central
                    .await
                    .expect("central task dropped the response");

                // Send response to client.
                socket.write(response).await;
            });
        }
        
    });
        
    Ok(())
}

Thank you so much for the reply! I was thinking about using channels to communicate, and couldn't figure out how to let the main task know about new clients and the channel to send stuff back to them.

Just sending the response channel with the request is ingenious! Rust community really is as nice as everyone keeps saying... :)

Do you, by chance, know if I have to make two different runtimes with tokio 0.2 as well? The builder doesn't seem to have the basic vs threaded scheduler methods anymore...

Generally, if you have CPU-bound tasks that starve your other tasks, you shouldn't be putting them in a runtime designed for asynchronous code at all. Use a thread pool such as rayon to spawn and manage them. (Of course, you can still give the thread pool task a oneshot channel to notify on completion.)
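For example, a rough sketch using rayon's global pool via rayon::spawn plus a tokio oneshot (names are made up):

use tokio::sync::oneshot;

// Hypothetical CPU-bound function.
fn heavy_computation(input: u64) -> u64 {
    (0..input).fold(0u64, |acc, x| acc.wrapping_add(x))
}

async fn run_on_rayon(input: u64) -> u64 {
    let (tx, rx) = oneshot::channel();
    // rayon::spawn runs the closure on rayon's global thread pool, keeping
    // the CPU-bound work off the async runtime's worker threads.
    rayon::spawn(move || {
        let _ = tx.send(heavy_computation(input));
    });
    // The async task just awaits the oneshot for the result.
    rx.await.expect("rayon task dropped the sender")
}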

Nah, I just used two to demonstrate such a process. Create your central and http tasks by spawning one (runtime.spawn(task)) and then blocking on the last (runtime.block_on(task)).

Maybe something like the following example will accomplish that.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (send_to_central, mut recv_from_http) =
        tokio::sync::mpsc::channel(4);
        
    // Central task.
    tokio::spawn(async move {
        while let Some((respond_to_http, data)) = recv_from_http.recv().await {
            let result = process_central_task(data);
            // Ignore the error case where the requesting task has gone away.
            let _ = respond_to_http.send(result);
        }
    });
    
    // Incoming HTTP request task.  Since main is async in this example,
    // just end with the last task.

    // Setup HTTP stream.
    
    while let Some((request, socket)) = http_stream.next().await {
        // May need to clone sender for each spawn task below.
        let mut send_to_central = send_to_central.clone();

        // Spawn a task to submit, wait for, and handle the response
        // so the main task can accept more connections while this
        // task is idle.
        tokio::spawn(async move {
            // Create new channel for central task.
            let (response_to_http, response_from_central) =
                tokio::sync::oneshot::channel();
        
            let data = /* gather data needed by central task. */;
        
            // Send the work to the central task, along with a channel to
            // relay the result back.  Awaits if the channel is full.
            send_to_central
                .send((response_to_http, data))
                .await
                .expect("central task stopped");

            // Wait for response from central.
            let response = response_from_central
                .await
                .expect("central task dropped the response");

            // Send response to client.
            socket.write(response).await;
        });
    }
        
    Ok(())
}

It seems to me that the builder in v0.2.0 and v0.2.2 has both .basic_scheduler() and .threaded_scheduler() methods [1] [2].

Perhaps you didn't enable the feature in your Cargo.toml?

tokio = { version = "0.2", features = [ "full" ] }

[1]: tokio::runtime module docs for tokio v0.2.0 (docs.rs)
[2]: tokio::runtime module docs for tokio v0.2.2 (docs.rs)

Looks like tokio has some provisions for spawning CPU-bound tasks onto a separate thread [3], but it may fall short if one is looking for a thread pool with an upper bound.

[3]: tokio::task module docs (docs.rs)
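A rough sketch of what [3] offers, assuming tokio 0.2's tokio::task::spawn_blocking (the function and its contents here are made up):

// The closure runs on tokio's dedicated blocking-thread pool instead of
// the async worker threads, so it won't stall other tasks.
async fn service_status(unit: String) -> String {
    tokio::task::spawn_blocking(move || {
        // Stand-in for a blocking or CPU-bound call, e.g. shelling out
        // to `systemctl status <unit>` and capturing its output.
        format!("{}: active (pretend)", unit)
    })
    .await
    .expect("blocking task panicked")
}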

Ah, I was looking for that functionality! I knew it existed in tokio 0.1, but I couldn't find it and assumed it had been removed with the rewrite of the executor. Using tokio's methods specifically designed for blocking code is also a good option.
