How to organize a client app using async? Please, suggest materials

I'm writing an app that talks to a server over a TCP connection; the API consists of exchanging binary packets. Update requests are subject to a packet size limit, so a large update must be split into multiple packets. For every packet, the client must wait for an acknowledgement packet in response. But the server also sends other updates about its state, so the client has to handle both concurrently.

This looks like a good fit for async, but I can't wrap my head around this. How can I write an intermediate library so that it puts all these procedures (send, then wait for a specific packet response) in a future?

const MAX_ITEMS_IN_PACKET: usize = 100;
async fn do_update(client: &mut Client, mut items: Vec<Item>) {
	while !items.is_empty() {
		// Take at most MAX_ITEMS_IN_PACKET items; the last pack may be shorter.
		let n = items.len().min(MAX_ITEMS_IN_PACKET);
		let pack: Vec<Item> = items.drain(..n).collect();
		let status = client
			.upload(UploadPacket { pack, request_id: 1 })
			.await; // MAGIC HERE
		if status.is_err() {
			panic!()
		}
	}
}

struct UploadPacket {
	request_id: u16,
	pack: Vec<Item>
}

Behind this line, let status = client.upload(pack).await; a lot should happen:

  • Client must send a packet and store its request id (the server's response to this packet will contain the same request id, to identify it),
  • when Client notices an incoming packet whose request id is in some waiting list, it puts the response into that specific future, and the caller gets the awaited return value

Right now, trying to code this in blocking code, I know what to do, but the code that calls Client is big and split across callbacks -- and it must have more objects, to store the state.

struct Uploader {
	upload_list: Vec<Item>,
	last_request_id: u16,
}

impl Uploader {
	fn new(client: &mut Client, mut items: Vec<Item>) ->
               anyhow::Result<Self> {
		// Clamp the range so a short list does not make drain() panic.
		let n = items.len().min(MAX_ITEMS_IN_PACKET);
		let pack = items.drain(..n).collect();
		client.send(UploadPacket { pack, request_id: 1 })?;
		Ok(Self { upload_list: items, last_request_id: 1 })
	}

	fn handle_packets(&mut self, client: &mut Client, packet: &IncomingPacket) ->
             anyhow::Result<Subscription> {
		// Ignore packets that do not acknowledge our last request.
		if packet.request_id != self.last_request_id {
            return Ok(Subscription::Stay)
        }
		if self.upload_list.is_empty() { return Ok(Subscription::Unsubscribe) }
		let n = self.upload_list.len().min(MAX_ITEMS_IN_PACKET);
		let pack = self.upload_list.drain(..n).collect();
		self.last_request_id += 1;
		client.send(UploadPacket { pack, request_id: self.last_request_id })?;
		Ok(Subscription::Stay)
	}
}

enum Subscription { Stay, Unsubscribe }

So, async makes sense in this context. But I can't understand how to organize and operate futures to accomplish this.

I tried duckduckgoing (googling), but I get generic materials in response. Please suggest what to read on this.

Make a future for the reply only: you generate a request and a future for what to do once you get the reply, and give both to a task that handles networking. The task stores the two in a queue and sends the request; once it gets a reply, it looks up the corresponding future and tells the executor to run it.
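A minimal sketch of the bookkeeping this reply describes, using only the standard library. It swaps in `std::sync::mpsc` channels where an async version would more likely use a oneshot channel (for example `tokio::sync::oneshot`) whose receiver is awaited. `Reply`, `PendingReplies`, `register` and `dispatch` are hypothetical names, not part of any real protocol or crate.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical reply type; the real acknowledgement packet will differ.
#[derive(Debug, PartialEq)]
pub struct Reply {
    pub request_id: u16,
    pub ok: bool,
}

/// Owned by the networking task: maps request ids to whoever waits for the reply.
#[derive(Default)]
pub struct PendingReplies {
    waiting: HashMap<u16, Sender<Reply>>,
    next_id: u16,
}

impl PendingReplies {
    /// Allocate a request id and return it together with the receiving
    /// half that the caller waits on.
    pub fn register(&mut self) -> (u16, Receiver<Reply>) {
        let id = self.next_id;
        self.next_id = self.next_id.wrapping_add(1);
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Route an incoming reply to its waiter. Returns false when nobody is
    /// registered for this id (e.g. an unsolicited state update) or when
    /// the waiter has already gone away.
    pub fn dispatch(&mut self, reply: Reply) -> bool {
        match self.waiting.remove(&reply.request_id) {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }
}
```

The networking task would call `register` before sending a request and `dispatch` for every packet it reads; packets for which `dispatch` returns false can be forwarded to whatever handles the server's other updates.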

I'm not a networking expert by any means, but why do you need to wait for an acknowledgement for each packet? TCP already provides reliability guarantees. What protocol are you implementing on top of TCP? Maybe you want to use UDP as the transport layer instead.

As to your question, I would recommend starting with something simple. The actor pattern is easy to implement and reason about. You can see this famous blog post if you have not encountered it before: Actors with Tokio by Alice Ryhl.

Thanks for the link! The acknowledgement is not about TCP; it tells whether the data could be saved correctly, and whether the client app sent something unacceptable.

Does the app open a single connection to the server, or many connections?

What is the client? Is this the app? Is it some state, passively sitting around? Is a client a thread? Or an async task?

What to do with the other updates? Where should they go?

Sounds to me like a multiplexer or some sort of router inside the application?

First of all, make sure you understand what problem you're trying to solve. Don't make the mistake of taking a tool (async), searching for a solution pattern, and guessing if the pattern solves your goal.

A good indicator that you've reached an understanding of the problem you're trying to solve is when you can describe it precisely to a non-expert.

Gut feeling says "I'd agree".

That sounds like a warning smell to me. Changing the shape to remove that distributed complexity is probably going to be a good idea. (ideas on how below)

This sounds like a use case of a custom Sink

If I understand that right, it means there can be multiple "clients" flowing through the same connection to a server, and each needs to be pinged when its specific update is acknowledged(?)

I'd probably model this around a struct with the rough shape:

struct MiddleMan {
    sender: MyCustomSinkToServer,
    receiver: MyCustomStreamFromServer,
    registered_clients: HashMap<ClientId, channel::mpsc::Sender>,
    pending_IDs: HashMap<ClientId, Option<Uuid>>,
}

Then I'd implement Stream and Sink for MiddleMan and give it register(&mut Self, ClientId, Sender) & deregister(&mut Self, ClientId) methods.

That then gives you the chance to have one place, near the very "top" of your bin-lib-mods hierarchy, where you have a driving loop that owns a MiddleMan and creates and registers Clients.

====

General points to consider:

  • try to stay runtime agnostic until your main() function: using primitives, macros etc. from futures-rs does this, tokio ones often have a runtime baked in somewhere. It's a great runtime and full-suite but if your lib is not "just for you" then let whoever is creating the runtime binary decide which runtime to use.
  • running select!() in a loop lets you respond to "whatever happens next" across multiple channels / streams
  • do you want the client or the middleman to be responsible for "not sending the next update for client x until the right time"? (is this an implementation detail to abstract away so clients just "fire-and-forget" or a fundamental part of the API where the client needs to be informed & respond appropriately)
  • "don't panic!" (return a Result instead) - then let main handle exiting on errors where nothing in between can recover from them (e.g. the driving loop could log an error & restart the server connection, gracefully inform the clients, retry with backoff, bounce the TCPSocket, etc...)

The above is going to be broken in a number of ways which depend specifically on your use case - but it's the sort of shape I'd go for.
For an example of "something very different but similar" you can take a look at MusicalNinjaDad/splurt, which is ugly WIP, not always well documented but tending towards the above.

The application makes a single connection to the server.

The client that I mention is the app, that I'm trying to write, itself. (Or it can be any other app written in whatever language. The behavior should be the same.)

The server sends other sorts of updates, and I keep track of them in a state object (IDK right now how to organize this; it might be one object, might be several). There are other parts that must watch these other updates and also do something (e.g. write to disk), or send requests to the server. That's why it's a question of concurrency. Had it been just one kind of request and response, I'd have coded it in a blocking way without thinking (I actually wrote some test code that sent requests and watched only this kind of acknowledgement, dropping the other packets, and it worked).

So, yes, it looks like a router inside the app.

So, in general it looks like

  • the state of my app (somehow reflects the state of the server, somehow has its own state, that is used to send requests to the server)
  • a set of event listeners:
    • some should be triggered when certain packets come from the server
    • some should be triggered by schedule
      all the listeners do some checks and may send some packets to the server
  • and what connects them is an open question.

Done the blocking way, it becomes 1) a transport structure, which I called Client, that has a 'send' method and also emits events with some payload, and 2) a dispatcher that manages the events and calls the listeners. The problem is that if an action needs to wait for some specific events in the middle, it becomes complex.
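For reference, the dispatcher half of that blocking design could look roughly like the sketch below. It reuses the `Subscription` enum from the first post; `Dispatcher`, `subscribe`, `dispatch` and `listener_count` are hypothetical names, and `IncomingPacket` is reduced to the one field needed here.

```rust
/// Hypothetical incoming packet; only the field used by this sketch.
pub struct IncomingPacket {
    pub request_id: u16,
}

/// Returned by a listener to say whether it wants further packets.
pub enum Subscription {
    Stay,
    Unsubscribe,
}

type Listener = Box<dyn FnMut(&IncomingPacket) -> Subscription>;

#[derive(Default)]
pub struct Dispatcher {
    listeners: Vec<Listener>,
}

impl Dispatcher {
    /// Register a listener; it stays registered until it returns Unsubscribe.
    pub fn subscribe(
        &mut self,
        listener: impl FnMut(&IncomingPacket) -> Subscription + 'static,
    ) {
        self.listeners.push(Box::new(listener));
    }

    /// Offer the packet to every listener and drop those that unsubscribe.
    pub fn dispatch(&mut self, packet: &IncomingPacket) {
        self.listeners
            .retain_mut(|listener| matches!((*listener)(packet), Subscription::Stay));
    }

    pub fn listener_count(&self) -> usize {
        self.listeners.len()
    }
}
```

Each multi-step action has to live inside a listener that carries its own state between calls, which is where the complexity described above comes from.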

Then it sounds like fn main is very similar to splurt (linked above) and has the shape:

loop {
  select! {
    msg = incoming => match msg { ... }
    tick => ...
  }
}

Sounds to me like you want an actor (as detailed in the article linked by akrauze above) where the actor task does a loop-select over receiving & acting on messages from handles, receiving incoming packets from the server, and acting in response to timeouts.

For the specific case of splitting update packets, one approach would be:

  • When an actor handle sends a message to the actor asking it to perform an update, the actor immediately splits apart the update packet to meet the packet size limits, sends the first packet, stores the rest in a VecDeque, and stores the VecDeque as the value in a HashMap where the key is some transaction ID that the response & request packets share (I don't have enough details about the protocol to say what this would be).

  • When an update packet response is received from the server, the actor derives the transaction ID from the response and looks up the VecDeque in the HashMap. It then pops the next packet from the deque and sends it; depending on the nature of the transaction IDs, you may need to then move the deque to another key.

  • If you want the handle to be notified once the update session is complete, then pair the VecDeque with a oneshot sender whose corresponding receiver was returned to the handle; once the response to the last packet is received, send the relevant response over the channel.
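The bullets above can be sketched as a small state machine that performs no IO itself. This is only an illustration under assumptions: it supposes every packet of one update session carries the same transaction ID, and `UploadSessions`, `Step`, `start`, `on_ack` and the `u32` item type are hypothetical names chosen for the example.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical outgoing packet: a transaction id plus one chunk of items.
#[derive(Debug, PartialEq)]
pub struct UploadPacket {
    pub transaction_id: u16,
    pub items: Vec<u32>,
}

/// What the actor should do after processing an acknowledgement.
#[derive(Debug, PartialEq)]
pub enum Step {
    Send(UploadPacket),
    Finished,
    UnknownTransaction,
}

#[derive(Default)]
pub struct UploadSessions {
    queued: HashMap<u16, VecDeque<Vec<u32>>>,
}

impl UploadSessions {
    /// Split `items` into chunks of at most `max_items` (must be non-zero),
    /// queue all but the first chunk, and return the first packet to send.
    /// Returns None when `items` is empty.
    pub fn start(
        &mut self,
        transaction_id: u16,
        items: Vec<u32>,
        max_items: usize,
    ) -> Option<UploadPacket> {
        let mut chunks: VecDeque<Vec<u32>> =
            items.chunks(max_items).map(|c| c.to_vec()).collect();
        let first = chunks.pop_front()?;
        self.queued.insert(transaction_id, chunks);
        Some(UploadPacket { transaction_id, items: first })
    }

    /// Called when the server acknowledges a packet of this transaction.
    pub fn on_ack(&mut self, transaction_id: u16) -> Step {
        let Some(queue) = self.queued.get_mut(&transaction_id) else {
            return Step::UnknownTransaction;
        };
        match queue.pop_front() {
            Some(items) => Step::Send(UploadPacket { transaction_id, items }),
            None => {
                // The last chunk was acknowledged: the session is complete.
                self.queued.remove(&transaction_id);
                Step::Finished
            }
        }
    }
}
```

The actor would write the packet to the socket on `Step::Send`, and `Step::Finished` is the point at which it would fire the oneshot sender paired with the session, if the handle asked to be notified.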

Also, for reasons of both testability and separating asyncness from logic, you may want to consider using a "sans IO" architecture in which the non-IO logic is separated out into a state machine with methods that are called in response to IO events. Here are some (admittedly low-code) links on sans IO programming I've collected over the years:

I've also written a few sans IO state machines in Rust myself; let me know if you want links to any of them.