Passing a writer half

I'm trying to pass the writer half of a socket, wrapped in a Mutex inside an Arc, to an async function, but I can't figure out what the parameter type should be. The compiler tells me I'm using the wrong type, but when I copy and paste the type it suggests, I eventually get an error stating that I'm attempting to use a private name.

// dependencies:
// tokio = { version = "0.2.6", features = ["full"] }

use std::{
  sync::{Arc,Mutex}
};

//use tokio::prelude::*;
use tokio::io;
use tokio::spawn;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::WriteHalf;
//use tokio::net::tcp::WriteHalf;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
  loop {
    let (socket, _) = listener.accept().await?;
    spawn(handle_connection(socket));
  }
}

async fn handle_connection(socket: TcpStream) -> io::Result<()> {
  let (mut recv, mut send) = io::split(socket);
  let reader = io::BufReader::new(recv);
  let mut lines = reader.lines();

  let writer = io::BufWriter::new(send);
  let writer = Arc::new(Mutex::new(writer));

  while let Some(line) = lines.next_line().await? {
    let fut = echo(line.clone(), Arc::clone(&writer));
    spawn(fut);
  }

  Ok(())
}

async fn echo(line: String,
      writer: Arc<Mutex<io::BufWriter<WriteHalf<TcpStream>>>>) {
  let mut wsock = writer.lock().unwrap();
  wsock.write(&line.as_bytes()).await;
}

/* vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :*/

What's the correct type for the writer parameter in echo?

Adding use tokio::io::AsyncWriteExt; fixes the private name error.
wsock is a MutexGuard, so first try dereferencing it: (&mut *wsock).write(. That then errors with not Send, because the std MutexGuard is held across the .await.
What is likely needed is an async Mutex instead.
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0114f21fa8673eb5beecf1cf102983e5
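As a side note on the shape of the parameter type: the following is only a synchronous, std-only analogue (threads instead of tasks, std::io::Write instead of tokio's AsyncWrite), not the code from the playground link. The function echo_all and the generic parameter W are made up for illustration. It sketches how making the function generic over the writer avoids spelling out the concrete half type, and how calls go through the MutexGuard. In the async version the std Mutex would be swapped for an async one, as suggested above.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical synchronous stand-in for `echo`: generic over any `Write`,
// so the concrete writer type does not have to be named in the signature.
fn echo<W: Write>(line: &str, writer: &Arc<Mutex<BufWriter<W>>>) -> io::Result<()> {
    // `lock()` yields a MutexGuard; method calls auto-deref through it.
    let mut guard = writer.lock().unwrap();
    guard.write_all(line.as_bytes())?;
    guard.write_all(b"\n")?;
    // Push the buffered bytes down to the inner writer.
    guard.flush()
}

// Spawns one thread per line, all sharing one writer, and returns
// the bytes that reached the inner Vec (used here in place of a socket).
fn echo_all(lines: &[&str]) -> Vec<u8> {
    let writer = Arc::new(Mutex::new(BufWriter::new(Vec::<u8>::new())));
    let handles: Vec<_> = lines
        .iter()
        .map(|l| {
            let line = l.to_string();
            let writer = Arc::clone(&writer);
            thread::spawn(move || echo(&line, &writer).unwrap())
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let guard = writer.lock().unwrap();
    guard.get_ref().clone()
}

fn main() {
    let out = echo_all(&["hello"]);
    println!("{:?}", String::from_utf8_lossy(&out));
}
```

With more than one line, the order in which the threads take the lock is not guaranteed, so the lines may come out in any order.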


The code on the playground builds, but when I run it, it panics (in main's block_on(async { panic!() })). I'm not quite sure about the logic here: is it supposed to run panic!() in an async block if the tokio runtime fails to initialize? If so, would this likely be a platform-specific issue?

If I switch back to using tokio::main, the program appears to run. I can connect to it with a TCP client and read data coming in from the client, but when I write something it never appears to reach the client.

I added a bunch of trace outputs: the echo async function is running, and the awaited write line does complete.

As a client I'm using:

#!/usr/bin/env python3

import socket

HOST = '127.0.0.1'  # The server's hostname or IP address
PORT = 8080        # The port used by the server

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  s.connect((HOST, PORT))
  s.sendall(b'Hello, world\n')
  data = s.recv(1024)

print('Received', repr(data))

# vim: set ft=python et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

If I go back to the non-tokio::main code, where the runtime is explicitly initialized, and I simply remove the block_on() part, then I get the same behavior as if I use the tokio::main version. I.e. data appears to arrive at the server, but data written from the server never reaches the client.

Update: My primary development platform is macOS, I tried this on a Linux system as well and got the same results. It doesn't appear that the data written in the echo function actually finds its way to the wire even though the write's await passes. I also replaced the write with a write_all, which shouldn't be necessary for such a simple case, but I did anyway and there wasn't any difference.

Update2: The culprit is BufWriter. If I go for "unbuffered" writes then it works fine. I don't know yet whether there's something about BufWriter I don't understand or whether this is a bug.

Well, it's buffered, so maybe you need to flush it in time?


Hmm... I see what you're saying. I'm so used to libevent and its evbuffers that I have a hard time wrapping my head around what BufWriter would actually do.

In libevent one writes data to an evbuffer. The internals use platform specific callbacks (kqueue, epoll, etc) to determine when more data can be written. If there's data to be written, it takes data off the associated evbuffer and writes it to the socket.

I blindly assumed that using a BufWriter in this context would yield the same functionality as an evbuffer in libevent. I.e. mio reports that data can be written, the tokio runtime finds that a BufWriter was used to write, and so it automatically pulls from its buffers.

So "flush" in BufWriter is essentially like "commit", i.e. saying "whatever I have written is really what I want to write to the inner writer"?

Update: Indeed. Need to manually flush after writes. I think I understand why as well.
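To make the "flush as commit" behaviour concrete, here is a minimal sketch using std::io::BufWriter over a Vec<u8> rather than tokio's BufWriter over a socket; that substitution, and the helper name visible_before_and_after_flush, are assumptions made so the example runs without a network or a runtime. It only shows that a small write sits in the buffer until flush is called.

```rust
use std::io::{BufWriter, Write};

// Returns how many bytes the inner writer holds before and after flush().
fn visible_before_and_after_flush(data: &[u8]) -> (usize, usize) {
    // The Vec<u8> stands in for the socket's write half.
    let mut writer = BufWriter::new(Vec::<u8>::new());
    writer.write_all(data).unwrap();
    // A small write only lands in BufWriter's internal buffer.
    let before = writer.get_ref().len();
    // flush() hands the buffered bytes to the inner writer.
    writer.flush().unwrap();
    let after = writer.get_ref().len();
    (before, after)
}

fn main() {
    let (before, after) = visible_before_and_after_flush(b"Hello, world\n");
    println!("before flush: {} bytes, after flush: {} bytes", before, after);
}
```

Writes larger than the buffer's capacity are passed through to the inner writer directly, which is one reason a missing flush can go unnoticed with large payloads and only show up with short lines like the one in this thread.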

(I now see that the tokio::main macro works on the playground. It wasn't enabled in the past, hence the workaround. The panic! was just added to avoid waiting for the playground's timeout.)
