Tokio - keep channels alive to communicate with bg process

use std::process::Stdio;

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    process::{Child, ChildStdin, ChildStdout, Command},
    sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver, UnboundedSender},
};

/// Owner of the spawned language-server process.
///
/// The server is spawned with `kill_on_drop(true)` in `start_server`, so the
/// process is expected to be killed once this value (and the `Child` inside it)
/// is dropped.
struct LSPClient {
    // Only stored to keep the child handle alive; never read in this listing.
    _process: Child,
}

/// Program entry point: runs `trigger` as a spawned task and waits for it.
#[tokio::main]
async fn main() {
    // Awaiting the join handle waits for `trigger` to return; `.ok()` discards
    // the join result. Once this returns, `main` ends.
    tokio::spawn(trigger()).await.ok();
}

/// Starts the server and queues one `initialize` message for it.
async fn trigger() {
    let (_client, tx_send, _tx_recv) =  start_server().await;

    // A failed send (receiver already gone) is silently ignored by `.ok()`.
    tx_send.send("initialize".as_bytes().to_vec()).ok();
    // The function returns here, dropping `_client`, `tx_send` and `_tx_recv`.
    // No clones of the senders exist in this listing, so both channels end up
    // with no sender left.
}

/// Spawns the rust-analyzer binary with piped stdio, creates the two unbounded
/// channels, starts the listener tasks and queues an `initialize` message.
///
/// Returns the process owner, the sender of the "send" channel (`tx_send`) and
/// the sender of the "recv" channel (`tx_recv`). The matching receivers are
/// moved into `setup_listeners`.
///
/// # Panics
/// Panics if the process cannot be spawned, if its id cannot be fetched, or if
/// its stdin/stdout handles are unavailable.
async fn start_server() -> (LSPClient, UnboundedSender<Vec<u8>>, UnboundedSender<Vec<u8>>) {
    let mut process = Command::new("/home/suryateja/.config/mystudio-ide/lsp/rust-analyzer")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        .expect("Unable to spawn process");

    let proc_id = process.id().expect("Unable to fetch process id");
    println!("proc id: {}", proc_id);

    // Move the pipe ends out of the `Child` so they can be handed to the tasks.
    let stdin = process.stdin.take().unwrap();
    let stdout = process.stdout.take().unwrap();
    // stderr is piped above but is not taken or read anywhere in this listing.
    // let stderr = process.stderr.take().unwrap();

    let writer = BufWriter::new(stdin);
    let reader = BufReader::new(stdout);

    // `rx_send` / `rx_recv` go to the listener tasks; the `tx_*` halves are returned.
    let (tx_send, rx_send) = unbounded_channel::<Vec<u8>>();
    let (tx_recv, rx_recv) = unbounded_channel::<Vec<u8>>();

    println!("setting up tx,rx");
    // Awaited here, but `setup_listeners` only waits for its outer spawned task,
    // not for the two listener loops themselves.
    setup_listeners(reader, writer, rx_send, rx_recv).await;

    // Send initialize request
    println!("initialize request");
    tx_send.send("initialize".as_bytes().to_vec()).ok();
    let client = LSPClient { _process: process };

    (client, tx_send, tx_recv)
}

