Async, tokio & future

Dear all,
I am trying my hand at tokio and async/await (I have no prior experience with async programming). I wanted to write a function that sends data on a tokio::sync::mpsc channel. The channel receiver forwards the same data over a TcpStream. I wanted the function to work in both synchronous and asynchronous contexts. Below is what I wrote:

pub fn send_to_target(&self, msg: String) {
    // Reads a property file and gets the property value; the default is true.
    let synchronous_write = get_property(SYNCHRONOUS_WRITE);
    // self.responder is Arc<tokio::sync::mpsc::Sender<String>>
    let responder = Arc::clone(&self.responder);
    // My understanding is that a spawned task is driven by the tokio executor, so I don't need to await it.
    let future = tokio::spawn(async move {
        responder.send(msg).await.unwrap();
    });
    // I want to wait until the future is finished if the SYNCHRONOUS_WRITE property is true.
    if synchronous_write {
        loop {
            println!("not finished future");
            if future.is_finished() {
                println!("future finished");
                break;
            }
        }
    }
}

The above function is called from a trait method that is not async, so I cannot make the caller async. I want to be able to call it without declaring my caller as async, as a lot of my code is not async.

Problem: the loop never exits, because future.is_finished() never returns true. I think my understanding is flawed somewhere, but I don't know where. Could someone take a look and help?

It's possible that busy-waiting like that is starving the runtime threads. You should really use a sync Receiver or something similar so that your sync thread sleeps, rather than pegging a CPU core checking whether the future has finished.
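
To illustrate the "let the sync thread sleep" idea, here is a minimal sketch using only the standard library. The helper name run_and_wait is hypothetical, and a plain std::thread stands in for the spawned tokio task. The caller blocks inside recv() until the worker signals completion, instead of spinning on is_finished(). It assumes the caller is an ordinary synchronous thread, not a runtime worker thread.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `work` on another thread and blocks the caller
/// until the worker reports completion. The caller sleeps inside `recv()`
/// rather than polling in a loop.
fn run_and_wait<F>(work: F) -> Result<(), mpsc::RecvError>
where
    F: FnOnce() + Send + 'static,
{
    // Channel used purely as a completion signal.
    let (done_tx, done_rx) = mpsc::channel::<()>();

    thread::spawn(move || {
        work();
        // A send error only means the waiter has already gone away.
        let _ = done_tx.send(());
    });

    // Blocks without burning CPU. Returns Err if the worker panicked and
    // dropped the sender before signalling.
    done_rx.recv()
}

fn main() {
    let result = run_and_wait(|| thread::sleep(Duration::from_millis(50)));
    println!("worker finished: {}", result.is_ok());
}
```

The same shape should carry over to a tokio task that signals a std channel or a oneshot channel when it is done, as long as the waiting side is not itself running on the runtime.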

That being said, unless your machine is single-core or you're using the single-threaded tokio runtime, it seems more likely that something else is wrong. The send method suspends until there is space in the buffer. How much capacity did you request when creating the channel? Do you have a receiver task set up to remove sent items from the buffer?

Thanks semicoleon! The machine is multi-core and the runtime is multi-threaded. I am sending just one message (a String) for testing. The requested capacity is 32. If I remove the loop or set SYNCHRONOUS_WRITE = false in the properties, it works fine.
Yes, I have an async receiver task running on the executor that takes the message out of the channel and sends it across the TcpStream.

Are you calling send_to_target from your async code? That might be blocking one of the futures that needs to complete.

The Tokio mpsc channel has a blocking_send function that does what you're trying to do. If that method panics, then you should read this article.

Hi Alice,
Yes, blocking_send panics unless I spawn a std::thread and call blocking_send in it. I don't want to do that because I feel it's too heavy to spawn a std::thread just to send a message. The code is part of a library that I intend to use for reading from and writing to a TCP stream in a performant way. Thanks for the suggestions! I will go through the link!

Regards,

If blocking_send panics, then that means that you are in an async context. Blocking in such a context is simply not possible. Restructure your code to work in a different way.
