Tokio use advice request

My async app (Tokio runtime) is built upon 5 concurrent/parallel threads/processes communicating (within their respective loops) over Tcp sockets with each other; I'm using the tmq crate (= async zmq). This app communicates with potentially a high number of of user threads each of which communicate (also through tmq) with one app thread. All threads (app and user) are not cpu intensive — speed is paramount.

My questions are:

  1. Should app and user thteads be managed by the same Tokio runtime? Or is it better for them to (a) use blocking threads, or (b) OS threads?
  2. If using a single runtime, how do I start these processes (loops) simultaneously (after starting all, async-ly, if I await the future of the first thread, the second never enters the loop...)?
  3. What's the best document dealing with multi-threaded Tokio apps (rather than IO-heavy web servers)?

Thanks.

Hi @stustd! is the whole system on on machine or distributed across multiple machines? I'd recommend one tokio runtime per machine - I don't know of advantages of running multiple runtimes.

If the processes are not cpu intensive, it seems like a good fit for tokio tasks. They are inexpensive to spawn (compared to threads).

If using a single runtime, you can spawn the processes using tokio::task::spawn and use the join handles returned from spawn to wait for them all to finish (like in this example) or use select to determine when the first finishes. join! and select! are a few options of what you can do with the join handle - what you end up doing will depend on your application.

I've had good luck with the tokio docs, I'm not sure what other resources are out there, maybe others will chime in :smile:

Should app and user thteads be managed by the same Tokio runtime? Or is it better for them to (a) use blocking threads, or (b) OS threads?

tokio uses OS threads, there is a little difference whether you're managing it or tokio

If using a single runtime, how do I start these processes (loops) simultaneously (after starting all, async-ly, if I await the future of the first thread, the second never enters the loop...)?

You can start each in own thread or run each as new instance of program (if you want distributed system)
But if you want your instances to be run on separate threads then probably just use multi threaded runtime

What's the best document dealing with multi-threaded Tokio apps (rather than IO-heavy web servers)?

Thanks @danbruder and @DoumanAsh. Spawning my application_manager (appmngr)

 let f_am = tokio::spawn(async move {
            process::appmngr::run(
                context.clone(), config.clone()).await;
        });

now produces the following error:

57  |         let f_am = tokio::spawn(async move {
    |                    ^^^^^^^^^^^^^^ `*mut std::ffi::c_void` cannot be shared between threads safely
    | 
   ::: /home/hjansen/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.20/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `tmq::socket_types::request_reply::RequestReceiver`, the trait `std::marker::Sync` is not implemented for `*mut std::ffi::c_void`
    = note: required because it appears within the type `zmq::Socket`
    = note: required because it appears within the type `tmq::socket::SocketWrapper`
    = note: required because it appears within the type `std::option::Option<tmq::socket::SocketWrapper>`
    = note: required because it appears within the type `tokio::io::poll_evented::PollEvented<tmq::socket::SocketWrapper>`
    = note: required because it appears within the type `tmq::poll::ZmqPoller`
    = note: required because it appears within the type `tmq::socket_types::request_reply::RequestReceiver`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&tmq::socket_types::request_reply::RequestReceiver`
    = note: required because it appears within the type `[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&tmq::socket_types::request_reply::RequestReceiver]`
    = note: required because it appears within the type `for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}`
    = note: required because it appears within the type `[static generator@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0 0:tmq::socket_types::request_reply::RequestReceiver for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0 0:tmq::socket_types::request_reply::RequestReceiver for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}]>`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `{std::future::ResumeTy, std::sync::Arc<zmq::Context>, std::sync::Arc<config::Config>, tmq::socket_types::request_reply::RequestReceiver, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}`
    = note: required because it appears within the type `[static generator@src/bin/sxsim/control/process/appmngr.rs:28:26: 50:2 context:std::sync::Arc<zmq::Context>, config:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<zmq::Context>, std::sync::Arc<config::Config>, tmq::socket_types::request_reply::RequestReceiver, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/bin/sxsim/control/process/appmngr.rs:28:26: 50:2 context:std::sync::Arc<zmq::Context>, config:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<zmq::Context>, std::sync::Arc<config::Config>, tmq::socket_types::request_reply::RequestReceiver, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}]>`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `for<'r, 's> {std::future::ResumeTy, &'r std::sync::Arc<zmq::Context>, std::sync::Arc<zmq::Context>, &'s std::sync::Arc<config::Config>, std::sync::Arc<config::Config>, impl std::future::Future, ()}`
    = note: required because it appears within the type `[static generator@src/bin/sxsim/control/server.rs:57:46: 60:10 context:std::sync::Arc<zmq::Context>, config:std::sync::Arc<config::Config> for<'r, 's> {std::future::ResumeTy, &'r std::sync::Arc<zmq::Context>, std::sync::Arc<zmq::Context>, &'s std::sync::Arc<config::Config>, std::sync::Arc<config::Config>, impl std::future::Future, ()}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/bin/sxsim/control/server.rs:57:46: 60:10 context:std::sync::Arc<zmq::Context>, config:std::sync::Arc<config::Config> for<'r, 's> {std::future::ResumeTy, &'r std::sync::Arc<zmq::Context>, std::sync::Arc<zmq::Context>, &'s std::sync::Arc<config::Config>, std::sync::Arc<config::Config>, impl std::future::Future, ()}]>`
    = note: required because it appears within the type `impl std::future::Future`

Is any of the underlying crates I use incompatible with Tokio/async, or can this be remedied?

Check tokio::spawn signature tokio::spawn - Rust
It requires Send + 'static
So you need to wrap your pointers in type that implements Send

The error is saying that you are storing a &RequestReceiver somewhere, but RequestReceiver is not Sync. Notice the ampersand! Storing a RequestReceiver is fine, because the type is Send, but you can't keep a &RequestReceiver across an .await.

After communicating with the tmq developers (https://github.com/cetra3/tmq/issues/9#issuecomment-626249435)
the problem seems to be that tmq is not ready yet for multi-threaded communication (the sockets are Send but not Sync; the examples only use tokio::spawn_local(...)). Practically, their advice is to use tmq at the (client comm) edges of the application.

[Old: Therefore, I'll probably use tokio's mpmc channels or Tcp for internal comm (unless there's another async ipc crate/facility available)].
Update: I just found parity_tokio_ipc - Rust.

Thanks for your help!

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.