/// Spawns one task per channel receiver and returns without waiting for either
/// task's loop to finish.
///
/// * `reader` - buffered stdout of the child process, moved into the "recv" task.
/// * `writer` - buffered stdin of the child process, moved into the "send" task.
/// * `rx_send` - receiver polled by the "send" task.
/// * `rx_recv` - receiver polled by the "recv" task.
async fn setup_listeners(
    mut reader: BufReader<ChildStdout>,
    mut writer: BufWriter<ChildStdin>,
    mut rx_send: UnboundedReceiver<Vec<u8>>,
    mut rx_recv: UnboundedReceiver<Vec<u8>>,
) {
    let handle_recv = tokio::spawn(async move {
        loop {
            // `try_recv` never waits: on an empty channel it returns an error
            // immediately, so this loop spins and logs instead of sleeping.
            match rx_recv.try_recv() {
                Ok(data) => {
                    println!("rx_recv got: {:?}", String::from_utf8(data));

                    let mut buf = String::new();
                    // NOTE: the future returned by `read_line` is discarded
                    // without `.await`, so no read is performed here.
                    let _ = reader.read_line(&mut buf);
                }
                Err(err) => {
                    eprintln!("rx_recv failed '{}'", err);
                    // `Disconnected` means every sender has been dropped.
                    if err == TryRecvError::Disconnected {
                        println!("rx_recv: quitting loop");
                        break;
                    }
                }
            }
        }
    });

    let handle_send = tokio::spawn(async move {
        loop {
            // Same non-waiting poll as in the task above.
            match rx_send.try_recv() {
                Ok(data) => {
                    println!("rx_send: got something");
                    let str = String::from_utf8(data).expect("invalid data for parse");
                    println!("rx_send: sending to LSP: {:?}", str);

                    // NOTE: the future returned by `write_all` is discarded
                    // without `.await`, so nothing is written here.
                    _ = writer.write_all(str.as_bytes());
                }
                Err(err) => {
                    eprintln!("rx_send failed '{}'", err);
                    if err == TryRecvError::Disconnected {
                        // NOTE: this message says "rx_recv" although this is the send loop.
                        println!("rx_recv: quitting loop");
                        break;
                    }
                }
            }
        }
    });
    // `handle_send` / `handle_recv` are join handles of tasks that were already
    // spawned above; here they are wrapped in further spawned tasks. Only the
    // outer task is awaited, and it completes as soon as it has spawned the two
    // inner ones, so this function returns while the loops are still running.
    _ = tokio::spawn(async move {
        tokio::spawn(handle_send);
        tokio::spawn(handle_recv);
    }).await;
}

(Playground)

Errors:

   proc id: 5269
setting up tx,rx
rx_recv failed 'receiving on an empty channel'
rx_recv failed 'initialize request
receiving on an empty channel'
rx_recv failed 'receiving on a closed channel'
rx_recv: quitting loop
rx_send failed 'receiving on an empty channel'
rx_send: got something
rx_send: sending to LSP: "initialize"
rx_send failed 'receiving on a closed channel'
rx_recv: quitting loop

It's unclear what your question is, but I notice that you're calling try_recv in a loop. That amounts to blocking the thread. Don't do it. Call the async recv method instead.

1 Like

Hi,

I am trying to run a background process (an LSP server) until the parent process quits. To communicate with this background process, I have created two unbounded channels (via unbounded_channel): one to send data to its stdin stream and one to receive data from its stdout stream.

However rx_send & rx_recv channels appear to be "closed" as soon as the program runs.

Note: I am new to async Rust.

Use recv().await instead of try_recv as Alice mentioned.

Your problem currently is that you aren't keeping the server future alive: you spawn a bunch of tasks and never await them. So the channels get dropped and then everything exits.

If we make both of those changes, and slightly adjust how start_server works, things start working.

use std::{future::Future, process::Stdio};

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    process::{Child, ChildStdin, ChildStdout, Command},
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};

/// Owner of the spawned language-server process.
///
/// The server is spawned with `kill_on_drop(true)` in `start_server`, so the
/// process is expected to be killed once this value (and the `Child` inside it)
/// is dropped.
struct LSPClient {
    // Only stored to keep the child handle alive; never read in this listing.
    _process: Child,
}

/// Program entry point: runs `trigger` directly on the Tokio runtime.
#[tokio::main]
async fn main() {
    // The spawn here wasn't doing anything for you.
    trigger().await;
}

