Is this use of unsafe sound?

Hi,

I'm trying to repeatedly poll an API using an async function, with some delay in between.
Unfortunately I was not able to implement this without the use of unsafe.
I've recreated a simplified version of the problem for this quesiton (see below).

My current approach uses a Vec of Futures called futures.
I add a new future to futures whenever either 50 milliseconds pass, or one of the API requests (some_function in the simplified case) completes. This is achieved by select and select_all.

  • If a future completes, I obtain the remaining_futures and assign them to the futures that are still to complete, and that I call select_all on in the next iteration of the loop.

  • Otherwise, if 50 milliseconds pass, I have to assign something to futures, since the futures array was moved by the call to select_all in the previous iteration.

I would like to just assign the previous futures, or call select_all(&futures) or select_all(futures.clone()), but neither of these work, since select_all actually wants to consume the futures and futures aren't clone-able.

My workaround is to use mem::transmute on the SelectAll object returned by select_all(futures) to obtain access to the struct's private fields.
I'm not sure if this is safe, since there may be some reason why inner is not public.

If there is a reason why SelectAll's inner is private, what is it?

Here's my full code:

use std::time::Duration;
use std::mem;
use std::time::SystemTime;

use tokio;
use async_std::task;
use futures::future::select_all;
use futures::future::Either;
use futures::future::select;
use futures::future::FutureExt;
use rand;
use rand::Rng;

mod liberal_futures {
    /// Future for the [`select_all`] function.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct SelectAll<Fut> {
        pub inner: Vec<Fut>,
    }
}

// Just some task to emulate API requests that can fail
async fn some_function() -> Result<(), ()> {
    task::sleep(Duration::from_millis(3000)).await;
    let now = SystemTime::now();
    println!("Finished @ {:?}!", now);
    // Sometimes succeeds, often doesn't
    if rand::thread_rng().gen::<f64>() < 1.0/20.0 {
        println!("Succeeded");
        Ok(())
    } else {
        Err(())
    }
}

#[tokio::main]
async fn main() {
    let mut futures = Vec::new();

    loop {
        futures.push(some_function().boxed());
        match select(
            task::sleep(Duration::from_millis(50)).boxed(), 
            select_all(futures)
        ).await {
            Either::Left((_, select_all_future)) => {
                // want to assign something to futures
                // select_all_future.inner contains the `futures` Vec but is private :(
                let liberal_future: liberal_futures::SelectAll<_> = unsafe { mem::transmute(select_all_future) };
                futures = liberal_future.inner;
            },
            Either::Right(((result, _, remaining_futures), _)) => {
                futures = remaining_futures;
                match result {
                    Ok(_) => { return; } // completed 
                    Err(_) => {},
                }
            }
        }
    }
}

Additionally, here's my Cargo.toml:

[package]
name = "select_all_example"
version = "0.1.0"
authors = ["ambiso <ambiso@invalid>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2.0-alpha.6", features = ["macros"] }
async-std = "1.4"
futures = "0.3"
rand = "*"

Thanks in advance for any help!

Best,
ambiso

This is not sound because neither type has #[repr(C)] or #[repr(transparent)]. What's wrong with FuturesUnordered?

Also it seems that you are still using tokio alpha, is there any particular reason for this?

I just saw this thread. I would like to also note that FuturesUnordered does not require that the future is Unpin, so you do not need to box it as long as every future is of the same type.

I didn't know about FuturesUnordered that looks like it's probably what I want!

The call to select_all is what's requiring the futures to be Unpin, I don't think this would change by using FuturesUnordered?

It does change because FuturesUnordered is an alternative to select_all, so you should no longer call select_all. I haven't tested this, but it should look approximately like this:

use futures::stream::StreamExt; // for next()

let mut futures = FuturesUnordered::new();
loop {
    futures.push(some_function());
    match select(
        task::sleep(Duration::from_millis(50)),
        futures.next()
    ).await {
         // and so on
    }
}

Aha!
That works great!
Thank you :slight_smile:.

Here's my final function for polling an API, if anyone is interested:

async fn until_success<R, E, T: Future<Output = Result<R, E>> + Send, F: Fn() -> T>(func: F) -> Result<R, E> {
    let mut futures = FuturesUnordered::new();

    loop {
        futures.push(func());
        match select(
            task::sleep(Duration::from_millis(50)).boxed(), 
            futures.next()
        ).await {
            Either::Left((_, _)) => { },
            Either::Right((result, _)) => {
                match result.unwrap() {
                    Ok(x) => return Ok(x),
                    Err(_) => {},
                }
            }
        }
    }
}

It can be called by a function like some_function:

async fn some_function() -> Result<(), ()> {
    Ok(())
}

// ...

    let result = until_success(some_function).await?;

Best,
ambiso

You still do not need to box the futures.

1 Like

Right I forgot to make that change.
I'll edit my post!