Rust streaming gRPC with background tasks

What is the idiomatic Rust way to solve my task?

AS IS: I have multiple gRPC servers. On startup my Rust app connects to all of them through separate clients, and I want to store each connection in a struct, let's call it Holder. The holders are kept in a Vec, which is then passed to the logic layer of the application.

Since my RPC method is a stream (both request and response), each Holder stores a tx and an rx. What I want is to run a background task per Holder that listens on rx and sends the data to the server through the gRPC client, so I need to pass tx, or the entire Holder struct, to my logic layer. This is where my question begins. I'm used to Go/C pointers, which would work perfectly for this task, but Rust won't allow it and I'm getting multiple borrow checker errors. At this point I started to think I'm not doing this the idiomatic Rust way (if that's possible at all), so I'm asking for some help and maybe examples. Below is some abstract code of what I'm trying to do:

let holders: Vec<Holder> = match HolderCollection::create(hosts).await {
    Ok(holders) => holders,
    Err(err) => {
        panic!("error creating data holders: {}", err)
    }
};

create looks like this:

let mut res = HolderCollection {
            holders: Vec::with_capacity(storages.len())
        };

        for v in storages {
            let mut client = TransferServiceClient::connect(v).await?;

            let (tx, rx) = mpsc::unbounded_channel();
            let rx_stream = UnboundedReceiverStream::new(rx);

            res.holders.push(Holder {
                tx,
                rc: rx_stream,
                client,
                // some field which should be updated on every response after sending a gRPC message
                stat: 0.0,
            })
        }

        Ok(res)
    }

background task:

pub fn run(&self) {
        for mut h in self.holders {
            tokio::spawn(async move {
                // because of the stream, I can't just pass the channel as an argument
                match h.client.put_data(h.rc).await {
                    Ok(res) => {
                        if let Some(msg) = res.into_inner().message().await {
                            h.stat = msg.new_stat
                        }
                    }
                    Err(err) => {
                        eprintln!("{}", err)
                    }
                }
            });
        }
    }

After that I'm trying to use those structs in another component which somehow (how is not important) generates messages, e.g.:

// the logic of retrieving one of the holders is also not important
let holder = self.holders.get_available();
/*
do some logic here
*/

holder.tx.send(some_data)

Is my approach acceptable in Rust? The number of borrow checker errors I'm receiving is huge, and I think I'm doing something wrong.

Channels are meant to be cloned, which just bumps their reference count, before moving them to a different thread/task. You clone them into a variable before starting the thread/task, and that variable is then automatically moved into the closure for the thread/task. For example (if I understand your variable types):

        for mut h in self.holders {
            let rc = h.rc.clone(); // <<<<< add this
            tokio::spawn(async move {
                // because of the stream, I can't just pass the channel as an argument
                match h.client.put_data(rc).await {
                                   //   ^^ use the cloned var here
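
The clone-then-move pattern described above can be sketched as follows. This is only an illustration under stated assumptions: it uses std::sync::mpsc and std threads in place of tokio tasks and the tonic client so that it runs without dependencies, the function name fan_in is hypothetical, and it clones the sending half of the channel.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Spawns `workers` threads that each send their own index, then returns
/// everything the single receiver saw, sorted. (`fan_in` is a made-up name.)
fn fan_in(workers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<usize>();

    let mut handles = Vec::with_capacity(workers);
    for id in 0..workers {
        // Clone the sender *before* spawning; `move` then transfers the
        // clone (not the original) into the closure.
        let tx: Sender<usize> = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(id).expect("receiver is still alive");
        }));
    }
    // Drop the original sender so the channel closes once every clone is gone.
    drop(tx);

    for handle in handles {
        handle.join().expect("worker panicked");
    }

    // The receiver stays in one place and is drained by its single owner.
    let mut seen: Vec<usize> = rx.iter().collect();
    seen.sort_unstable();
    seen
}
```

Calling fan_in(4) spawns four senders that all feed the one receiver owned by the calling thread.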

I've tried, but UnboundedReceiverStream (or just ReceiverStream) is not cloneable by itself, and even when wrapped in Arc, Rc, or whatever, it does not satisfy the stream trait required by the gRPC methods.

That's tonic, to be exact; it is in tonic::codegen::tokio_stream::wrappers.

upd:

With Arc<UnboundedReceiverStream> I can clone rc and then call cloned.into_streaming_request(), and that type will satisfy the streaming trait, but I still get an error on cloned.into_streaming_request() (cannot move cloned).

Please post the code showing what you're doing, including the types, and the full error (run Cargo in the terminal and copy/paste the whole thing).

pub fn run(self) {
        for h in self.holders {
            let rc: Arc<UnboundedReceiverStream<RuntimeData>> = h.rc.clone();
            let req: Request<UnboundedReceiverStream<RuntimeData>> = rc.into_streaming_request();
            let mut cl: ServiceClient<Channel> = h.client.clone();
            tokio::spawn(async move {
                match cl.put_data(req).await {
                    Ok(res) => {
                        if let Some(msg) = res.into_inner().message().await.unwrap() {
                            match h.stat.lock() {
                                Ok(mut data) => {
                                    *data = msg.data_estimated_delay_sec
                                }
                                Err(err) => {
                                    eprintln!("{}", err);
                                    return;
                                }
                            }
                        }
                    }
                    Err(err) => {
                        eprintln!("{}", err)
                    }
                }
            });
        }
    }

error[E0507]: cannot move out of an `Arc`
  --> src/holder.rs:63:23
   |
63 |             let req = rc.into_streaming_request();
   |                       ^^ ------------------------ value moved due to this method call
   |                       |
   |                       move occurs because value has type `UnboundedReceiverStream<RuntimeData>`, which does not implement the `Copy` trait

The UnboundedReceiver can't be cloned. So there can only be one task that owns, and has access to, the receiver.

Since you want the receiver to be used in the task you're spawning, the receiver must be moved into the closure for that task. You can't store the receiver in the holder structure, and also move it into the closure. It can't be shared.

So you probably shouldn't create the holder structure at all, unless you just want to store the sender in it.
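
A minimal sketch of that layout, assuming std::sync::mpsc and std threads as stand-ins for the tokio channel, tokio::spawn and the tonic client (so it runs without dependencies). The names forward and start are hypothetical, and forward only imitates the streaming call by consuming the receiver. The holder keeps just the sender; the receiver is moved into the worker and never stored.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// What the logic layer keeps: only the cloneable sending half.
#[derive(Clone)]
struct Holder {
    tx: Sender<String>,
}

/// Stand-in for the streaming RPC: owns the receiver and consumes it
/// until every sender has been dropped.
fn forward(host: String, rx: Receiver<String>) -> Vec<String> {
    rx.into_iter().map(|msg| format!("{host} <- {msg}")).collect()
}

/// Creates one channel per host, moves each receiver into its own worker,
/// and hands back the senders for the logic layer.
fn start(hosts: Vec<String>) -> (Vec<Holder>, Vec<JoinHandle<Vec<String>>>) {
    let mut holders = Vec::with_capacity(hosts.len());
    let mut workers = Vec::with_capacity(hosts.len());
    for host in hosts {
        let (tx, rx) = mpsc::channel();
        // `rx` is moved into the closure here and is never stored in `Holder`.
        workers.push(thread::spawn(move || forward(host, rx)));
        holders.push(Holder { tx });
    }
    (holders, workers)
}
```

The logic layer can then call holders[i].tx.send(...) freely, or clone a Holder, without ever touching the receiver.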

And this line goes inside the closure/task, not outside, since rc is owned by that task:

let req: Request<UnboundedReceiverStream<RuntimeData>> = rc.into_streaming_request();
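
Putting that together with the shared stat field from the code above, a rough sketch might look like this. It again assumes std::sync::mpsc and std threads instead of tokio and tonic, the names TrackedHolder and spawn_holder are hypothetical, and the loop over rx merely stands in for building the streaming request and reading responses inside the task.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Handle kept by the logic layer: a sender plus a shared view of the latest stat.
struct TrackedHolder {
    tx: Sender<f64>,
    stat: Arc<Mutex<f64>>,
}

/// Spawns the worker. The receiver is moved straight into the closure and
/// consumed there; only a clone of the `Arc` crosses into the task.
fn spawn_holder() -> (TrackedHolder, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<f64>();
    let stat = Arc::new(Mutex::new(0.0));
    let task_stat = Arc::clone(&stat);

    let worker = thread::spawn(move || {
        // Stand-in for calling `rc.into_streaming_request()` and reading the
        // responses: the owned receiver is consumed inside the task.
        for value in rx {
            match task_stat.lock() {
                Ok(mut data) => *data = value,
                Err(err) => {
                    eprintln!("{err}");
                    return;
                }
            }
        }
    });

    (TrackedHolder { tx, stat }, worker)
}
```

The Arc<Mutex<f64>> is the only piece shared between the holder and the task; the receiver has exactly one owner.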