/// Starts the server, queues an `initialize` message and then waits on the
/// listener future returned by `start_server`.
async fn trigger() {
    // `start_server` is a plain (non-async) function here; `wait` is the future
    // created by `setup_listeners`, which has not been polled yet.
    let (_client, tx_send, _tx_recv, wait) = start_server();

    // A failed send (receiver already gone) is silently ignored by `.ok()`.
    tx_send.send("initialize".as_bytes().to_vec()).ok();

    // Wait for the handlers to exit. Currently this will never happen
    // (`tx_send` and `_tx_recv` stay alive in this scope while we wait, so
    // neither channel is closed from here).
    wait.await
}

/// Launches the rust-analyzer binary with piped stdio and prepares the channels
/// used to talk to it.
///
/// Returns, in order:
/// * the [`LSPClient`] that owns the child process,
/// * `tx_send`, the sender of the channel whose receiver is given to
///   `setup_listeners` as `rx_send`,
/// * `tx_recv`, the sender of the channel whose receiver is given to
///   `setup_listeners` as `rx_recv`,
/// * the future created by `setup_listeners`. It is returned un-polled; the
///   caller has to await it for the listeners to run.
///
/// One `initialize` message is already queued on `tx_send` when this returns.
///
/// # Panics
/// Panics if the process cannot be spawned, if its id cannot be fetched, or if
/// its stdin/stdout handles are unavailable.
fn start_server() -> (
    LSPClient,
    UnboundedSender<Vec<u8>>,
    UnboundedSender<Vec<u8>>,
    impl Future<Output = ()>,
) {
    let mut command = Command::new("/home/suryateja/.config/mystudio-ide/lsp/rust-analyzer");
    command
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);
    let mut child = command.spawn().expect("Unable to spawn process");

    println!(
        "proc id: {}",
        child.id().expect("Unable to fetch process id")
    );

    // stderr is piped as well, but it is not taken or read here.
    let to_server = BufWriter::new(child.stdin.take().unwrap());
    let from_server = BufReader::new(child.stdout.take().unwrap());

    let (tx_send, rx_send) = unbounded_channel::<Vec<u8>>();
    let (tx_recv, rx_recv) = unbounded_channel::<Vec<u8>>();

    println!("setting up tx,rx");
    // Only build the listener future here. It resolves when a handler exits,
    // so awaiting it is left to the caller.
    let listeners = setup_listeners(from_server, to_server, rx_send, rx_recv);

    // Queue the initialize request; the unbounded channel holds it until the
    // send handler starts receiving.
    println!("initialize request");
    tx_send.send("initialize".as_bytes().to_vec()).ok();

    (LSPClient { _process: child }, tx_send, tx_recv, listeners)
}

async fn setup_listeners(
    mut reader: BufReader<ChildStdout>,
    mut writer: BufWriter<ChildStdin>,
    mut rx_send: UnboundedReceiver<Vec<u8>>,
    mut rx_recv: UnboundedReceiver<Vec<u8>>,
) {
    let handle_recv = tokio::spawn(async move {
        loop {
            // Wait until a message is available instead of constantly polling for a message.
            match rx_recv.recv().await {
                Some(data) => {
                    println!("rx_recv got: {:?}", String::from_utf8(data));

                    let mut buf = String::new();
                    let _ = reader.read_line(&mut buf);
                }
                None => {
                    println!("rx_recv: quitting loop");
                    break;
                }
            }
        }
    });

    let handle_send = tokio::spawn(async move {
        loop {
            // Wait until a message is available instead of constantly polling for a message.
            match rx_send.recv().await {
                Some(data) => {
                    println!("rx_send: got something");
                    let str = String::from_utf8(data).expect("invalid data for parse");
                    println!("rx_send: sending to LSP: {:?}", str);

                    _ = writer.write_all(str.as_bytes());
                }
                None => {
                    println!("rx_recv: quitting loop");
                    break;
                }
            }
        }
    });

    // Wait for a handler to exit. You already spawned the handlers, you don't need to spawn them again.
    // I'm using select instead of join so we can see any errors immediately.
    tokio::select! {
       send = handle_send => println!("{send:?}"),
       recv = handle_recv => println!("{recv:?}"),
    };
}

Waiting on the handlers isn't the only option, but it was the simplest way to get your example working.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.