Picking a single item from a futures stream into the current thread (with my own event loop)

#1

Hey :wave: new Rust user here with a C++ and Go background! I’m rewriting an application plugin from C++ to Rust. The plugin is just an HTTP client, so I chose reqwest, tokio and futures.

Quick overview of the application’s architecture:

  • It has an embedded event-driven scripting language - my plugin provides functions to this language
  • It has its own “event loop”, which is essentially a function that’s called every “tick” of the scripting language’s runtime.
  • The application is single threaded and the scripting language runtime is too.

So the original C++ plugin spawned a new thread for each request and dropped the response on a queue. When the event loop function fired, it attempted to get a mutex lock on the queue and then called the necessary callbacks in the scripting runtime.

In Rust, I didn’t want to just rewrite the C++ version, I wanted to try doing it The Rust Way with futures and stuff. I’ve learnt a lot so far but hit a roadblock in the final piece of the puzzle.

I need to pick a single item off the mpsc queue into the main thread. I’ve read a lot of the docs for tokio and can’t figure out how to do this. All the examples focus on doing work in the future using lambdas but I can’t do that, I need to bring the response object from a queue of completed requests into the main thread and send it into the scripting language runtime via a callback event.

So, anyway, on to the code. Luckily, given it’s a rewrite of an existing codebase, I already have a test harness set up and all the code is open source!

This is the function that’s called from the embedded scripting language to fire off a request.

This is where it does the request using tokio. As far as I could tell from the tests, this all works fine: it gets the response and places it on the async mpsc queue:

Now, this might be where I’m going wrong, I’ve attempted to implement the Stream trait for my RequestClient type so it can be polled for the most recent response:

And finally, the part of the code that’s actually failing at runtime:

And the error is:

thread '<unnamed>' panicked at 'no Task is currently running',

Relevant travis logs: https://travis-ci.org/Southclaws/pawn-requests/jobs/482577490#L905

Now, I think I understand why this error appears: the code isn’t being called from inside a future combinator, and it isn’t being run via runtime::spawn or tokio::run either. The problem is, I don’t know how to do what I want with the current tokio/futures API.

All I want to do is pick one item (or all, all would be fine too!) off the mpsc that’s inside request_client.rs into the main thread (not into some “future” closure).

I’d appreciate any pointers here, I’m really liking the language so far but just getting hung up on this API right now. Thanks! (if you help me out and you’re going to FOSDEM, I’ll buy you a beer!)

#2

Have you tried using a std::sync::mpsc channel to send the response over to the main thread? In process_tick you can call try_recv on the Receiver side of the channel to emulate the polling (non-blocking) nature of the futures-based channel.
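
Here is a minimal sketch of that idea using only the standard library. The `Response` struct, `spawn_request` and the shape of `process_tick` are made up for illustration (I haven't matched them to your actual code), and the worker just fabricates a response instead of doing a real HTTP request:

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::thread;

/// Hypothetical stand-in for a completed HTTP response.
#[derive(Debug, PartialEq)]
pub struct Response {
    pub id: u32,
    pub body: String,
}

/// Hypothetical worker: wherever the request actually completes
/// (a thread here, a future in your case), push the result onto the channel.
pub fn spawn_request(id: u32, tx: Sender<Response>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Placeholder for the real request.
        let response = Response {
            id,
            body: format!("response {}", id),
        };
        // send only fails if the Receiver has been dropped; ignore that here.
        let _ = tx.send(response);
    })
}

/// Called once per tick on the main thread. try_recv never blocks,
/// so this returns immediately when nothing is ready.
pub fn process_tick(rx: &Receiver<Response>) -> Vec<Response> {
    let mut ready = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(response) => ready.push(response),
            // Empty: nothing queued right now.
            // Disconnected: every Sender is gone and the queue is drained.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    ready
}
```

You would create the pair once with `std::sync::mpsc::channel()`, keep the `Receiver` on the main thread, and hand a clone of the `Sender` to each request. Whatever `process_tick` returns can then be passed to the scripting runtime's callbacks directly from the main thread, with no task context needed.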

#3

Oh god it’s so simple! I completely forgot there was an mpsc in the standard library! Thanks!