Proper use of a Stream<Item = Event> across multiple futures with loop and .next()

I need some clarification on how to properly use Streams. My program is quite extensive, covering about 24 separate files, so I will try to break this down as much as possible while remaining within my NDA. The problem occurs in file_ops.rs and chunker.rs where initially the error says impl Stream<Item = Event> cannot be unpinned. Therefore, I opted to pin the Stream in file_ops using .boxed() to see what happens. This would turn event_stream to a Pin<Box<dyn Stream<Item = _> + Send>> and the error now is with .boxed where impl Stream<Item = Event> cannot be sent between threads safely.
How do I properly create the Stream and then enable it to be used by multiple futures with .next()? The crux of what I am doing is that many futures will be checking the stream for an InboundRequest that is equal to the file_alias being provided.

#[WORKSPACE]
image

[dependencies]

    libp2p = "0.43.0"
    async-std = { version = "1.12.0", features = ["attributes"] }
    futures = "0.3.2"
    async-trait = "0.1.58"
    env_logger = "0.9.1"

[main.rs]

use std::error::Error;
use futures::executor::block_on;
use crate::library::run_application;
mod library;

/// Program entry point: initialises logging, then runs the application to completion.
///
/// # Errors
/// Returns the error reported by `run_application`, if any.
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::init();
    // This function is already async, so the application future is awaited directly
    // rather than handed to a second, blocking executor. Returning its result also keeps
    // an application error from being silently discarded.
    run_application().await
}

[library.rs]

use std::{path::PathBuf, error::Error};
pub mod ops;
pub mod network;
pub mod chunker;

/// Launches the network layer and runs a single, hard-coded file command through it.
///
/// # Errors
/// Returns the error from `network::launch::new` if the launch fails.
pub async fn run_application() -> Result<(), Box<dyn Error>> {
    // The command to run: provide `test.txt` under the name `test`.
    let file_command = FileCommand {
        name: String::from("test"),
        path: PathBuf::from("test.txt"),
    };
    let (mut client, mut event_stream) = network::launch::new().await?;
    ops::run_args::run_arg(&mut client, &mut event_stream, file_command).await;
    Ok(())
}

/// A request to operate on one file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileCommand {
    /// Location of the file on disk.
    pub path: PathBuf,
    /// Name the file is referred to by.
    pub name: String,
}

[launch.rs]

use std::{collections::HashSet, error::Error};
use futures::{Stream, channel::{mpsc, oneshot}, SinkExt};
use libp2p::{request_response::ResponseChannel, PeerId};

pub async fn new() -> Result<(Client, impl Stream<Item = Event>), Box<dyn Error>> {
    let (command_sender, command_receiver) = mpsc::channel(0);
    let (event_sender, event_receiver) = mpsc::channel(0);
    Ok((Client { sender: command_sender }, event_receiver))
}

/// Handle used to issue `Command`s over a channel.
pub struct Client {
    /// Sending half of the command channel.
    pub sender: mpsc::Sender<Command>,
}

impl Client {
    /// Announce this node as a provider of `file_name` (a file or a chunk) on the DHT.
    ///
    /// Sends a `Command::StartProviding` and waits for the reply on a one-shot channel.
    ///
    /// # Panics
    /// Panics if the command receiver or the one-shot sender has been dropped.
    pub async fn start_providing(&mut self, file_name: String) {
        let (sender, receiver) = oneshot::channel();
        let command = Command::StartProviding { file_name, sender };
        self.sender
            .send(command)
            .await
            .expect("Command receiver not to be dropped.");
        receiver.await.expect("Sender not to be dropped.");
    }

    /// Answer a peer's request with the bytes of a file or chunk.
    ///
    /// Sends a `Command::RespondFile` carrying `file` and the response `channel`.
    ///
    /// # Panics
    /// Panics if the command receiver has been dropped.
    pub async fn respond_file(&mut self, file: Vec<u8>, channel: ResponseChannel<FileResponse>) {
        let command = Command::RespondFile { file, channel };
        self.sender
            .send(command)
            .await
            .expect("Command receiver not to be dropped.");
    }
}

