I need some clarification on how to properly use Streams. My program is quite extensive, covering about 24 separate files, so I will try to break this down as much as possible while remaining within my NDA. The problem occurs in file_ops.rs and chunker.rs where initially the error says impl Stream<Item = Event> cannot be unpinned. Therefore, I opted to pin the Stream in file_ops using .boxed() to see what happens. This would turn event_stream to a Pin<Box<dyn Stream<Item = _> + Send>> and the error now is with .boxed where impl Stream<Item = Event> cannot be sent between threads safely.
How do I properly create the Stream and then enable it to be used by multiple futures with .next(). The crux of what I am doing is that many futures will be checking the stream for an InboundRequest that is eq to the file_alias being provided.
#[WORKSPACE]
[dependencies]
libp2p = "0.43.0"
async-std = { version = "1.12.0", features = ["attributes"] }
futures = "0.3.2"
async-trait = "0.1.58"
env_logger = "0.9.1"
[main.rs]
use std::error::Error;
use futures::executor::block_on;
use crate::library::run_application;
mod library;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
block_on(run_application());
Ok(())
}
[library.rs]
use std::{path::PathBuf, error::Error};
pub mod ops;
pub mod network;
pub mod chunker;
pub async fn run_application() -> Result<(), Box<dyn Error>> {
let (mut client, mut event_stream) = network::launch::new().await?;
let file_command = FileCommand { path: PathBuf::from("test.txt"), name: "test".to_string() };
ops::run_args::run_arg(&mut client, &mut event_stream, file_command).await;
Ok(())
}
pub struct FileCommand {
pub path: PathBuf,
pub name: String
}
[launch.rs]
use std::{collections::HashSet, error::Error};
use futures::{Stream, channel::{mpsc, oneshot}, SinkExt};
use libp2p::{request_response::ResponseChannel, PeerId};
pub async fn new() -> Result<(Client, impl Stream<Item = Event>), Box<dyn Error>> {
let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Ok((Client { sender: command_sender }, event_receiver))
}
pub struct Client {
pub sender: mpsc::Sender<Command>
}
impl Client {
/// List oneself as a provider of the given file or chunk on the DHT
pub async fn start_providing(&mut self, file_name: String) {
let (sender, receiver) = oneshot::channel();
self.sender.send(Command::StartProviding { file_name, sender }).await.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.");
}
/// Respond with the provided file or chunk to the requesting peer.
pub async fn respond_file(&mut self, file: Vec<u8>, channel: ResponseChannel<FileResponse>) {
self.sender.send(Command::RespondFile { file, channel }).await.expect("Command receiver not to be dropped.");
}
}
pub struct FileResponse(pub Vec<u8>);
pub enum Event {
InboundRequest {
request: String,
channel: ResponseChannel<FileResponse>
}
}
pub enum Command {
StartProviding {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>
},
RespondFile {
file: Vec<u8>,
channel: ResponseChannel<FileResponse>
}
}
[run_args.rs]
use futures::Stream;
use crate::library::{FileCommand, network::launch::{Event, Client}};
use super::file_ops::handle_file_command;
pub async fn run_arg(
client: &mut Client,
event_stream: &mut impl Stream<Item = Event>,
file_command: FileCommand
) {
handle_file_command(client, event_stream, file_command).await
}
[file_ops.rs]
use std::error::Error;
use async_std::stream::StreamExt;
use futures::Stream;
use crate::library::{network::launch::{Event, Client}, FileCommand, chunker::chunk_file};
pub async fn handle_file_command (
client: &mut Client,
event_stream: &mut impl Stream<Item = Event>,
file_command: FileCommand
) {
provide_file(client, event_stream, file_command).await;
}
pub async fn provide_file (
client: &mut Client,
event_stream: &mut impl Stream<Item = Event>,
file_command: FileCommand
) -> Result<(), Box<dyn Error>> {
client.start_providing(file_command.name.clone()).await;
loop {
match event_stream.next().await {
Some(Event::InboundRequest { request, channel }) => {
if request == file_command.name {
chunk_file(client, event_stream, &file_command.path, 1024);
// Upon completion of chunker, respond to original file request with a header
// Either respond with header here or at the end of the chunk_file function
// client.respond_file(header, channel).await;
}
}
}
}
Ok(())
}
[chunker.rs]
use std::{error::Error, path::Path};
use futures::Stream;
use super::network::launch::{Client, Event};
pub async fn chunk_file(
client: &mut Client,
event_stream: &mut impl Stream<Item = Event>,
path: &Path,
chunk_size: usize
) -> Result<(), Box<dyn Error>> {
// Check that the file exists, chunk_size is > 0, and file.len() is > 0
// Open the file
let mut len = path.metadata()?.len() as usize;
let mut chunk_count = 0;
// Get a count of how many chunks the file will create
while len > chunk_size {
chunk_count += 1;
len -= chunk_size;
}
if len > 0 {
chunk_count += 1;
}
for n in 1..chunk_count + 1 {
// Bunch of code here that will:
// Fill a BufReader.with_capacity(chunk_size, &file)
// Create a new chunk_name and chunk_path
let chunk_path = "chunks/";
let chunk_name = format!("{}{:03}.dat", path.file_stem().unwrap().to_str().unwrap(), n);
let chunk_path = format!("{}{}", chunk_path, chunk_name);
// If chunk exists
// Skip and consume BufReader
// Else
// Create new file with chunk_name at chunk_path
// Write BufReader to chunk_file and consumes
client.start_providing(chunk_name).await;
loop {
match event_stream.next().await {
Some(Event::InboundRequest { request, channel }) => {
if request == chunk_name {
client.respond_file(std::fs::read(&chunk_path)?, channel).await;
}
}
None => todo!()
}
}
}
Ok(())
}