Tokio - keep channels alive to communicate with bg process

use std::process::Stdio;

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    process::{Child, ChildStdin, ChildStdout, Command},
    sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver, UnboundedSender},
};

/// Handle for the spawned language-server child process.
struct LSPClient {
    /// The child process handle. It is never read; `start_server` spawns the
    /// child with `kill_on_drop(true)`, so holding the handle here ties the
    /// child's lifetime to this struct.
    _process: Child,
}

/// Program entry point: runs `trigger` as a spawned task and waits for it.
#[tokio::main]
async fn main() {
    let task = tokio::spawn(trigger());
    // The join result (including a panic inside the task) is deliberately ignored.
    let _ = task.await;
}

/// Starts the server and queues one `initialize` payload for it.
///
/// All three values returned by `start_server` are dropped when this function
/// returns, i.e. right after the payload has been queued.
async fn trigger() {
    let (_server, outgoing, _incoming) = start_server().await;

    // A send error (receiver already gone) is ignored on purpose.
    let _ = outgoing.send(b"initialize".to_vec());
}

/// Spawns the language server, sets up the channels and listener tasks, and
/// queues an `initialize` payload.
///
/// Returns, in order: the client owning the child process, the sender whose
/// payloads are meant for the server's stdin, and the sender that feeds the
/// receive listener.
///
/// # Panics
/// Panics if the process cannot be spawned, if its id is unavailable, or if
/// its stdin/stdout handles are missing.
async fn start_server() -> (LSPClient, UnboundedSender<Vec<u8>>, UnboundedSender<Vec<u8>>) {
    let mut child = Command::new("/home/suryateja/.config/mystudio-ide/lsp/rust-analyzer")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        .expect("Unable to spawn process");

    println!("proc id: {}", child.id().expect("Unable to fetch process id"));

    // stderr is piped above but its handle is not taken or read here.
    let to_server = BufWriter::new(child.stdin.take().unwrap());
    let from_server = BufReader::new(child.stdout.take().unwrap());

    let (outgoing_tx, outgoing_rx) = unbounded_channel::<Vec<u8>>();
    let (incoming_tx, incoming_rx) = unbounded_channel::<Vec<u8>>();

    println!("setting up tx,rx");
    setup_listeners(from_server, to_server, outgoing_rx, incoming_rx).await;

    // Send initialize request; a send error is ignored.
    println!("initialize request");
    let _ = outgoing_tx.send(b"initialize".to_vec());

    (LSPClient { _process: child }, outgoing_tx, incoming_tx)
}

/// Spawns one task per channel receiver and then returns.
///
/// * `reader` - buffered handle on the child's stdout, moved into the receive task.
/// * `writer` - buffered handle on the child's stdin, moved into the send task.
/// * `rx_send` - receiver polled by the send task.
/// * `rx_recv` - receiver polled by the receive task.
///
/// NOTE(review): both loops poll with `try_recv`, which returns immediately
/// (with an `Empty` error when nothing is queued), so each loop spins without
/// ever reaching an `.await` point.
async fn setup_listeners(
    mut reader: BufReader<ChildStdout>,
    mut writer: BufWriter<ChildStdin>,
    mut rx_send: UnboundedReceiver<Vec<u8>>,
    mut rx_recv: UnboundedReceiver<Vec<u8>>,
) {
    // Receive task: loops until the channel reports that all senders are gone.
    let handle_recv = tokio::spawn(async move {
        loop {
            match rx_recv.try_recv() {
                Ok(data) => {
                    println!("rx_recv got: {:?}", String::from_utf8(data));

                    let mut buf = String::new();
                    // NOTE(review): the future returned by `read_line` is dropped
                    // without being awaited, so no read is performed here.
                    let _ = reader.read_line(&mut buf);
                }
                Err(err) => {
                    // Logged for both the "empty" and the "disconnected" case.
                    eprintln!("rx_recv failed '{}'", err);
                    if err == TryRecvError::Disconnected {
                        println!("rx_recv: quitting loop");
                        break;
                    }
                }
            }
        }
    });

    // Send task: same polling structure, for payloads meant for the child's stdin.
    let handle_send = tokio::spawn(async move {
        loop {
            match rx_send.try_recv() {
                Ok(data) => {
                    println!("rx_send: got something");
                    // Panics inside this task if the payload is not valid UTF-8.
                    let str = String::from_utf8(data).expect("invalid data for parse");
                    println!("rx_send: sending to LSP: {:?}", str);

                    // NOTE(review): the future returned by `write_all` is dropped
                    // without being awaited, so nothing is written here.
                    _ = writer.write_all(str.as_bytes());
                }
                Err(err) => {
                    eprintln!("rx_send failed '{}'", err);
                    if err == TryRecvError::Disconnected {
                        // NOTE(review): the message says "rx_recv" although this
                        // is the rx_send loop.
                        println!("rx_recv: quitting loop");
                        break;
                    }
                }
            }
        }
    });
    // The two join handles are themselves spawned as tasks. The outer task only
    // performs those two spawns and finishes, so the `.await` below does not
    // wait for either loop to end.
    _ = tokio::spawn(async move {
        tokio::spawn(handle_send);
        tokio::spawn(handle_recv);
    }).await;
}

(Playground)

Errors:

   proc id: 5269