/// Raw bytes of a file or chunk sent back in answer to a request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(pub Vec<u8>);

/// Events delivered to consumers of the event stream.
pub enum Event {
    /// A peer asked for a file or chunk.
    InboundRequest {
        /// Name carried by the request.
        request: String,
        /// Channel on which the `FileResponse` is to be sent back.
        channel: ResponseChannel<FileResponse>,
    },
}

/// Commands sent by `Client` over the command channel.
pub enum Command {
    /// Announce this node as a provider of `file_name`.
    StartProviding {
        /// Name of the file or chunk to announce.
        file_name: String,
        /// One-shot channel on which the reply to this command is delivered.
        sender: oneshot::Sender<HashSet<PeerId>>,
    },
    /// Send `file` back over `channel` in answer to an inbound request.
    RespondFile {
        /// Bytes of the file or chunk to send.
        file: Vec<u8>,
        /// Response channel taken from the inbound request being answered.
        channel: ResponseChannel<FileResponse>,
    },
}

[run_args.rs]

use futures::Stream;
use crate::library::{FileCommand, network::launch::{Event, Client}};
use super::file_ops::handle_file_command;

/// Forwards `file_command` to `handle_file_command` and waits for it to finish.
pub async fn run_arg(
    client: &mut Client,
    event_stream: &mut impl Stream<Item = Event>,
    file_command: FileCommand,
) {
    let handling = handle_file_command(client, event_stream, file_command);
    handling.await;
}

[file_ops.rs]

use std::error::Error;
use async_std::stream::StreamExt;
use futures::Stream;
use crate::library::{network::launch::{Event, Client}, FileCommand, chunker::chunk_file};

pub async fn handle_file_command (
    client: &mut Client,
    event_stream: &mut impl Stream<Item = Event>,
    file_command: FileCommand
) {
    provide_file(client, event_stream, file_command).await;
}

/// Announces `file_command.name` through the client, then handles inbound requests for it.
///
/// Each event pulled from `event_stream` is inspected; when the requested name equals
/// `file_command.name`, the file at `file_command.path` is passed to `chunk_file`.
/// The function returns once the stream yields no more events.
///
/// # Errors
/// Propagates any error returned by `chunk_file`.
pub async fn provide_file(
    client: &mut Client,
    event_stream: &mut impl Stream<Item = Event>,
    file_command: FileCommand,
) -> Result<(), Box<dyn Error>> {
    // Chunk size, in bytes, passed to `chunk_file`.
    const CHUNK_SIZE: usize = 1024;

    client.start_providing(file_command.name.clone()).await;
    // NOTE(review): `next()` requires the stream to be `Unpin`, which the `impl Stream`
    // bound in this signature does not promise; that has to be resolved together with
    // the callers, not inside this function.
    while let Some(event) = event_stream.next().await {
        match event {
            // `channel` is not used yet; it is kept bound for the header response below.
            Event::InboundRequest { request, channel } => {
                if request == file_command.name {
                    // A future does nothing until it is awaited, so await the chunker
                    // and surface its error instead of dropping both.
                    chunk_file(client, event_stream, &file_command.path, CHUNK_SIZE).await?;
                    // Upon completion of chunker, respond to original file request with a header
                    // Either respond with header here or at the end of the chunk_file function
                    // client.respond_file(header, channel).await;
                }
            }
        }
    }
    // The stream has ended, so no further requests can arrive.
    Ok(())
}

[chunker.rs]

use std::{error::Error, path::Path};
use futures::{Stream, StreamExt};
use super::network::launch::{Client, Event};

