Calling shutdown on resolution of Tokio Copy future

I'm writing a proxy based heavily on this example in the Tokio 0.2 alpha repo. My version is designed to work in pairs (forming a tunnel). It works, but it leaves sockets open after each client/server session ends. This is because when the client or server closes its connection to the proxy, the close doesn't "propagate" from that proxy to its peer.

In its current form, the proxy works by copying between ReadHalf and WriteHalf objects like so:

try_join(
    client_reader.copy(&mut server_writer),
    server_reader.copy(&mut client_writer),
)
.await?;

What I'm trying to do instead is something like:

try_join(
    client_reader.copy(&mut server_writer).and_then(|r| {
        server_writer.shutdown();
    }),
    server_reader.copy(&mut client_writer).and_then(|r| {
        client_writer.shutdown();
    }),
)
.await?;

I've tried several variations, but I can't make it work. Passing server_writer to copy() and then using it again in the closure constitutes multiple mutable borrows, which the borrow checker rejects. I tried using Arc to make clones, but that gave more involved compile errors, which I can show if needed.

Am I on the right track here? Maybe there's another approach that I'm not seeing.
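
For what it's worth, the ordering being asked for (finish the copy, then shut down the writer that was copied into) can be sketched without any async machinery. The following is a minimal blocking version using only the standard library, not the Tokio API: `copy_then_shutdown`, `relay`, and `half_close_demo` are hypothetical names, and threads stand in for the two futures. Because each direction owns its own handles (via `try_clone`), the copy and the shutdown run one after the other with no overlapping mutable borrow. The same sequencing should carry over to one async block per direction, though that is an assumption rather than something tested here.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Copy everything from `from` into `to`, then half-close `to`
/// so the peer on the far side observes EOF.
fn copy_then_shutdown(mut from: TcpStream, mut to: TcpStream) -> io::Result<u64> {
    let copied = io::copy(&mut from, &mut to)?;
    to.shutdown(Shutdown::Write)?;
    Ok(copied)
}

/// Relay both directions; returns (client-to-server bytes, server-to-client bytes).
fn relay(client: TcpStream, server: TcpStream) -> io::Result<(u64, u64)> {
    // Each direction gets its own handle to each socket.
    let (client_r, client_w) = (client.try_clone()?, client);
    let (server_r, server_w) = (server.try_clone()?, server);
    let up = thread::spawn(move || copy_then_shutdown(client_r, server_w));
    let down = thread::spawn(move || copy_then_shutdown(server_r, client_w));
    let up = up.join().expect("upstream thread panicked")?;
    let down = down.join().expect("downstream thread panicked")?;
    Ok((up, down))
}

/// Hypothetical end-to-end check: client -> proxy -> upstream server.
fn half_close_demo() -> io::Result<(Vec<u8>, Vec<u8>, u64, u64)> {
    let upstream = TcpListener::bind("127.0.0.1:0")?;
    let upstream_addr = upstream.local_addr()?;
    let server = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut conn, _) = upstream.accept()?;
        let mut request = Vec::new();
        // Returns only once the client's close has propagated through the proxy.
        conn.read_to_end(&mut request)?;
        conn.write_all(b"pong")?;
        Ok(request)
    });

    let proxy = TcpListener::bind("127.0.0.1:0")?;
    let proxy_addr = proxy.local_addr()?;
    let proxy_thread = thread::spawn(move || -> io::Result<(u64, u64)> {
        let (client, _) = proxy.accept()?;
        relay(client, TcpStream::connect(upstream_addr)?)
    });

    let mut client = TcpStream::connect(proxy_addr)?;
    client.write_all(b"ping")?;
    client.shutdown(Shutdown::Write)?; // the client is done sending
    let mut reply = Vec::new();
    client.read_to_end(&mut reply)?;

    let (up, down) = proxy_thread.join().expect("proxy thread panicked")?;
    let request = server.join().expect("server thread panicked")?;
    Ok((request, reply, up, down))
}
```

Calling `half_close_demo()` exercises the whole path: the upstream server's `read_to_end` can only return because the proxy forwarded the client's half-close, and the client's `read_to_end` can only return because the proxy forwarded the server's close.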

In my program, I use Senders and Receivers to communicate with individual streams. There is an outbound_rx, which receives data from a clonable sender for sending data outbound, and there is an inbound_sink, which takes inbound packets and forwards them to a receiver elsewhere in the program. Finally, there is a dc_rc_tube_rx, which receives byte-sized signals that, as you can see in the code, let another part of the program stop the stream:

    use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender, Receiver};
    use futures::{Future, Stream, Sink};


    use crate::packet::misc::ConnectError;
    use tokio::codec::Framed;
    use crate::prelude::Base64Codec;
    use hyxe_netdata::packet::RawInboundPacket;
    use tokio::prelude::{AsyncRead, AsyncWrite};
    use hyxe_netdata::connection::NetStreamMetadata;

    /// Useful for toggling between Input types
    pub type OutboundItem = Vec<u8>;

    /// Useful for toggling between Output types
    pub type RawInboundItem = RawInboundPacket;

    /// `stream`: The data source to listen upon
    /// `outbound_rx`: The receiver which then sends information across the stream
    /// `inbound_sink`: The location for sending information after reception from stream
    /// `dc_rc_tube_rx`: The receiver for kill signals
    pub fn base64<'cxn, 'a: 'cxn, S: 'a + AsyncRead + AsyncWrite>(
        stream: S,
        metadata: NetStreamMetadata,
        outbound_rx: UnboundedReceiver<OutboundItem>,
        inbound_sink: UnboundedSender<RawInboundItem>,
        dc_rc_tube_rx: Receiver<u8>,
    ) -> impl Future<Item=(), Error=ConnectError> + 'cxn {
        let framed = Framed::<S,Base64Codec<OutboundItem>>::new(stream, Base64Codec::new(metadata));

        let (outbound_sink, inbound_stream) = framed.split();

        let inbound = inbound_stream.map_err(|err| ConnectError::Generic(err.to_string()))
            .forward(inbound_sink.sink_map_err(|err| ConnectError::Generic(err.to_string())));

        let outbound = outbound_rx.map_err(|_| ConnectError::Generic("Outbound R/X not working".to_string())).forward(outbound_sink.sink_map_err(|err| ConnectError::Generic(err.to_string())));

        let stopper = dc_rc_tube_rx.map_err(|_| ConnectError::Generic("[Error] DC Tube Error (unknown)".to_string())).for_each(move |cmd| {
            match cmd {
                super::super::STREAM_SHUTDOWN => {
                    println!("[AsyncStreamHandler] Shutting down stream");
                    Err(ConnectError::Shutdown)
                },

                super::super::STREAM_RESTART => {
                    println!("[AsyncStreamHandler] Restarting stream");
                    // TODO: Handle restart signal
                    Err(ConnectError::Restart)
                },

                _ => {
                    println!("[AsyncStreamHandler] unknown command!");
                    Ok(())
                }
            }
        });


        inbound.select2(outbound).map(|_| ()).select2(stopper).map(|_| ()).and_then(|_| {
            //insert shutdown expression here (one possible location)
            Ok(())
        }).map_err(|_| ConnectError::Generic("Stream ending".to_string()))
    }

I use a custom base64 codec, but you can use one of Tokio's codecs in its place just fine.

For performance reasons, the inbound_sink receiver is shared by all streams, so that packets from every stream are collected in a single place. From there, the packets are handled asynchronously.

That suggests a slightly different approach, at least with respect to waiting for the futures to complete. Mine uses try_join(), which waits for both to complete and doesn't account for any "coupling" between the two. What's needed is for the resolution of one future to trigger an action that hastens the resolution of the other. So maybe a select() models that more accurately.
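
To make the "first one to finish hastens the other" idea concrete, here is a rough blocking sketch using only the standard library. It is not Tokio's select(): a channel `recv()` stands in for the select, threads stand in for the two futures, and `Direction`, `relay_until_first_close`, and `first_close_demo` are hypothetical names. It also assumes that a full `shutdown` on a socket wakes a read blocked on that socket, which holds on Linux but may differ on other platforms.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

/// Which direction resolved first (hypothetical helper type).
#[derive(Debug, PartialEq, Clone, Copy)]
enum Direction {
    ClientToServer,
    ServerToClient,
}

/// Relay both directions, but tear everything down as soon as either one ends.
fn relay_until_first_close(client: TcpStream, server: TcpStream) -> io::Result<Direction> {
    let (done_tx, done_rx) = mpsc::channel();
    let pairs = vec![
        (Direction::ClientToServer, client.try_clone()?, server.try_clone()?),
        (Direction::ServerToClient, server.try_clone()?, client.try_clone()?),
    ];
    let mut workers = Vec::new();
    for (dir, mut from, mut to) in pairs {
        let done_tx = done_tx.clone();
        workers.push(thread::spawn(move || {
            // Copy errors are ignored here; a real proxy would report them.
            let _ = io::copy(&mut from, &mut to);
            let _ = done_tx.send(dir);
        }));
    }
    // The "select": block until whichever direction resolves first.
    let first = done_rx.recv().expect("each worker reports exactly once");
    // Hasten the other direction by shutting both sockets down completely.
    // Errors are ignored because a socket may already be closed.
    let _ = client.shutdown(Shutdown::Both);
    let _ = server.shutdown(Shutdown::Both);
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    Ok(first)
}

/// Hypothetical check: the upstream server stays silent and keeps its socket
/// open, so the server-to-client copy can only end via the forced shutdown.
fn first_close_demo() -> io::Result<(Direction, Vec<u8>)> {
    let upstream = TcpListener::bind("127.0.0.1:0")?;
    let upstream_addr = upstream.local_addr()?;
    let (release_tx, release_rx) = mpsc::channel::<()>();
    let server = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut conn, _) = upstream.accept()?;
        let mut request = Vec::new();
        conn.read_to_end(&mut request)?;
        let _ = release_rx.recv(); // hold the connection open until released
        Ok(request)
    });

    let proxy = TcpListener::bind("127.0.0.1:0")?;
    let proxy_addr = proxy.local_addr()?;
    let proxy_thread = thread::spawn(move || -> io::Result<Direction> {
        let (client, _) = proxy.accept()?;
        relay_until_first_close(client, TcpStream::connect(upstream_addr)?)
    });

    let mut client = TcpStream::connect(proxy_addr)?;
    client.write_all(b"ping")?;
    drop(client); // the client goes away entirely

    let first = proxy_thread.join().expect("proxy thread panicked")?;
    let _ = release_tx.send(());
    let request = server.join().expect("server thread panicked")?;
    Ok((first, request))
}
```

The trade-off compared with waiting for both directions is that any data still in flight in the slower direction is cut off, so this shape fits sessions where one side closing really does mean the whole session is over.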