setting up tx,rx
rx_recv failed 'receiving on an empty channel'
rx_recv failed 'initialize request
receiving on an empty channel'
rx_recv failed 'receiving on a closed channel'
rx_recv: quitting loop
rx_send failed 'receiving on an empty channel'
rx_send: got something
rx_send: sending to LSP: "initialize"
rx_send failed 'receiving on a closed channel'
rx_recv: quitting loop

It's unclear what your question is, but I notice that you're calling try_recv in a loop. That amounts to blocking the thread. Don't do it. Call the async recv method instead.

1 Like

Hi,

I am trying to run a background process (an LSP server) until the parent process quits. To communicate with this background process, I have created two unbounded channels (via unbounded_channel): one to send data to its stdin stream and one to receive data from its stdout stream.

However, the rx_send & rx_recv channels appear to be "closed" as soon as the program runs.

Note: I am new to async Rust.

Use recv().await instead of try_recv as Alice mentioned.

Your problem currently is that you aren't keeping the server future alive: you spawn a bunch of tasks and never await them. So the channels get dropped, and then everything exits.

If we make both of those changes, and slightly adjust how start_server works, things start working.

use std::{future::Future, process::Stdio};

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    process::{Child, ChildStdin, ChildStdout, Command},
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};

/// Handle for the spawned language-server child process.
struct LSPClient {
    /// The child process handle. It is never read; `start_server` spawns the
    /// child with `kill_on_drop(true)`, so holding the handle here ties the
    /// child's lifetime to this struct.
    _process: Child,
}

/// Program entry point: runs `trigger` to completion on the Tokio runtime.
#[tokio::main]
async fn main() {
    // Await `trigger` directly; wrapping it in `tokio::spawn` and immediately
    // awaiting the handle added nothing.
    trigger().await;
}

/// Starts the server, queues one `initialize` payload, then waits on the
/// listener future returned by `start_server`.
async fn trigger() {
    let (_server, outgoing, _incoming, handlers_done) = start_server();

    // A send error (receiver already gone) is ignored on purpose.
    let _ = outgoing.send(b"initialize".to_vec());

    // Both senders above stay alive for the whole wait, so the handlers do not
    // stop because of a closed channel while we are parked here.
    handlers_done.await
}

/// Spawns the language server, creates the channels, and queues an
/// `initialize` payload.
///
/// Returns, in order: the client owning the child process, the sender whose
/// payloads are meant for the server's stdin, the sender that feeds the
/// receive handler, and the future produced by `setup_listeners`. That future
/// is returned un-awaited; the caller has to await it.
///
/// # Panics
/// Panics if the process cannot be spawned, if its id is unavailable, or if
/// its stdin/stdout handles are missing.
fn start_server() -> (
    LSPClient,
    UnboundedSender<Vec<u8>>,
    UnboundedSender<Vec<u8>>,
    impl Future<Output = ()>,
) {
    let mut child = Command::new("/home/suryateja/.config/mystudio-ide/lsp/rust-analyzer")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        .expect("Unable to spawn process");

    println!("proc id: {}", child.id().expect("Unable to fetch process id"));

    // stderr is piped above but its handle is not taken or read here.
    let to_server = BufWriter::new(child.stdin.take().unwrap());
    let from_server = BufReader::new(child.stdout.take().unwrap());

    let (outgoing_tx, outgoing_rx) = unbounded_channel::<Vec<u8>>();
    let (incoming_tx, incoming_rx) = unbounded_channel::<Vec<u8>>();

    println!("setting up tx,rx");
    // Awaiting this here would park us until a handler exits, so it is handed
    // back to the caller instead.
    let handlers_done = setup_listeners(from_server, to_server, outgoing_rx, incoming_rx);

    // Send initialize request; a send error is ignored.
    println!("initialize request");
    let _ = outgoing_tx.send(b"initialize".to_vec());

    (LSPClient { _process: child }, outgoing_tx, incoming_tx, handlers_done)
}

async fn setup_listeners(
    mut reader: BufReader<ChildStdout>,
    mut writer: BufWriter<ChildStdin>,
    mut rx_send: UnboundedReceiver<Vec<u8>>,
    mut rx_recv: UnboundedReceiver<Vec<u8>>,
) {
    let handle_recv = tokio::spawn(async move {
        loop {
            // Wait until a message is available instead of constantly polling for a message.
            match rx_recv.recv().await {
                Some(data) => {
                    println!("rx_recv got: {:?}", String::from_utf8(data));

                    let mut buf = String::new();
                    let _ = reader.read_line(&mut buf);
                }
                None => {
                    println!("rx_recv: quitting loop");
                    break;
                }
            }
        }
    });

    let handle_send = tokio::spawn(async move {
        loop {
            // Wait until a message is available instead of constantly polling for a message.
            match rx_send.recv().await {
                Some(data) => {
                    println!("rx_send: got something");
                    let str = String::from_utf8(data).expect("invalid data for parse");
                    println!("rx_send: sending to LSP: {:?}", str);

                    _ = writer.write_all(str.as_bytes());
                }
                None => {
                    println!("rx_recv: quitting loop");
                    break;
                }
            }
        }
    });

    // Wait for a handler to exit. You already spawned the handlers, you don't need to spawn them again.
    // I'm using select instead of join so we can see any errors immediately.
    tokio::select! {
       send = handle_send => println!("{send:?}"),
       recv = handle_recv => println!("{recv:?}"),
    };
}

Waiting on the handlers isn't the only option, but it was the simplest way to get your example working.