pub async fn chunk_file(
    client: &mut Client,
    event_stream: &mut impl Stream<Item = Event>,
    path: &Path,
    chunk_size: usize
) -> Result<(), Box<dyn Error>> {
    // Check that the file exists, chunk_size is > 0, and file.len() is > 0
    // Open the file
    let mut len = path.metadata()?.len() as usize;
    let mut chunk_count = 0;
    // Get a count of how many chunks the file will create
    while len > chunk_size {
        chunk_count += 1;
        len -= chunk_size;
    }
    if len > 0 {
        chunk_count += 1;
    }
    for n in 1..chunk_count + 1 {
    // Bunch of code here that will:
        // Fill a BufReader.with_capacity(chunk_size, &file)
        // Create a new chunk_name and chunk_path
        let chunk_path = "chunks/";
        let chunk_name = format!("{}{:03}.dat", path.file_stem().unwrap().to_str().unwrap(), n);
        let chunk_path = format!("{}{}", chunk_path, chunk_name);
        // If chunk exists 
            // Skip and consume BufReader 
        // Else
            // Create new file with chunk_name at chunk_path
            // Write BufReader to chunk_file and consumes
            client.start_providing(chunk_name).await;
            loop {
                match event_stream.next().await {
                    Some(Event::InboundRequest { request, channel }) => {
                        if request == chunk_name {
                            client.respond_file(std::fs::read(&chunk_path)?, channel).await;
                        }
                    }
                    None => todo!()
                }
            }
    }
    Ok(())
}

This topic may leave more questions than answers. This is my first time using Rust, and many of the technicalities and quirks of the language have eluded me and still do. I am open to a one-on-one meeting if you think the help I need extends beyond general help and would be better as a code review from an experienced Rustacean.

The best thing you can do to get help is condense your problematic code into a simple, standalone (doesn't depend on anything you can't share) “reproducible” example that actually almost-compiles, so that we can study and experiment with the problem you're having. Replace irrelevant types and functions with struct Foo; and fn foo() {} placeholders, and so on.

It's very hard to understand problems, or give clear advice, starting from non-compilable snippets. The process of preparing that example may also give you more insight into the problem.

Also, here is how to format code blocks on the forum with "```".

Okay thank you. I will do my best to make it replicable. Would you prefer I start a new topic and remove this one or just edit this one?
Already running into the reliance of each of the above 6 files on one or more of the other 18 files, such as my NetworkBehaviour, NetworkSwarm, NetworkClient, RequestResponseCodec, etc. I will try to limit these as much as possible but it may affect the end result of my error propagation

Finished rewriting the problem to include the bare minimum. The code will do nothing once the error in question is fixed since I have no Command or Event handler, Transport, Behaviour, Swarm, RequestResponseCodec, or command_receiver/event_sender for an event_loop. Hopefully this will be enough to help answer my question!

I also opted to retain the function and variable names as I myself get confused once there are too many obfuscated terms

And what is the full compiler error message that you get? Please copy the entire text, from cargo check and not from any IDE you may be using (it has useful hints and context).

error[E0277]: `impl Stream<Item = Event>` cannot be unpinned
   --> src\library\ops\file_ops.rs:23:28
    |
