[Solved] Issue while trying to abstract Repository to a trait `async/await`

Hello all,
Yesterday I decided to reproduce my issue in isolation, separate from the project I am working on.
The result I want to achieve is a ChannelRepository trait that can be implemented by both PostgresChannelRepository (backed by bb8) and MemoryChannelRepository (a simple Vec), using async/await.
For that I managed to use Pin<Box<dyn Future<Output = Result<T, RepositoryError>> + Send>> (as async fn is not allowed in traits) as the Future that every repository will return (not only the Channel one), since I want to use the new Futures preview. Everything worked great until I tried to switch from r2d2 to bb8 for an async Postgres pool and connection. I managed to make it compile, but I get a runtime error. I think the cause is that I return the bb8 future through compat() (to convert it to Futures 0.3), and something then blows up internally because of it:

thread 'tokio-runtime-worker-2' panicked at 'not yet implemented: async-await-preview currently only supports futures 0.1. Use the compatibility layer of futures 0.3 instead, if you want to use futures 0.3.', /home/elpiel/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-async-await-0.1.7/src/compat/backward.rs:77:5

Even with RUST_BACKTRACE=full, I couldn't follow it. I'm still an entry-level Rust developer.

The full code can be found in this branch here, and the likely source of the problem is here.

Sorry, I can't quite pinpoint what exactly is going on, and I am not sure whether what I am trying to do as a PoC can be done like this at all. In the whole application I want to use Futures 0.3, and to wrap any library that still works with Futures 0.1 using the compat layer.
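
For readers following along, the trait shape described above can be sketched with the standard library only. This is a minimal sketch, not the code from the PoC: Channel, the RepositoryError variant, the method names, and the run_ready helper are all hypothetical, and the in-memory futures are assumed to complete on their first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical domain types, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Channel {
    pub id: String,
}

#[derive(Debug, PartialEq)]
pub enum RepositoryError {
    AlreadyExists,
}

// Boxed-future alias in the shape described in the post.
pub type RepositoryFuture<T> =
    Pin<Box<dyn Future<Output = Result<T, RepositoryError>> + Send>>;

// Every backend returns the same boxed future type, so the trait stays object safe.
pub trait ChannelRepository {
    fn list(&self) -> RepositoryFuture<Vec<Channel>>;
    fn insert(&self, channel: Channel) -> RepositoryFuture<()>;
}

// In-memory backend: a Vec behind a mutex.
#[derive(Default)]
pub struct MemoryChannelRepository {
    records: Arc<Mutex<Vec<Channel>>>,
}

impl ChannelRepository for MemoryChannelRepository {
    fn list(&self) -> RepositoryFuture<Vec<Channel>> {
        let records = Arc::clone(&self.records);
        Box::pin(async move {
            let channels = records.lock().unwrap().clone();
            Ok::<_, RepositoryError>(channels)
        })
    }

    fn insert(&self, channel: Channel) -> RepositoryFuture<()> {
        let records = Arc::clone(&self.records);
        Box::pin(async move {
            let mut guard = records.lock().unwrap();
            if guard.iter().any(|c| c.id == channel.id) {
                return Err(RepositoryError::AlreadyExists);
            }
            guard.push(channel);
            Ok(())
        })
    }
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls once and expects the future to be ready.
pub fn run_ready<T>(mut fut: RepositoryFuture<T>) -> Result<T, RepositoryError> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("in-memory futures should be ready on first poll"),
    }
}

fn main() {
    let repo = MemoryChannelRepository::default();
    run_ready(repo.insert(Channel { id: "general".to_string() })).unwrap();
    println!("{:?}", run_ready(repo.list()).unwrap());
}
```

A Postgres-backed implementation would return the same RepositoryFuture<T>, which is where a 0.1 future from the pool would have to be converted first.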

Thanks in advance!

The error occurs because futures(0.3)::compat tries to do bidirectional mapping between std::task::Waker and futures(0.1)::task::Task. So when bb8's underlying futures(0.1)::task::Task is notified the compat layer will attempt to wake the std::task::Waker it was provided. Unfortunately tokio-async-await doesn't support this and panics with that error instead.

Personally I would recommend sticking with just one of futures(0.3)::compat or tokio-async-await; the way the two each attempt to provide compatibility causes issues like this. futures(0.3)::compat includes a conversion from std::future::Future to futures(0.1)::Future, which you can use when spawning futures onto the Tokio executor.

Thanks for the answer!
Can you give me a bit more details about what you mean?
Do you mean that I need to choose between using Futures 0.3 with the std::await! macro, or tokio-async-await (which provides some extension implementations for streams & futures and other stuff, including the tokio::await! macro) with Futures 0.1?

I managed to remove the tokio-async-await feature and I am planning to use Futures 0.3 instead. So I guess I am on the right track like this?

Latest PoC commit: https://github.com/elpiel/repository-trait-rs/commit/5f4f1ac00e53018727cf2e3141f4c027a8b47270#diff-639fbc4ef05b315af92b4d836c31b023R17

The only thing I couldn't figure out is how to remove the async fn handle() and call handle_request() directly in tokio::spawn inside fn spawn. I tried lazy (the new Futures 0.3 lazy fn), but it gave me the error that generator closures cannot have explicit arguments (as I am calling await! inside, and the Futures 0.3 lazy closure takes a Context).

It would be very helpful if you could check the working example (it does work now with the bb8 future and everything) and if you can tell me how I can do the handle_request directly in the now called fn spawn.

Thank you very very much! The whole thing with Old & new Futures and tokio and compatibility layer and etc. is really hard, especially since I started working full-time + with them specifically 1 month ago...

What I have found is that I can use futures 0.3 exclusively in my projects, and whenever a library gives me a future/stream/sink that is 0.1, I use compat from futures 0.3 to convert it.

I even spawn futures from tokio on the executors from futures 0.3 (LocalPool, ThreadPool). The futures have their own access to the tokio reactor and it works without ever needing to use a tokio runtime or executor.

It's the workflow that has given me the least headaches.

ps: .await syntax has landed in nightly, so you don't even need a macro anymore! hurray

Yes, I was doing the same thing. But the problem occurred when I tried to have an existential type (I think that's what it's called) of a Future for a trait. As @Nemo157 said, the problem is with the std::task::Waker: tokio-async-await does not support waking it (I also saw this in the code before posting).

PS: .await has landed, I know, isn't this awesome :partying_face:! We only need to wait for 20 something to confirm and set the final syntax.

I'm not using tokio-async-await. I think that's the difference, no? I'll use for example TcpStream, but just convert it with compat from futures and from then on treat it as a std future.

Yes, not using tokio-async-await solved the issue. That's why I posted the latest commit to check if that's what @Nemo157 meant.

This issue is solved, I just wanted to get some additional feedback for the code itself and the solution at hand.

So, I'm going to start quite far back to describe exactly why tokio-async-await and futures(0.3)::compat are incompatible.

First of all, the designs of both futures(0.1)::Future and std::future::Future are based on a scheme where you spawn a future as a "task" onto an "executor" to run it. The executor polls the future once, then parks it until it gets a notification that the task is ready to run again. Running the future and routing the notification from the underlying async IO provided by the OS are completely decoupled. The latter is commonly called the "reactor"; in Tokio's case you can run both on the same thread for better performance.

With futures(0.1) this notification is handled by futures(0.1)::task::Task. The executor ensures that Task::current() will magically (via thread-local storage) return a handle that can be used to notify it about readiness of the current task. (IMO this naming is confusing because the Task is not the task, it is just a notification handle for the task; I will consistently use code formatting to distinguish them.)
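
The thread-local mechanism described above can be imitated in a few lines. This is a rough sketch with made-up names (Task, current, with_task and leaf_future_poll are stand-ins written for this example, not the futures 0.1 API), and the handle only counts notifications instead of rescheduling anything.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for a futures 0.1 `Task`: a cloneable notification handle.
#[derive(Clone)]
pub struct Task {
    notifications: Arc<AtomicUsize>,
}

impl Task {
    // A real executor would put the task back on its run queue here.
    pub fn notify(&self) {
        self.notifications.fetch_add(1, Ordering::SeqCst);
    }
}

thread_local! {
    // The handle of whatever task is being polled on this thread right now.
    static CURRENT: RefCell<Option<Task>> = RefCell::new(None);
}

// Roughly what `Task::current()` does: read the handle the executor stashed away.
pub fn current() -> Task {
    CURRENT
        .with(|c| c.borrow().clone())
        .expect("no task is currently being polled")
}

// Executor side: install the handle for the duration of one poll, then restore
// the previous one. (Not panic safe; kept short for the sketch.)
pub fn with_task<R>(task: Task, poll: impl FnOnce() -> R) -> R {
    let previous = CURRENT.with(|c| c.borrow_mut().replace(task));
    let out = poll();
    CURRENT.with(|c| *c.borrow_mut() = previous);
    out
}

// A leaf "IO future" takes no handle as an argument, yet still finds one.
fn leaf_future_poll() -> Task {
    current()
}

// Runs one poll and one later notification; returns how often the executor was notified.
pub fn demo() -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let task = Task { notifications: Arc::clone(&counter) };
    let stashed = with_task(task, leaf_future_poll);
    stashed.notify(); // the "reactor" reports readiness some time later
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("executor notified {} time(s)", demo());
}
```

Nothing is passed down the call chain here, which is why intermediate layers cannot see or influence the handle unless they overwrite the thread-local themselves.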

With std::future::Future the notification is handled by std::task::Waker. The executor passes a reference to a waker for the current task into the future as part of the std::task::Context.
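
By contrast, the std scheme hands the waker down explicitly. The following is a toy sketch using only the standard library: block_on, ThreadWaker and Timer are illustrative names, and a helper thread plays the role of the reactor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Waker that unparks the thread the toy executor runs on.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor for a single future: poll, park until woken, poll again.
// Returns the output together with the number of polls it took.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        thread::park();
    }
}

// Stand-in for an IO future; the spawned thread acts as the "reactor".
struct Timer {
    done: Arc<AtomicBool>,
    started: bool,
    delay: Duration,
}

impl Timer {
    fn new(delay: Duration) -> Self {
        Timer { done: Arc::new(AtomicBool::new(false)), started: false, delay }
    }
}

impl Future for Timer {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready("timer fired");
        }
        if !self.started {
            self.started = true;
            let done = Arc::clone(&self.done);
            // The waker arrives through the Context argument, not a thread-local.
            let waker = cx.waker().clone();
            let delay = self.delay;
            thread::spawn(move || {
                thread::sleep(delay);
                done.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

fn main() {
    let (msg, polls) = block_on(Timer::new(Duration::from_millis(20)));
    println!("{} after {} polls", msg, polls);
}
```

A production future would also replace its stored waker on every poll; the sketch skips that because the executor here always passes the same one.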

Now, tokio-async-await has one purpose: to allow you to write async fn and async {} using Tokio's IO running on Tokio's executor. Because Task::current() is magic and implicitly propagates through any intermediate std::future::Future, it can ignore the std::task::Waker and just inject a dummy that doesn't work. Once you get down to an IO future that actually needs to register for notifications with the reactor, that future can just call Task::current() and acquire the handle that the executor stashed away earlier.

With futures(0.3)::compat the aim is to have a fully general bi-directional compatibility layer, allowing you to run std::future::Future on a futures(0.1) based executor, or run futures(0.1)::Future on a std::task based executor, or poll either as part of a larger future constructed from either type. To make this work in all cases it must translate between these two waker systems. When you use .compat() to convert a std::future::Future into a futures(0.1)::Future, we add a layer that calls Task::current() to acquire the current notification handle, wraps that in a std::task::Waker, and passes this into the underlying std::future::Future. If the underlying future then calls cx.waker().wake(), this calls task.notify() so that the futures(0.1) executor will see it. Similarly, converting a futures(0.1)::Future into a std::future::Future adds a layer that takes the waker off the context and reconfigures Task::current() to return a Task that will call waker.wake() when task.notify() is called.

One consequence of these compatibility layers is that when you use futures(0.3)::compat to run a futures(0.1) based IO future on a futures(0.1) based executor with a middle layer of async fn, we actually use both notification systems. The underlying IO future calls task.notify(). This goes into the Task that the 0.1 -> std compat layer set up, which calls waker.wake(). That in turn goes into the Waker added by the std -> 0.1 compat layer, which calls task.notify() on the Task set up by the executor.

The incompatibility comes from tokio-async-await assuming that Task::current() will always return the Task set up by the executor. Because futures(0.3)::compat overrides this with a Task that proxies through to the Waker it was provided, it breaks this underlying assumption.
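
The broken assumption can be reproduced in miniature. Everything below is a mock written for this explanation: Task, current, with_task, DummyWaker and compat_01_to_std are hypothetical names, not the real crates' internals, and the dummy waker records the call where the real one panics.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Mock 0.1-style notification handle: just a shared callback.
#[derive(Clone)]
struct Task(Arc<dyn Fn() + Send + Sync>);

impl Task {
    fn notify(&self) {
        (self.0)()
    }
}

thread_local! {
    static CURRENT: RefCell<Option<Task>> = RefCell::new(None);
}

// Mock of `Task::current()`.
fn current() -> Task {
    CURRENT.with(|c| c.borrow().clone()).expect("no current task")
}

// Install `task` as the current one while `f` runs, then restore the old one.
fn with_task<R>(task: Task, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT.with(|c| c.borrow_mut().replace(task));
    let out = f();
    CURRENT.with(|c| *c.borrow_mut() = previous);
    out
}

// Dummy waker of the kind the async/await glue injects: it assumes nobody calls it.
struct DummyWaker {
    misused: Arc<AtomicBool>,
}

impl Wake for DummyWaker {
    fn wake(self: Arc<Self>) {
        self.misused.store(true, Ordering::SeqCst);
    }
}

// Mock of the 0.1 -> std direction of a compat layer: while the inner 0.1 future
// is polled, the current task is replaced by a proxy that forwards to the waker.
fn compat_01_to_std<R>(waker: &Waker, f: impl FnOnce() -> R) -> R {
    let waker = waker.clone();
    with_task(Task(Arc::new(move || waker.wake_by_ref())), f)
}

// Returns (notifications that reached the executor, whether the dummy waker was hit).
fn run(use_compat: bool) -> (usize, bool) {
    let notified = Arc::new(AtomicUsize::new(0));
    let misused = Arc::new(AtomicBool::new(false));
    let counter = Arc::clone(&notified);
    let executor_task = Task(Arc::new(move || {
        counter.fetch_add(1, Ordering::SeqCst);
    }));
    let dummy = Waker::from(Arc::new(DummyWaker { misused: Arc::clone(&misused) }));

    // One poll: the leaf IO future grabs whatever `current()` returns at that moment.
    let handle = with_task(executor_task, || {
        if use_compat {
            compat_01_to_std(&dummy, current)
        } else {
            current()
        }
    });
    handle.notify(); // the reactor reports readiness later

    (notified.load(Ordering::SeqCst), misused.load(Ordering::SeqCst))
}

fn main() {
    println!("without compat layer: {:?}", run(false));
    println!("with compat layer:    {:?}", run(true));
}
```

In the first case the notification reaches the executor's handle directly. In the second it is routed into the dummy waker instead and never reaches the executor, which is the situation that ends in the panic quoted at the top of the thread.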


You should be able to do something very close to your old implementation, just using combinators to convert the std::future::Future that async move gives you to a futures(0.1)::Future.

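// Assumes the futures 0.3 crate is imported as `futures`.
use futures::future::{FutureExt, TryFutureExt};

tokio::spawn(async move {
    let response = await!(handle_request(pool)).unwrap();
    println!("{}", response);
// unit_error: Output () -> Result<(), ()>; boxed: pins it; compat: std -> 0.1
}.unit_error().boxed().compat());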

I will need to read it more times to fully understand it :sweat_smile: But thank you very much for the time you took to explain it!
