Tokio 0.1: how to spawn a future that takes items from stream until a certain value is returned?

Hey you all,
I am finding myself doing an extension of an app written with tokio 0.1. So far I haven't been successful.

My scenario:

let should_be_a_future =
    This::Produces::a_future()
    .which_then_produces_a_stream()
    .and_then(|stream| {

        // How do we read items until we stumble upon a termination value
        // while also returning a future we can spawn with `tokio::run`?
    });

tokio::run(lazy(|| should_be_a_future));

I tried and_then and take_while and currently banging my head against the wall with loop_fn. I keep getting compilation errors that the trait futures::Future is not implemented for <most-Stream-combinators>. :frowning: Apparently most stream combinators don't return something that implements IntoFuture. I tried a lot of ugly hacks like into_future() chained with map and map_err, so far no dice.

for_each works (because it returns something that implements IntoFuture) but I haven't been successful terminating the program when using it even by yielding an Err value.

Of course this might easily be an XY problem, I am just wondering how to achieve the final result. If having a single future that can be passed to tokio::run isn't the way to go here, I am open to hear how it's done.

Completely stuck right now. Anybody willing to give me a hint?

The for_each method returns a Future. Is that what you're looking for?

I thought I did but then it turned out I can't terminate early even if I return Err in the for_each closure.

That made me look for other combinators, where I keep failing.

You could do something like this:

async fn do_actual_operation(value: YourType) -> bool {}

stream.then(|value| do_actual_operation(value))
    .take_until(|stop| futures::future::ok(*stop))
    .for_each(|_| futures::future::ok(()));

I'll try but using the async keyword on this ancient tokio / futures versions (both 0.1.x) gives me compile errors because impl Future that Rust does nowadays seems incompatible with what they expect. Let me check again and respond after.

Ah, but, what I really meant is this:

fn fn do_actual_operation(value: YourType) -> impl Future<Item = bool, Error = std::convert::Infallible> {}

Not sure but maybe you meant take_while? take_until does not exist. Changed it to that, here's the result:

fn per_response(
    resp: Response,
    terminator_req_id: i32,
) -> impl Future<Item = bool, Error = ()> {
    println!("{:?}", resp);
    match resp {
        Response::ContractDataEndMsg(ContractDataEndMsg {
            req_id: received_req_id,
        }) => {
            println!(
                "expected req_id={}, current req_id={}, keep_going={}",
                terminator_req_id,
                received_req_id,
                received_req_id != terminator_req_id
            );
            futures::future::ok(received_req_id != terminator_req_id)
        }
        other => {
            println!("other={:?}", other);
            futures::future::ok(true)
        }
    }
}

fn async_exec_alice<F>(details: HostAndPort, requests: Vec<Request>) {
    let addr = SocketAddr::from(details);
    let builder = TwsClientBuilder::new(0);
    let client = builder
        .connect(addr, 0) // this returns a Future
        .map_err(|e| eprintln!("Read Error: {:?}", e))
        .map(move |c| c)
        .and_then(move |c| { // this returns a Stream
            println!("version:{}", c.server_version);

            let expected_req_id = req_id(&requests[0]);
            for request in requests {
                c.send_request(request);
            }

            c.then(|response| per_response(response.expect("Get response"), expected_req_id))
                .take_while(|stop| futures::future::ok(*stop))
                .for_each(|_| futures::future::ok(()))
        });

    tokio::run(lazy(|| client));
}

This works like the previous code I had before. Problem is, exactly like it, we still don't have an early termination. The program hangs. I am beginning to suspect the tokio-0.1-enabled library...

EDIT: What I mean by my last is that likely the library does not obey the requirements listed in tokio 0.1's Runtime::shutdown_on_idle. Might take a look or might give up for now. The library itself is public and not a secret company code although there are plans to improve it and modernize it to tokio 1.x in the future.

If you or anybody else is curious, here it is: GitHub - dailypips/async_ib: Tws api rust bind

Oh, well, that's probably because it spawns some sort of background task then.

Yes, it does. It spawns one that listens on a channel that accepts commands (an enum) to execute -- which are in turn sent through a socket to a remote server.

I'll see if I can quickly add a feature to make a full clean shutdown. If not, oh well, the demo/MVP will have to do without a clean shutdown. :slight_smile:

Thanks for your help!

If its listening on the channel, perhaps you only need to drop the sender to cause it to exit?

The problem is that the struct that holds these channels also implements Future and is being constructed and tokio::spawn-ed right on the spot. I can't borrow or clone anything from it except the channels.

But that should be enough indeed so I'll try to just .clone() the sending part of the stream of the background task and put it in the "client" object (the one I have access to) and .close() or drop(...) it. I'll try and report back.

No dice. The channels are mpsc, meaning that dropping one clone of the tx part does not do anything since the first copy is still around in a background task.

EDIT: I can't just put the entire tokio::spawn-able object inside the one I have control over either because spawning involves a move. Hmmm, still thinking through this.

I am drawing the line here due to uncertainty and tight time constraints -- a clean shutdown is not a hard requirement for a demo/MVP.

If you want to look at the near-complete picture, here's a link to that ancient library that I am using, and the exact place in the code where tokio::spawn happens: async_ib/builder.rs at 66ff93eea806f178a6220cae3c3cb46e19503405 · dailypips/async_ib · GitHub

The stage I am leaving at is: how do we link task to client somehow so I can manually initiate shutdown? Granted I can change the library (we have a fork that's 99% identical) but it's an uncertain amount of time and that's an unjustified risk for the next few days.

The TwsTask struct has an exiting boolean field and it's respected but it's only being set to true if the request (the task's rx) channel is closed (I'll see if I can do that quickly and if not I am giving up for now). And the whole thing seems to be wrongly written: you can't just have a background task and a client relating to it without linking them in memory somehow, especially after the runtime expects the background task to be stopped before/during the dropping of the client (you helped me achieve the latter by guiding me to how to do early termination of future-enabled loops). The library seems to require a solid dose of refactoring. :person_shrugging: