Seems like .await is not waiting until the packet is completely read

Hi folks,

I'm having trouble with my stream reader. It seems to read packets as if it bypasses .await.

Below is the relevant part of my code.

This is where I read the packet:

async fn read_packet(reader: &Arc<Mutex<Option<Reader>>>) -> Result<IncomingPacket, Error> {
    let error = Error::new(ErrorKind::NotFound, "Not connected to TCP");

    if let Some(reader) = &mut *reader.lock().await {
        let result = reader.read().await;
        return match result {
            Ok(packet) => Ok(packet),
            Err(err) => Err(err),
        };
    }

    Err(error)
}

fn handle_read(
    &mut self,
    mut signal_receiver: Receiver<Signal>,
    query_sender: BroadcastSender<HandlerOutput>,
    notify: Arc<Notify>,
) -> JoinHandle<()> {
    let reader = Arc::clone(&self._reader);
    let session = Arc::clone(&self.session);
    let client_flags = Arc::clone(&self._flags);
    let data_storage = Arc::clone(&self.data_storage);

    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = signal_receiver.recv() => {},
                result = Self::read_packet(&reader) => {
                    // ...
                },
            }
        }
    })
}

This is what my reader looks like:

pub struct Reader {
    _stream: BufReader<OwnedReadHalf>,
    // ...
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            _stream: BufReader::new(reader),
            // ...
        }
    }

    pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>) {
        self._decryptor = Some(Decryptor::new(session_key));
        self._warden_crypt = warden_crypt;
        self._need_sync = true;
    }

    pub async fn read(&mut self) -> Result<IncomingPacket, Error> {
        let (opcode, body) = if let Some(decryptor) = self._decryptor.as_mut() {
            // ...
        } else {
            let opcode = self._stream.read_u8().await?;
            let body = match opcode {
                Opcode::LOGIN_CHALLENGE => {
                    LoginChallengeResponse::from_stream(&mut self._stream)
                        .await
                        .map_err(|e| Error::new(std::io::ErrorKind::Other, e))?
                },
                Opcode::LOGIN_PROOF => {
                    LoginProofResponse::from_stream(&mut self._stream)
                        .await
                        .map_err(|e| Error::new(std::io::ErrorKind::Other, e))?
                },
                Opcode::REALM_LIST => {
                    RealmlistResponse::from_stream(&mut self._stream)
                        .await
                        .map_err(|e| Error::new(std::io::ErrorKind::Other, e))?
                },
                _ => vec![],
            };

            (opcode as u16, body)
        };

        Ok(IncomingPacket { opcode, body })
    }
}

What is actually happening: after my Reader reads the opcode:

let opcode = self._stream.read_u8().await?;

and picks the first serializer to read the rest of the packet from the stream:

Opcode::LOGIN_CHALLENGE => {
    LoginChallengeResponse::from_stream(&mut self._stream)
        .await
        .map_err(|e| Error::new(std::io::ErrorKind::Other, e))?
},

it continues reading opcodes. So, before the next packet should start, a few bytes have already been read as opcodes by:

let opcode = self._stream.read_u8().await?;

which is not what I expect. I put a println! into the from_stream function and another one right after the opcode read (let opcode = self._stream.read_u8().await?;). The actual output is:

[SUCCESS]: Connected to 127.0.0.1:3724
[SEND]: LOGIN_CHALLENGE as BOT1
[SEND]: LOGIN_CHALLENGE: 38 bytes sent
OPCODE: 0
FROM STREAM !
OPCODE: 186
OPCODE: 163
OPCODE: 30
OPCODE: 153
[RECV]: LOGIN_CHALLENGE
[DEBUG]: Session key created
OPCODE: 160
[RECV]: MSG_MOVE_STOP_STRAFE
[RECV]: CMSG_CHANNEL_INVITE
[RECV]: SMSG_REFER_A_FRIEND_EXPIRED
[RECV]: SMSG_CHANNEL_NOTIFY
OPCODE: 11
[SEND]: LOGIN_PROOF: 75 bytes sent
[RECV]: CMSG_CHANNEL_UNMODERATOR
[RECV]: SMSG_ZONE_MAP
OPCODE: 33
OPCODE: 87
OPCODE: 252
OPCODE: 55
[RECV]: CMSG_BEASTMASTER
[RECV]: CMSG_ITEM_QUERY_MULTIPLE
OPCODE: 63
[RECV]: CMSG_COMPLETE_CINEMATIC
[RECV]: CMSG_CHAR_ENUM
[RECV]: SMSG_TRANSFER_PENDING
OPCODE: 179
OPCODE: 105
OPCODE: 205
OPCODE: 210
OPCODE: 241
[RECV]: SMSG_GAMEOBJECT_CUSTOM_ANIM
[RECV]: CMSG_ADD_FRIEND
[RECV]: MSG_MOVE_SET_RUN_SPEED
[RECV]: MSG_MOVE_SET_SWIM_SPEED_CHEAT
OPCODE: 0
[RECV]: MSG_MOVE_KNOCK_BACK
FROM STREAM !

but I expect to get something like:

[SUCCESS]: Connected to 127.0.0.1:3724
[SEND]: LOGIN_CHALLENGE as BOT1
[SEND]: LOGIN_CHALLENGE: 38 bytes sent
OPCODE: 0
FROM STREAM !
[RECV]: LOGIN_CHALLENGE
[DEBUG]: Session key created
OPCODE: 1
[SEND]: LOGIN_PROOF: 75 bytes sent

Could somebody explain what is wrong in my code and how to fix it?

Your Reader::read() is not cancellation safe: if it has read an opcode and then its future is dropped (cancelled), such as by the select!, then the next read will start over by trying to read an opcode again, leading to misinterpretation of the input stream.
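To make the desynchronization concrete, here is a small synchronous simulation (std only, no Tokio). It assumes a hypothetical wire format of a 1-byte opcode, a 1-byte body length and then the body; the real login packets are laid out differently per opcode. "Cancelling" is modelled by abandoning the first read right after its opcode, which is what happens to the progress of a dropped future.

```rust
use std::io::{self, Cursor, Read};

// Hypothetical wire format, for illustration only:
// 1-byte opcode, 1-byte body length, then `length` body bytes.
fn read_u8(stream: &mut impl Read) -> io::Result<u8> {
    let mut byte = [0u8; 1];
    stream.read_exact(&mut byte)?;
    Ok(byte[0])
}

fn read_packet(stream: &mut impl Read) -> io::Result<(u8, Vec<u8>)> {
    let opcode = read_u8(stream)?;
    let len = read_u8(stream)? as usize;
    let mut body = vec![0u8; len];
    stream.read_exact(&mut body)?;
    Ok((opcode, body))
}

/// Opcodes seen when every packet is read to completion.
fn opcodes_clean(bytes: &[u8]) -> Vec<u8> {
    let mut stream = Cursor::new(bytes);
    let mut seen = Vec::new();
    while let Ok((opcode, _body)) = read_packet(&mut stream) {
        seen.push(opcode);
    }
    seen
}

/// Opcodes seen when the first read is "cancelled" right after its opcode,
/// the way select! drops a cancellation-unsafe future, and reading restarts.
fn opcodes_after_cancel(bytes: &[u8]) -> Vec<u8> {
    let mut stream = Cursor::new(bytes);
    let mut seen = Vec::new();
    // The cancelled attempt: the opcode is consumed, the body never is.
    if let Ok(opcode) = read_u8(&mut stream) {
        seen.push(opcode);
    }
    // Every restarted attempt now treats leftover body bytes as opcodes.
    while let Ok((opcode, _body)) = read_packet(&mut stream) {
        seen.push(opcode);
    }
    seen
}
```

With the nine bytes 10 01 03 20 00 30 02 00 00 (hex), the clean parse yields opcodes 0x10, 0x20, 0x30, while the cancelled run yields 0x10, 0x01, 0x02: length and body bytes get read as opcodes, much like the stray OPCODE lines in the log above.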

To fix this, you should dedicate an async task to nothing but reading the stream and processing the input; don't let anything interrupt it but closing the stream. You already have a spawned loop, but it has a select! inside it. Get rid of the select!, and handle whatever signal_receiver is for at a later stage of your program. For example, the spawned task could send all the IncomingPackets on a channel, and then it's fine to select on that channel and other things.
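A rough sketch of that layout, using std::thread::spawn and std::sync::mpsc as synchronous stand-ins for tokio::spawn and tokio::sync::mpsc so the example runs without dependencies. The packet format (opcode, length byte, body) and the IncomingPacket fields are assumptions for illustration. The point is structural: only the reader loop touches the stream, and it never gives up halfway through a packet.

```rust
use std::io::{self, Read};
use std::sync::mpsc;
use std::thread;

#[derive(Debug, PartialEq)]
struct IncomingPacket {
    opcode: u8,
    body: Vec<u8>,
}

// Hypothetical wire format: 1-byte opcode, 1-byte body length, body.
fn read_packet(stream: &mut impl Read) -> io::Result<IncomingPacket> {
    let mut header = [0u8; 2];
    stream.read_exact(&mut header)?;
    let mut body = vec![0u8; header[1] as usize];
    stream.read_exact(&mut body)?;
    Ok(IncomingPacket { opcode: header[0], body })
}

/// Spawns a reader that owns the stream and does nothing but read it.
/// Nothing can interrupt a half-read packet; the loop ends only when the
/// stream ends or errors, or when the receiving side is dropped.
fn spawn_reader<R: Read + Send + 'static>(mut stream: R) -> mpsc::Receiver<IncomingPacket> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        while let Ok(packet) = read_packet(&mut stream) {
            if tx.send(packet).is_err() {
                break; // nobody is listening any more
            }
        }
    });
    rx
}
```

In the Tokio version, the consumer side can then put rx.recv() into select! next to signal_receiver.recv(); Tokio documents mpsc Receiver::recv as cancel safe, so losing a select! race there does not lose a packet.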


I didn't read your code carefully, but it looks like a classic "select! cancels futures" problem. The select! macro polls the futures in all branches while they are pending, but as soon as one of them becomes ready, all the others are dropped. So your Self::read_packet future may be polled a few times, reading part of the input, and then, if signal_receiver.recv() resolves first, be dropped along with whatever it had already read. See the section on cancellation safety in the tokio::select! documentation for more details.

The solution is to create the reading future once, outside the loop, and hand select! only a mutable reference to it, like this:

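async move {
    let fut = Self::read_packet(&reader);
    // `&mut fut` is only a Future if `fut` is Unpin, so pin it on the stack first
    tokio::pin!(fut);
    loop {
        tokio::select! {
            _ = signal_receiver.recv() => {},
            result = &mut fut => {
                // ...
                // a completed future must not be polled again, so leave the loop
                break;
            },
        }
    }
}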

This way the state of the future is preserved between different select! calls. If you need to read input multiple times, you can wrap all of that into another loop. Alternatively, you can use some higher-level combinators on futures, but I can't recommend anything here without knowing more about the intended handling of the signal.
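A different technique from the one above, not something suggested in this thread: keep the partially read bytes in a struct that outlives any single future, and only consume a packet once it is complete, so dropping a read in the middle loses nothing. This sketch is synchronous and assumes the same hypothetical opcode/length/body format. In async code the bytes would typically be fed in with a cancel-safe read such as Tokio's AsyncReadExt::read_buf, and tokio-util's codec module (a Decoder used with FramedRead) is built around the same idea.

```rust
/// Buffers received bytes and hands out complete packets only, so stopping
/// between calls never loses a half-read packet: the partial bytes stay here.
/// Hypothetical wire format: 1-byte opcode, 1-byte body length, body.
#[derive(Default)]
struct PacketDecoder {
    buf: Vec<u8>,
}

impl PacketDecoder {
    /// Appends a chunk of freshly received bytes.
    fn extend(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Returns the next complete packet, or None if more bytes are needed.
    /// Bytes are removed from the buffer only once a whole packet is present.
    fn try_decode(&mut self) -> Option<(u8, Vec<u8>)> {
        if self.buf.len() < 2 {
            return None;
        }
        let total = 2 + self.buf[1] as usize;
        if self.buf.len() < total {
            return None;
        }
        let opcode = self.buf[0];
        let body = self.buf[2..total].to_vec();
        self.buf.drain(..total);
        Some((opcode, body))
    }
}
```

Bytes can arrive split at any point (even between the opcode and the length byte) and the decoder simply waits for the rest.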


I use this approach because my app (which is actually a client) will reconnect to another server. This approach was suggested to me here. In short: first my app connects to the login server and passes the auth steps, and then it connects to one of the world servers.

Perhaps this can be implemented in a better way?

Regarding signal_receiver: a message is sent to it only on reconnect, so in my current case it does not affect the existing behavior:

tokio::select! {
    _ = signal_receiver.recv() => {
        // this line is never reached in my case,
        // because I haven't reconnected yet
        println!("SIGNAL !");
    },
    result = Self::read_packet(&reader) => {
        match result {
            Ok(packet) => {
                // ...
            },
            Err(err) => {
                // ...
            }
        }
    },
}

Well, if it's not cancellation, then the most likely problem I would expect is incorrectly parsing the input stream.

I would suggest recording the actual bytes you got from the server (you'll need to wrap the stream or something to make sure you get all bytes accurately), and then parsing them "by hand" and looking for the very first moment your code deviated from the intended parse.

I say this because your wrong output has the flavor of a parser getting desynchronized from the input, so that it is taking random bytes as opcodes.
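A sketch of that "parse by hand" step as a tiny helper: walk the recorded bytes packet by packet and report the first offset where an expected opcode is not a known one. The format (opcode, 1-byte length, body) and the first_desync name are hypothetical; with the real protocol each opcode has its own layout, so the skip step would have to follow the per-opcode parsing, and an unknown opcode is only a heuristic for where things went wrong.

```rust
/// Walks a recorded dump packet by packet and returns the offset of the
/// first byte that should be an opcode but is not a known one, which is
/// roughly where the parse went off track.
/// Hypothetical wire format: 1-byte opcode, 1-byte body length, body.
fn first_desync(bytes: &[u8], known_opcodes: &[u8]) -> Option<usize> {
    let mut pos = 0;
    while pos < bytes.len() {
        if !known_opcodes.contains(&bytes[pos]) {
            return Some(pos);
        }
        // A dump that ends inside a header is just incomplete, not desynced.
        let Some(&len) = bytes.get(pos + 1) else { break };
        pos += 2 + len as usize;
    }
    None
}
```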


Could you clarify what you mean by "wrapping" the stream? I understand the general idea, but it's not clear to me how to implement it correctly. Could you provide a small code example?

You'd replace the OwnedReadHalf with another type that contains it, and has the extra behavior. You're using Tokio, right? It looks like tokio_util::io::InspectReader can be an appropriate wrapper already.

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        let wrapped = tokio_util::io::InspectReader::new(
            reader,
            |bytes: &[u8]| {
                eprintln!("read {bytes:?}");
            },
        );

        Self {
            _stream: BufReader::new(wrapped), // You'll have to change the field type
            // ...
        }
    }
}

Or, instead of printing them as text, it might be more useful to write the bytes to a file as-is and view them with a hex dumper or similar. Up to you.
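For the file variant, the wrapper can copy every byte it hands out into any writer. The sketch below is a synchronous stand-in written against std::io::Read (the TeeReader name is made up); with Tokio you would keep InspectReader and do the copying in its callback. In practice log would be a std::fs::File, which you can then open in a hex viewer.

```rust
use std::io::{self, Read, Write};

/// Synchronous stand-in for tokio_util::io::InspectReader: it forwards reads
/// to `inner` and copies every byte handed out into `log`.
struct TeeReader<R, W> {
    inner: R,
    log: W,
}

impl<R: Read, W: Write> Read for TeeReader<R, W> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        // Record exactly the bytes that were returned, unmodified.
        self.log.write_all(&buf[..n])?;
        Ok(n)
    }
}
```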

pub struct Reader {
    _stream: InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>,
    _decryptor: Option<Decryptor>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    _need_sync: bool,
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        let buf_reader = BufReader::new(reader);
        let inspect_reader = InspectReader::new(buf_reader, |bytes| println!("READ: {bytes:?}"));

        Self {
            _stream: inspect_reader,
            _decryptor: None,
            _warden_crypt: Arc::new(SyncMutex::new(None)),
            _need_sync: false,
        }
    }

    // ...

When I compile the code above, I get an error:

Type mismatch [E0308] 
expected `InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>`, 
but found `InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>`

Expected and found are the same. Can you tell me what this error means?

Trouble with the lifetime on the fn parameter, probably. Try constructing the closure with an explicit type:

let inspect_fn: fn(&[u8]) = |bytes| println!("READ: {bytes:?}");
let inspect_reader = InspectReader::new(buf_reader, inspect_fn);
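A minimal std-only illustration of the annotated binding. One likely reason it helps: InspectReader::new is generic over its callback type, so a closure passed directly makes that parameter the closure's own anonymous type, which does not match the fn(&[u8]) spelled out in the field. Giving the binding the type fn(&[u8]) coerces the non-capturing closure to a function pointer first; in a type position, fn(&[u8]) already means for<'a> fn(&'a [u8]). The Inspect struct and the SEEN counter are hypothetical, used only so the example can be checked.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A fn pointer cannot capture state, so a static counter records what the
// callback saw (for demonstration only).
static SEEN: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for a wrapper whose callback type is written out as a fn pointer,
/// like the `_stream` field type above.
struct Inspect {
    callback: fn(&[u8]),
}

fn make_inspect() -> Inspect {
    // The annotation makes the non-capturing closure coerce to the
    // higher-ranked fn pointer type before it is stored.
    let inspect_fn: fn(&[u8]) = |bytes| {
        SEEN.fetch_add(bytes.len(), Ordering::SeqCst);
    };
    Inspect { callback: inspect_fn }
}
```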

This helps, thank you. But now I get another compile error:

error[E0308]: mismatched types
  --> src/primary/client/auth/login_proof.rs:18:14
   |
18 |     #[derive(LoginPacket, Serialize, Deserialize, Debug)]
   |              ^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected mutable reference `&mut InspectReader<_, fn(&_)>`
              found mutable reference `&mut InspectReader<_, for<'a> fn(&'a _)>`

And this is my trait, which I changed for testing:

#[async_trait]
pub trait StreamReader {
    async fn read_from(stream: &mut InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>) -> Result<Self, FieldError>
        where Self: Sized;
}

#[async_trait]
impl StreamReader for u8 {
    async fn read_from(stream: &mut InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>) -> Result<Self, FieldError>
        where Self: Sized,
    {
        stream.read_u8().await.map_err(|e| FieldError::CannotRead(e, "u8".to_string()))
    }
}

// ...

In my macro (where the error actually occurs), I changed the method I use for reading from the stream:

// ...
let async_initializers = fields
        .iter()
        .map(|f| {
            let field_name = f.ident.clone();
            let field_type = f.ty.clone();

            if dynamic_fields.contains(&field_name) {
                let async_field_name = format_ident!("async_{}", field_name.unwrap());
                quote!{ Self::#async_field_name(&mut stream, &mut cache).await }
            } else {
                quote! {
                    {
                        let value: #field_type = #stream_reader::read_from(&mut stream).await?;
                        cache.#field_name = value.clone();
                        value
                    }
                }
            }
        });

// ...
let output = quote! {
    impl #ident {
        // ...

        pub async fn from_stream(mut stream: &mut #inspect_reader<#buf_reader<#owned_read_half>, fn(&[u8])>) -> #result<Vec<u8>>
        {
            let mut cache = Self {
                #(#field_names: Default::default()),*
            };

            let mut instance = Self {
                #(#field_names: #async_initializers),*
            };

            Ok(instance._build_body()?)
        }
    }
};

And this is the struct where I applied the macro:

with_opcode! {
    @login_opcode(Opcode::LOGIN_PROOF)
    #[derive(LoginPacket, Serialize, Deserialize, Debug)]
    pub struct LoginChallengeResponse {
        unknown: u8,
        code: u8,
        #[serde(serialize_with = "crate::primary::serializers::array_serializer::serialize_array")]
        server_ephemeral: [u8; 32],
        g_len: u8,
        #[dynamic_field]
        g: Vec<u8>,
        n_len: u8,
        #[dynamic_field]
        n: Vec<u8>,
        #[serde(serialize_with = "crate::primary::serializers::array_serializer::serialize_array")]
        salt: [u8; 32],
    }

    impl LoginChallengeResponse {
        fn g<R: BufRead>(mut reader: R, cache: &mut Self) -> Vec<u8> {
            let mut buffer = vec![0u8; cache.g_len as usize];
            reader.read_exact(&mut buffer).unwrap();
            buffer
        }

        async fn async_g(stream: &mut InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>, cache: &mut Self) -> Vec<u8>
        {
            let mut buffer = vec![0u8; cache.g_len as usize];
            stream.read_exact(&mut buffer).await.unwrap();
            buffer
        }

        fn n<R: BufRead>(mut reader: R, cache: &mut Self) -> Vec<u8> {
            let mut buffer = vec![0u8; cache.n_len as usize];
            reader.read_exact(&mut buffer).unwrap();
            buffer
        }

        async fn async_n(stream: &mut InspectReader<BufReader<OwnedReadHalf>, fn(&[u8])>, cache: &mut Self) -> Vec<u8>
        {
            let mut buffer = vec![0u8; cache.n_len as usize];
            stream.read_exact(&mut buffer).await.unwrap();
            buffer
        }
    }
}

Could you please tell me what might be wrong here?

P.S. Just in case, these are my imports:

pub struct Imports {
    pub async_read: TokenStream2,
    pub binary_converter: TokenStream2,
    pub buf_read: TokenStream2,
    pub byteorder_be: TokenStream2,
    pub byteorder_le: TokenStream2,
    pub byteorder_write: TokenStream2,
    pub cursor: TokenStream2,
    pub deflate_decoder: TokenStream2,
    pub hash_map: TokenStream2,
    pub json_formatter: TokenStream2,
    pub read: TokenStream2,
    pub result: TokenStream2,
    pub serialize: TokenStream2,
    pub stream_reader: TokenStream2,
    pub buf_reader: TokenStream2,
    pub inspect_reader: TokenStream2,
    pub owned_read_half: TokenStream2,
}

impl Imports {
    pub fn get() -> Self {
        Self {
            async_read: quote!(tokio::io::AsyncRead),
            binary_converter: quote!(crate::traits::BinaryConverter),
            buf_read: quote!(tokio::io::AsyncBufRead),
            byteorder_be: quote!(byteorder::BigEndian),
            byteorder_le: quote!(byteorder::LittleEndian),
            byteorder_write: quote!(byteorder::WriteBytesExt),
            cursor: quote!(std::io::Cursor),
            deflate_decoder: quote!(flate2::read::DeflateDecoder),
            hash_map: quote!(std::collections::HashMap),
            json_formatter: quote!(idewave_formatters::JsonFormatter),
            // TODO: need to reorganize constants
            read: quote!(std::io::Read),
            result: quote!(anyhow::Result),
            serialize: quote!(serde::Serialize),
            stream_reader: quote!(crate::primary::traits::StreamReader),
            buf_reader: quote!(tokio::io::BufReader),
            inspect_reader: quote!(tokio_util::io::InspectReader),
            owned_read_half: quote!(tokio::net::tcp::OwnedReadHalf),
        }
    }
}

Sorry, this is too complex for me to have any further suggestions. If you could reduce your type-error to something self-contained enough to test on https://play.rust-lang.org/ then I could give it another look.


Do you know if there is any way to run a proc macro in that sandbox, or should I create a test repo for this case?

You cannot define proc macros in the Playground, but macros can't cause type errors that couldn't happen without macros. You should reduce the problematic code to an example that does not contain a macro, not just because it is necessary here, but because reducing programs is a useful way to understand them. I'll be able to understand it better, and so will you, as you discover what is not relevant to the problem and therefore removable.


Could you check why I get the error here:

error[E0599]: the method `read_u8` exists for mutable reference `&mut InspectReader<R, fn(&[u8])>`, but its trait bounds were not satisfied
  --> src/main.rs:29:16
   |
29 |           stream.read_u8().await.map_err(|e| FieldError::CannotRead(e, "u8".to_string()))
   |                  ^^^^^^^ method cannot be called on `&mut InspectReader<R, fn(&[u8])>` due to unsatisfied trait bounds
   |
  ::: /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-util-0.7.10/src/io/inspect.rs:9:1
   |
9  | / pin_project! {
10 | |     /// An adapter that lets you inspect the data that's being read.
11 | |     ///
12 | |     /// This is useful for things like hashing data as it's read in.
...  |
17 | |     }
18 | | }
   | | -
   | | |
   | |_doesn't satisfy `InspectReader<R, fn(&'life1 [u8])>: AsyncReadExt`
   |   doesn't satisfy `InspectReader<R, fn(&'life1 [u8])>: AsyncRead`
   |
   = note: the following trait bounds were not satisfied:
           `InspectReader<R, fn(&'life1 [u8])>: AsyncRead`
           which is required by `InspectReader<R, fn(&'life1 [u8])>: AsyncReadExt`
           `&'life0 mut InspectReader<R, fn(&'life1 [u8])>: AsyncRead`
           which is required by `&'life0 mut InspectReader<R, fn(&'life1 [u8])>: AsyncReadExt`

You over-complicated your trait signature by putting InspectReader in it. The whole point of InspectReader is that it also implements AsyncRead, so you don't need to bake that into the trait. I changed the trait StreamReader and its impls to use this signature:

async fn read_from<R>(stream: &mut R) -> Result<Self, FieldError>
     where Self: Sized, R: AsyncBufRead + Unpin + Send;

and now the code compiles. No need to get the fn type involved.

Note also that I removed R: AsyncRead; it's unnecessary because AsyncRead is a supertrait of AsyncBufRead.
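A synchronous sketch of the same idea, with std::io::BufRead standing in for AsyncBufRead and without async_trait, to show that one generic signature accepts both a plain reader and a wrapped one. FieldError here is a simplified guess at the real type.

```rust
use std::io::{self, BufRead, Read};

#[derive(Debug)]
enum FieldError {
    CannotRead(io::Error, String),
}

/// Synchronous analog of the StreamReader trait: generic over any buffered
/// reader instead of naming one concrete wrapper type.
trait StreamReader: Sized {
    fn read_from<R: BufRead>(stream: &mut R) -> Result<Self, FieldError>;
}

impl StreamReader for u8 {
    fn read_from<R: BufRead>(stream: &mut R) -> Result<Self, FieldError> {
        let mut byte = [0u8; 1];
        // read_exact comes from Read, which is a supertrait of BufRead
        Read::read_exact(stream, &mut byte)
            .map_err(|e| FieldError::CannotRead(e, "u8".to_string()))?;
        Ok(byte[0])
    }
}
```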


This helped, thank you. But now I have another error:

error[E0308]: mismatched types
   --> src/main.rs:105:44
    |
105 |         let response = Packet::from_stream(&self._stream).await;
    |                        ------------------- ^^^^^^^^^^^^^ types differ in mutability
    |                        |
    |                        arguments to this function are incorrect
    |
    = note: expected mutable reference `&mut _`
                       found reference `&InspectReader<tokio::io::BufReader<tokio::net::tcp::OwnedReadHalf>, for<'a> fn(&'a [u8])>`
note: associated function defined here
   --> src/main.rs:84:18
    |
84  |     pub async fn from_stream<R>(stream: &mut R) where R: AsyncBufRead + Unpin + Send {
    |                  ^^^^^^^^^^^    --------------

This is the updated sandbox.

Could you please check?

Look at the very outermost part of the types in the error message. I'll remove the details to highlight the important parts:

    = note: expected mutable reference `&mut _`
                       found reference `&_`

You used &self._stream where you need &mut self._stream.
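A small std-only example of that rule; the Demo type and push_byte function are hypothetical. Inside a &mut self method you can borrow a field mutably, but you have to ask for it with &mut; writing & always produces a shared reference.

```rust
/// Hypothetical stand-in for a function that needs exclusive access,
/// like Packet::from_stream(&mut R).
fn push_byte(stream: &mut Vec<u8>, byte: u8) {
    stream.push(byte);
}

struct Demo {
    stream: Vec<u8>,
}

impl Demo {
    fn read_one(&mut self) {
        // Each borrow expression chooses its own mutability, even inside a
        // `&mut self` method, so this line would fail with E0308
        // ("types differ in mutability"):
        // push_byte(&self.stream, 1);
        push_byte(&mut self.stream, 1); // mutable borrow of the field through `self`
    }
}
```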


Oh, I thought that if self is passed by &mut reference, then &self._stream would also be mutable. Well, I fixed this error, and now I have another one:

error[E0277]: the trait bound `InspectReader<tokio::io::BufReader<tokio::net::tcp::OwnedReadHalf>, for<'a> fn(&'a [u8])>: AsyncBufRead` is not satisfied
   --> src/main.rs:105:44
    |
105 |         let response = Packet::from_stream(&mut self._stream).await;
    |                        ------------------- ^^^^^^^^^^^^^^^^^ the trait `AsyncBufRead` is not implemented for `InspectReader<tokio::io::BufReader<tokio::net::tcp::OwnedReadHalf>, for<'a> fn(&'a [u8])>`
    |                        |
    |                        required by a bound introduced by this call
    |
    = help: the following other types implement trait `AsyncBufRead`:
              Box<T>
              tokio_util::io::StreamReader<S, B>
              Either<L, R>
              tokio::io::Empty
              std::io::Cursor<T>
              tokio::io::Take<R>
              tokio::io::util::chain::Chain<T, U>
              tokio::io::BufWriter<W>
            and 5 others
note: required by a bound in `Packet::from_stream`
   --> src/main.rs:84:58
    |
84  |     pub async fn from_stream<R>(stream: &mut R) where R: AsyncBufRead + Unpin + Send {
    |                                                          ^^^^^^^^^^^^ required by this bound in `Packet::from_stream`

Here is the updated sandbox.

Ah, I was wondering about that, but there just wasn't enough code to get to that problem. BufReader implements AsyncBufRead, but InspectReader does not; it just passes bytes through and doesn't keep a buffer. So you should simply reverse the order of the wrappers:

pub struct Reader {
    _stream: BufReader<InspectReader<OwnedReadHalf, fn(&[u8])>>, // reversed
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        let inspect_fn: fn(&[u8]) = |bytes| println!("READ: {bytes:?}");
        Self {
            _stream: BufReader::new(InspectReader::new(reader, inspect_fn)), // reversed
        }
    }
}
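The same layering in synchronous std terms: std's BufReader implements BufRead, while a pass-through wrapper (a stand-in for InspectReader; the names are hypothetical) only implements Read. With BufReader on the outside, the combined type satisfies a BufRead bound.

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Pass-through wrapper, a synchronous stand-in for InspectReader: it
/// implements Read, but not BufRead, because it keeps no buffer of its own.
struct Inspect<R> {
    inner: R,
    chunks: Vec<usize>, // size of every chunk that passed through
}

impl<R: Read> Read for Inspect<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.chunks.push(n);
        Ok(n)
    }
}

/// Needs a buffered reader, like from_stream<R: AsyncBufRead>.
fn read_line_len<R: BufRead>(stream: &mut R) -> io::Result<usize> {
    let mut line = String::new();
    stream.read_line(&mut line)
}

/// BufReader on the outside: the combined type implements BufRead.
/// (Inspect { inner: BufReader::new(data), .. } would only implement Read,
/// so passing it to read_line_len would not compile.)
fn make_stream(data: &[u8]) -> BufReader<Inspect<&[u8]>> {
    BufReader::new(Inspect { inner: data, chunks: Vec::new() })
}
```

One side effect worth knowing when reading the dump: because the inspector now sits under the buffer, it sees bytes when BufReader refills, which can be ahead of what the parser has consumed. In this example the whole 12-byte input "hello\nworld\n" passes through the wrapper while only the first 6-byte line has been parsed.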

Thank you very much, this helped. Now I will debug the packets!
