Using struct with a collection of `Box<dyn MyTrait>` in tokio tasks

I'm trying to make some observables. The subject that is being observed has a collection of objects that implement its observer trait. Using the subject in a tokio task causes a compilation error: future created by async block is not `Send`.

Code:

use std::time::Duration;

use chrono::{DateTime, Local};
use tokio::time::sleep;

trait TimerObserver<'observer, 'subject: 'observer> {
    fn observe(&'observer mut self, time: &'subject DateTime<Local>);
}

trait Timer {
    fn get_time() -> DateTime<Local>;
}

trait ObservableTimer<'subject, 'observer> {
    fn add_observer(&'subject mut self, observer: Box<dyn TimerObserver<'observer, 'subject>>);
    fn set_time(&'subject mut self, time: DateTime<Local>);
}

#[derive(Clone)] // THIS ALSO ERRORS BECAUSE DYNS CANNOT BE CLONED
struct ATimer<'subject, 'observer> {
    observers: Vec<Box<dyn TimerObserver<'observer, 'subject>>>, // THIS IS THE ERROR
    now: DateTime<Local>,
}
impl<'subject, 'observer> ObservableTimer<'subject, 'observer> for ATimer<'subject, 'observer> {
    fn add_observer(&'subject mut self, observer: Box<dyn TimerObserver<'observer, 'subject>>) {
        self.observers.push(observer);
    }

    fn set_time(&'subject mut self, time: DateTime<Local>) {
        self.now = time;
    }
}
#[derive(Clone)] 
struct MyView<'subject> {
    now: Option<&'subject DateTime<Local>>,
}
impl<'observer, 'subject: 'observer> TimerObserver<'observer, 'subject> for MyView<'observer> {
    fn observe(&'observer mut self, time: &'subject DateTime<Local>) {
        println!("received time: {}", time);
        self.now = Some(time);
    }
}

async fn do_it() {
    let mut view = MyView { now: None };
    let mut timer = ATimer {
        observers: vec![],
        now: Local::now(),
    };
    let observer = Box::new(view);
    timer.add_observer(observer);

    let mut timer_w = timer.clone();

    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            timer_w.clone().set_time(Local::now());
        }
    });

    // give us a few secs to watch
    sleep(Duration::from_secs(10)).await;
}

and the error:

error: future cannot be sent between threads safely
   --> src/example2.rs:55:5
    |
55  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `(dyn TimerObserver<'_, '_> + 'static)`
note: captured value is not `Send`
   --> src/example2.rs:56:27
    |
56  |         let mut timer_w = timer_w.clone();
    |                           ^^^^^^^ has type `ATimer<'_, '_>` which is not `Send`
note: required by a bound in `tokio::spawn`
   --> /home/coliny/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error[E0277]: the trait bound `dyn TimerObserver<'_, '_>: Clone` is not satisfied
   --> src/example2.rs:21:5
    |
19  | #[derive(Clone)]
    |          ----- in this derive macro expansion
20  | struct ATimer<'subject, 'observer> {
21  |     observers: Vec<Box<dyn TimerObserver<'observer, 'subject>>>,
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Clone` is not implemented for `dyn TimerObserver<'_, '_>`
    |
    = note: required because of the requirements on the impl of `Clone` for `Box<dyn TimerObserver<'_, '_>>`
    = note: 1 redundant requirement hidden
    = note: required because of the requirements on the impl of `Clone` for `Vec<Box<dyn TimerObserver<'_, '_>>>`
note: required by `clone`
   --> /home/coliny/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/clone.rs:122:5
    |
122 |     fn clone(&self) -> Self;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^
    = note: this error originates in the derive macro `Clone` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0277`.
error: could not compile `prideandjoy` due to 2 previous errors

Some of the lifetimes may be redundant but I wanted to be explicit for other reasons.

Help :-)

I am just guessing here, but I would try

trait TimerObserver<'observer, 'subject: 'observer> : Clone

Thanks @geebee22 - I did try that but it didn't help :-(. It feels like I am doing something fundamentally wrong, in terms of the way Rust wants me to work, but how else can one thing observe another thing?

I can make it work with channels, so I guess that's my next step, but I'd rather avoid them, as async communication has no place here.

To communicate you have to use a channel or Arc/Mutex, some kind of safe way of communicating.

Sorry, I haven't really managed to understand what you are trying to do here, but I doubt references are the solution.

I'm basically trying to build a graph of producers/consumers without using messaging. I need to use either messaging or observers. :-)

I am not sure what you mean by "observers", but if it means one task can access a variable in another task directly, then you need Arc and Mutex (well, there are some more obscure options, but I'm ignoring those).

Thanks @geebee22 - this much pain shows I am working against Rust. Back to the drawing board :-)

`Send` is an auto trait whose implementation depends on the details of the underlying type. When you erase that type by putting it into a `Box<dyn MyTrait>`, those details disappear and the compiler can no longer tell whether it's safe to send the object to another thread.

If you use `Box<dyn MyTrait + Send>` instead, this issue doesn't arise, because the compiler checks everything when the concrete value is converted into the trait object.
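To make the `+ Send` point concrete, here is a minimal sketch using only the standard library. The names `Observer`, `View`, `Subject` and `notify_on_other_thread` are made up for illustration and are not from the code in the question. It also assumes the observers are owned values rather than borrowed ones, since the error above shows that `tokio::spawn` requires `'static` as well as `Send`; `std::thread::spawn` has the same two requirements, so it stands in for it here.

```rust
use std::thread;

// Hypothetical trait standing in for the observer trait in the question.
trait Observer {
    fn observe(&mut self, value: u32);
    fn last_seen(&self) -> Option<u32>;
}

// Hypothetical observer that remembers the last value it was shown.
struct View {
    last: Option<u32>,
}

impl Observer for View {
    fn observe(&mut self, value: u32) {
        self.last = Some(value);
    }

    fn last_seen(&self) -> Option<u32> {
        self.last
    }
}

struct Subject {
    // `+ Send` is checked once, when a concrete value is coerced into the box.
    // Without it, `Subject` would not be `Send` and the spawn below would fail.
    observers: Vec<Box<dyn Observer + Send>>,
}

impl Subject {
    fn notify(&mut self, value: u32) {
        for observer in self.observers.iter_mut() {
            observer.observe(value);
        }
    }
}

// Moves the whole subject to another thread, notifies there, and hands it back.
fn notify_on_other_thread(mut subject: Subject, value: u32) -> Subject {
    thread::spawn(move || {
        subject.notify(value);
        subject
    })
    .join()
    .expect("worker thread panicked")
}

fn main() {
    let subject = Subject {
        observers: vec![Box::new(View { last: None })],
    };
    let subject = notify_on_other_thread(subject, 7);
    println!("{:?}", subject.observers[0].last_seen());
}
```

Removing `+ Send` from the `observers` field should bring back an error of the same shape as the first one in the question.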


The clone issue is a little trickier to resolve. You can define a clone-like method on the trait that returns a boxed trait object, and then implement `Clone` for the boxed trait object:

trait MyTrait {
    fn clone_as_boxed_my_trait(&self) -> Box<dyn MyTrait + Send>;
}

impl Clone for Box<dyn MyTrait + Send> {
    fn clone(&self) -> Self {
        self.clone_as_boxed_my_trait()
    }
}

Once you've done this, `#[derive(Clone)]` should work as normal for structs that contain embedded `Box<dyn MyTrait + Send>`s.
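For completeness, a sketch of how the pieces might fit together end to end. `Logger`, `Holder`, the `name` method and the `names` helper are hypothetical, added only so there is something to clone and inspect; the trait method and the `Clone` impl are the ones from the snippet above.

```rust
trait MyTrait {
    // Hypothetical extra method so the example has something observable.
    fn name(&self) -> String;
    fn clone_as_boxed_my_trait(&self) -> Box<dyn MyTrait + Send>;
}

impl Clone for Box<dyn MyTrait + Send> {
    fn clone(&self) -> Self {
        self.clone_as_boxed_my_trait()
    }
}

// A concrete, ordinarily cloneable type (hypothetical).
#[derive(Clone)]
struct Logger {
    prefix: String,
}

impl MyTrait for Logger {
    fn name(&self) -> String {
        format!("{}-logger", self.prefix)
    }

    // Each implementor decides how to copy itself; here we reuse the derived `Clone`.
    fn clone_as_boxed_my_trait(&self) -> Box<dyn MyTrait + Send> {
        Box::new(self.clone())
    }
}

// The derive compiles because `Box<dyn MyTrait + Send>` now implements `Clone`.
#[derive(Clone)]
struct Holder {
    items: Vec<Box<dyn MyTrait + Send>>,
}

fn names(holder: &Holder) -> Vec<String> {
    holder.items.iter().map(|item| item.name()).collect()
}

fn main() {
    let original = Holder {
        items: vec![Box::new(Logger { prefix: "a".to_string() })],
    };
    let copy = original.clone();
    println!("{:?} {:?}", names(&original), names(&copy));
}
```

The cost of this approach is that every implementor has to write the extra method, though for types that already derive `Clone` it is a one-liner.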

You star.

Here is a little program I just made, in which the main task is "observing" the child task:

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::RwLock;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Shared counter: the spawned task writes to it, the main task reads it.
    let observable = Arc::new(RwLock::new(0));
    let copy = observable.clone();

    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(1)).await;
            *copy.write().await += 1;
        }
    });

    loop {
        sleep(Duration::from_secs(1)).await;
        println!("shared={}", observable.read().await);
    }
}

Does that resemble what you want in any way?

Thanks George, it does, except I need stateful structs to be the observers, which is where it got really complicated.

Can I ask - why is the Clone trickery necessary?

FWIW - this article really cleared up some idioms that I wasn't aware of: Actors with Tokio – Alice Ryhl

At the moment, Rust cannot return a dynamically sized object (like `dyn MyTrait`) from a function directly; it has to be behind some kind of indirection. This prevents trait objects from implementing `Clone` directly, as the method signature would be `fn(&dyn MyTrait) -> dyn MyTrait`.

`Box<dyn MyTrait>` is a fixed-size type, though, so the only obstacle to implementing `Clone` is actually making the logical copy. By defining the extra trait method, you can defer to the underlying implementations to define how this should happen.

Well, you can stick whatever you like inside an `RwLock`. In my little program it's an integer, but it can be a struct, or any type as long as it has a size.
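As a rough illustration of the struct case, something like the following should work. Note the swap: this sketch uses `std::sync::RwLock` and `std::thread` rather than the tokio versions from the program above, purely so it runs without extra crates; the shape with `tokio::sync::RwLock` and `.await` is analogous. `ViewState` and `run_writer` are invented names.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stateful value shared between the writer and the reader.
#[derive(Debug, Default)]
struct ViewState {
    ticks: u32,
    last_message: String,
}

// Spawns a writer thread that updates the shared state `n` times,
// then waits for it to finish.
fn run_writer(shared: Arc<RwLock<ViewState>>, n: u32) {
    let handle = thread::spawn(move || {
        for i in 1..=n {
            // The write guard is dropped at the end of each iteration.
            let mut state = shared.write().expect("lock poisoned");
            state.ticks = i;
            state.last_message = format!("tick {}", i);
        }
    });
    handle.join().expect("writer thread panicked");
}

fn main() {
    let observable = Arc::new(RwLock::new(ViewState::default()));
    run_writer(Arc::clone(&observable), 3);

    // The "observer" side only needs a read lock.
    let state = observable.read().expect("lock poisoned");
    println!("{} {}", state.ticks, state.last_message);
}
```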

Clever - thanks.

Thanks George. Alice's article about Actors with Tokio (further up this thread) described what I was attempting, and why it can't easily be done. I'm probably not being clear here, but I was letting my decades of OO leak in and trying to have a stateful Actor in the tasks themselves rather than stateless fragments as you've done.

It's all good, and I really appreciate your and @2e71828's efforts!