Rants: An async NATS client library

Rants is an async NATS client library. It uses async/await syntax and the new async ecosystem.

I was impressed with how straightforward it was to set up the infrastructure for dealing directly with NATS client protocol messages (as opposed to low-level parsing details). I used nom to write the underlying parser and integrated it into a custom tokio::codec::Decoder. The new codec provides a high-level stream of all incoming messages from the NATS server. It was a lot of fun to work through this process, and these seem like a very powerful set of tools for abstraction.
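
To give a rough idea of the decoder contract described above, here is a minimal sketch that uses only the standard library. It is not the rants implementation: the real parser is written with nom and plugs into tokio's Decoder trait, whereas the `ServerMessage`, `DecodeError`, `find_crlf`, and `decode` names below are hypothetical and only cover `PING`, `PONG`, and `MSG`. The key idea is the same, though: return `Ok(None)` while the buffer holds an incomplete message, and consume bytes only once a whole message is available.

```rust
/// A small, hypothetical subset of server-to-client NATS protocol messages.
#[derive(Debug, PartialEq)]
enum ServerMessage {
    Ping,
    Pong,
    Msg {
        subject: String,
        sid: String,
        reply_to: Option<String>,
        payload: Vec<u8>,
    },
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum DecodeError {
    UnknownControlLine(String),
    InvalidMsg(String),
}

/// Position of the first "\r\n" in `buf`, if any.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\r\n")
}

/// Try to decode one message from the front of `buf`.
/// `Ok(None)` means "need more bytes"; nothing is consumed in that case.
fn decode(buf: &mut Vec<u8>) -> Result<Option<ServerMessage>, DecodeError> {
    // Every message starts with a control line terminated by "\r\n".
    let line_end = match find_crlf(buf) {
        Some(i) => i,
        None => return Ok(None),
    };
    let line = String::from_utf8_lossy(&buf[..line_end]).into_owned();
    let mut fields = line.split_whitespace();
    let op = fields.next().unwrap_or("");

    match op.to_ascii_uppercase().as_str() {
        "PING" => {
            buf.drain(..line_end + 2);
            Ok(Some(ServerMessage::Ping))
        }
        "PONG" => {
            buf.drain(..line_end + 2);
            Ok(Some(ServerMessage::Pong))
        }
        "MSG" => {
            // MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]\r\n
            let args: Vec<&str> = fields.collect();
            let (subject, sid, reply_to, len) = match args.as_slice() {
                [subject, sid, len] => (*subject, *sid, None, *len),
                [subject, sid, reply_to, len] => (*subject, *sid, Some(*reply_to), *len),
                _ => return Err(DecodeError::InvalidMsg(line.clone())),
            };
            let len: usize = len
                .parse()
                .map_err(|_| DecodeError::InvalidMsg(line.clone()))?;

            let payload_start = line_end + 2;
            let payload_end = payload_start + len;
            // Wait until the payload and its trailing "\r\n" have arrived.
            if buf.len() < payload_end + 2 {
                return Ok(None);
            }
            if &buf[payload_end..payload_end + 2] != b"\r\n" {
                return Err(DecodeError::InvalidMsg(line.clone()));
            }

            let message = ServerMessage::Msg {
                subject: subject.to_string(),
                sid: sid.to_string(),
                reply_to: reply_to.map(|r| r.to_string()),
                payload: buf[payload_start..payload_end].to_vec(),
            };
            buf.drain(..payload_end + 2);
            Ok(Some(message))
        }
        _ => Err(DecodeError::UnknownControlLine(line.clone())),
    }
}
```

In an actual tokio codec the buffer would be a BytesMut and this logic would live in the Decoder trait's decode method, which has the same "return `Ok(None)` until a full frame is buffered" shape. The framed stream then calls it repeatedly as bytes arrive from the socket.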

Any feedback on the code itself or potential use of the library would be much appreciated, thanks!

That looks great.

It's not clear to me how I would use all that async/await/tokio stuff from our "regular" code and its threads. Any hints?

We are looking to collect data published by remote devices. We really need TLS for that.

You could use the current_thread runtime. For example, to run each future in the example individually, you could do something like:

use env_logger;
use futures::stream::StreamExt;
use rants::Client;
use tokio::runtime::current_thread::Runtime;

fn main() {
    env_logger::init();
    let mut rt = Runtime::new().expect("to create Runtime");

    // A NATS server must be running on `127.0.0.1:4222`
    let address = "127.0.0.1:4222".parse().unwrap();
    let client = Client::new(vec![address]);

    // Configure the client to receive messages even if it sent the message
    rt.block_on(client.connect_mut()).echo(true);

    // Connect to the server
    rt.block_on(client.connect());

    // Create a new subject called "test"
    let subject = "test".parse().unwrap();

    // Subscribe to the "test" subject
    let (_, mut subscription) = rt.block_on(client.subscribe(&subject, 1024)).unwrap();

    // Publish a message to the "test" subject
    rt.block_on(client.publish(&subject, b"This is a message!"))
        .unwrap();

    // Read a message from the subscription
    let message = rt.block_on(subscription.next()).unwrap();
    let message = String::from_utf8(message.into_payload()).unwrap();
    println!("Received '{}'", message);

    // Disconnect from the server
    rt.block_on(client.disconnect());
}
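
If the rest of your program is built around ordinary threads, another option is to give the runtime its own thread and hand results back over a standard channel. The sketch below only shows that hand-off using the standard library. `spawn_nats_worker` and its `canned` argument are hypothetical stand-ins: in a real program the worker thread would own the Runtime and call something like `rt.block_on(subscription.next())` in a loop instead of iterating over a fixed list.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Spawn a dedicated worker thread and return the receiving end of a channel
/// that carries message payloads back to "regular" synchronous code.
///
/// `canned` is a hypothetical stand-in for messages that would otherwise be
/// read from a NATS subscription inside the worker thread.
fn spawn_nats_worker(canned: Vec<String>) -> (Receiver<String>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // Assumption: a real worker would create the runtime here and block
        // on the subscription, forwarding each payload as it arrives.
        for payload in canned {
            // Stop forwarding if the receiving side has gone away.
            if tx.send(payload).is_err() {
                break;
            }
        }
        // `tx` is dropped when the thread ends, which closes the channel.
    });
    (rx, handle)
}
```

The synchronous side can then read with `rx.recv()` or iterate with `rx.iter()`, which ends once the worker drops its sender, and call `handle.join()` during shutdown.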

TLS support is definitely on the roadmap.