23  |         match event_stream.next().await {
    |                            ^^^^ the trait `Unpin` is not implemented for `impl Stream<Item = Event>`
    |
    = note: consider using `Box::pin`
note: required by a bound in `async_std::stream::StreamExt::next`
   --> C:\Users\workstation.local\.cargo\registry\src\github.com-1ecc6299db9ec823\async-std-1.12.0\src\stream\stream\mod.rs:181:15
    |
181 |         Self: Unpin,
    |               ^^^^^ required by this bound in `async_std::stream::StreamExt::next`
help: consider further restricting this bound
    |
18  |     event_stream: &mut impl Stream<Item = Event> + std::marker::Unpin,
    |                                                  ++++++++++++++++++++

error[E0277]: `impl Stream<Item = Event>` cannot be unpinned
  --> src\library\ops\file_ops.rs:23:15
   |
23 |         match event_stream.next().await {
   |               ^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `impl Stream<Item = Event>`
   |
   = note: consider using `Box::pin`
note: required by a bound in `async_std::stream::stream::next::NextFuture`
  --> C:\Users\workstation.local\.cargo\registry\src\github.com-1ecc6299db9ec823\async-std-1.12.0\src\stream\stream\next.rs:9:30
   |
9  | pub struct NextFuture<'a, T: Unpin + ?Sized> {
   |                              ^^^^^ required by this bound in `async_std::stream::stream::next::NextFuture`
help: consider further restricting this bound
   |
18 |     event_stream: &mut impl Stream<Item = Event> + std::marker::Unpin,
   |                                                  ++++++++++++++++++++

error[E0277]: `impl Stream<Item = Event>` cannot be unpinned
  --> src\library\ops\file_ops.rs:23:34
   |
23 |         match event_stream.next().await {
   |                                  ^^^^^^ the trait `Unpin` is not implemented for `impl Stream<Item = Event>`
   |
   = note: consider using `Box::pin`
note: required by a bound in `async_std::stream::stream::next::NextFuture`
  --> C:\Users\workstation.local\.cargo\registry\src\github.com-1ecc6299db9ec823\async-std-1.12.0\src\stream\stream\next.rs:9:30
   |
9  | pub struct NextFuture<'a, T: Unpin + ?Sized> {
   |                              ^^^^^ required by this bound in `async_std::stream::stream::next::NextFuture`
help: consider further restricting this bound
   |
18 |     event_stream: &mut impl Stream<Item = Event> + std::marker::Unpin,
   |                                                  ++++++++++++++++++++

warning: unreachable expression
  --> src\library\ops\file_ops.rs:34:5
   |
22 | /     loop {
23 | |         match event_stream.next().await {
24 | |             Some(Event::InboundRequest { request, channel }) => {
25 | |                 if request == file_command.name {
...  |
32 | |         }
33 | |     }
   | |_____- any code following this expression is unreachable
34 |       Ok(())
   |
1  | use futures::StreamExt;
   |
1  | use futures_lite::stream::StreamExt;
   |
1  | use std::iter::Iterator;
   |
     and 1 other candidate

As mentioned in the question, putting

let event_stream = event_stream.boxed()

on line 20 of file_ops.rs changes these errors to,

`impl Stream<Item = Event>` cannot be sent between threads safely
`impl Stream<Item = Event>` cannot be sent between threads

cannot be sent between threads safely

That's because in the function parameters you wrote event_stream: &mut impl Stream<Item = Event> — you specified the event_stream must implement Stream but did not specify it also must implement Send. You'd have to write

    event_stream: &mut (impl Stream<Item = Event> + Send),

However, that's not actually sufficient; it's just the error resulting from using boxed() which insists on Send (there is also a version .boxed_local() which does not). In order to poll a future or stream, it must be pinned by its owner (to ensure it stays pinned for the rest of its life, which is part of the contract of Pin), so no boxing or pinning inside of the functions taking &mut impl Stream is going to solve your problem.

You need to pin the event stream at the top level:

/// Launches the network layer, pins the event stream, and runs one file command.
pub async fn run_application() -> Result<(), Box<dyn Error>> {
    let (mut client, event_stream) = network::launch::new().await?;
    // Pinning happens here, in the function that owns the stream. After this line
    // `event_stream` names a `Pin<&mut _>` to the stream rather than the stream itself.
    futures::pin_mut!(event_stream);   // <---------- new
    let file_command = FileCommand { path: PathBuf::from("test.txt"), name: "test".to_string() };
    // The pinned reference is passed by value, so `run_arg` must accept a `Pin<&mut _>`.
    ops::run_args::run_arg(&mut client, event_stream, file_command).await;
    Ok(())
}

Docs for pin_mut!.

And finally propagate the pinned type through your function calls all the way to where the stream is polled:

pub async fn run_arg(
    client: &mut Client,
    event_stream: Pin<&mut (impl Stream<Item = Event> + Send)>,
    //            ^^^^ new                            ^^^^^^^^
2 Likes

Thank you for the help!
I was able to get through those errors although I am now faced with another error in my library.rs file on the line:

loop {
    // code
    if {
        // code
    } else {
        run_arg(&mut client, event_stream, file_command).await;
    }
}

and my file_ops.rs file. Same line:

loop {
    match event_stream.next().await {
        //code
    }
}

but now with the error:

error[E0382]: use of moved value: `event_stream`
  --> src\library.rs:91:47
   |
55 |     futures::pin_mut!(event_stream);
   |     ------------------------------- move occurs because `event_stream` has type `Pin<&mut impl Stream<Item = behavior::Event> + std::marker::Send>`, which does not implement the `Copy` trait
...
91 |             run_arg(&connection, &mut client, event_stream, cli_option).await;
   |                                               ^^^^^^^^^^^^ value moved here, in previous iteration of loop
error[E0382]: borrow of moved value: `event_stream`
  --> src\library\ops\file_ops.rs:72:19
   |
55 |         mut event_stream: Pin<&mut (impl Stream<Item = Event> + Send)>,
   |         ---------------- move occurs because `event_stream` has type `Pin<&mut impl Stream<Item = Event> + Send>`, which does not implement the `Copy` trait
...
72 |             match event_stream.next().await {
   |                   ^^^^^^^^^^^^^^^^^^^ value borrowed here after move
...
81 |                             chunk_file(client, event_stream, &file_command.path, CHUNK_SIZE_8MB).await;
   |                                                ------------ value moved here, in previous iteration of loop

Is the core of my problem that I am approaching my intended functionality incorrectly?

In a loop you need to call Pin::as_mut() when you use the variable in the loop.

run_arg(&mut client, event_stream.as_mut(), file_command).await;

This is so that you produce a reborrowed Pin<&mut T> instead of handing away the one you have. (This happens automatically with plain &mut T references, but not for other types containing references.)

1 Like

Thank you once again! I have several follow-up questions but am not sure if I am composing them correctly so I will end this discussion here lest I deviate from the header topic too far. I will definitely start using this forum though for help with this language. I am the only developer in my company using Rust and the project is quite intensive so I greatly appreciate the support I have found thus far!

I wanted to add an error that was caused by the pin_mut!() macro, as well as my solution. The error I encountered was a stack overflow! Something I came to realize, after some very interesting thread debates, has been a thorn in many a side for the past 7 years. I am obviously no expert and so my solution may not be the most proper, the most efficient, or even correct, but it has at least allowed me to continue onward.

pub async fn run_application() -> Result<(), Box<dyn Error>> {
    let (mut client, event_stream) = network::launch::new().await?; <------- old (replace with next line)
    let (mut client, mut event_stream, event_loop) = network::launch::new().await?; <+++++++ new
    futures::pin_mut!(event_stream);   // <---------- old (remove)
    let file_command = FileCommand { path: PathBuf::from("test.txt"), name: "test".to_string() };
    spawn(event_loop.run());
    ops::run_args::run_arg(&mut client, event_stream, file_command).await; <------ old (replace with next line)
    ops::run_args::run_arg(&mut client, event_stream.as_mut(), file_command).await?; <+++++++ new
    Ok(())
}

↓↓↓↓↓↓ Line 1, 2, 4 new/changed ↓↓↓↓↓↓
pub async fn new() -> Result<(Box<Client>, Pin<Box<impl Stream<Item = Event> + Send>>, Box<EventLoop>), Box<dyn Error>> { // returns a boxed client, a pinned boxed event stream, and a boxed event loop
    let swarm: Swarm<ComposedBehaviour> = swarm::new(id_keys).await?; // NOTE(review): `id_keys` is not defined in this snippet
    let (command_sender, command_receiver) = mpsc::channel(0);
    let (event_sender, event_receiver) = mpsc::channel(0);
    Ok((Box::new(Client { sender: command_sender }), Box::pin(event_receiver), Box::new(EventLoop::new(swarm, command_receiver, event_sender)))) // each returned part is boxed; the stream is pinned with `Box::pin`, so the caller needs no `pin_mut!`
}

As it was not part of the original problem I had it excluded, but the creation and spawning of the event_loop which in turn contains the swarm, and the subsequent 2k or so lines of Transport and NetworkBehaviour logic, caused the stack to overflow upon instantiation. This then led to the changes you see above. Hope this helps anyone else who may have never seen a stack overflow!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.