Internal packet handling topology and Sinks

I have decided to split my program into different stages in order to handle the wide variety of packet types that may be received and used within my blockchain. I am using the relatively low-level futures 0.3 Sink async primitive for each stage.

As a packet arrives, it gets sent into sink 1, where its layout is verified (via Zerocopy). Then it's pushed into a stage-2 sink, where the packet-action is determined. There are more possible actions than a-f, but the diagram gets the idea across: I inevitably need to push all packets that enter stage 2n into a single stage 3. I can do this very easily by giving an UnboundedSender<Packet> to each stage-2n Sink type; upon flushing, the packets get pushed through the sender. However, is there a better or more efficient way of handling this control flow? Assume that millions of packets may exist within this system at any time.
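Here's roughly the shape of what I have now (heavily simplified; `Packet` and the branch logic are placeholders, not my real types):

```rust
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::{SinkExt, StreamExt};

struct Packet {
    bytes: Vec<u8>, // layout already verified in stage 1 (e.g. via zerocopy)
}

// One of the stage-2 branches: do its action-specific work, then
// forward every packet into the single stage-3 queue.
async fn stage2_branch(
    mut input: UnboundedReceiver<Packet>,
    mut to_stage3: UnboundedSender<Packet>,
) {
    while let Some(packet) = input.next().await {
        // ... action-specific handling here ...
        if to_stage3.send(packet).await.is_err() {
            break; // stage 3 has shut down
        }
    }
}

// Stage 3 sees every packet, regardless of which branch it came from.
async fn stage3(mut input: UnboundedReceiver<Packet>) {
    while let Some(_packet) = input.next().await {
        // final handling
    }
}

// Wiring for two branches: each branch holds a clone of the same sender.
fn run_two_branches(a_rx: UnboundedReceiver<Packet>, b_rx: UnboundedReceiver<Packet>) {
    let (to_stage3, stage3_rx) = unbounded();
    futures::executor::block_on(async {
        futures::join!(
            stage2_branch(a_rx, to_stage3.clone()),
            stage2_branch(b_rx, to_stage3), // last branch takes the original
            stage3(stage3_rx),
        );
    });
}
```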

The reason I want to stay away from a cloneable UnboundedSender<Packet> for forwarding: it is yet another Sink that data has to be sent through. I would prefer to send the packet from stage 2n directly into stage 3, but it appears I would need to distribute a mutable reference to the stage-3 Sink in order to do this?

Also, would this topology perform poorly due to the need to copy the Packets between each stage?

Another idea I came up with to heavily reduce copying:

Have a PacketHandler that stores a vector of Packets, and add a stage field to each packet. The PacketHandler works like an event loop: it iterates through each packet in the queue, and each time it polls a packet, it checks the packet's stage. Then, instead of sending the entire owned packet into Sinks, it passes a mutable reference into each stage's corresponding function.
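Something like this, roughly (all the names are placeholders, not real code from my project):

```rust
#[derive(Clone, Copy, PartialEq)]
enum Stage {
    Verify,   // stage 1: layout check
    Classify, // stage 2: determine the packet-action
    Dispatch, // stage 3: final handling
    Done,
}

struct Packet {
    bytes: Vec<u8>,
    stage: Stage,
}

impl Packet {
    fn new(bytes: Vec<u8>) -> Self {
        Packet { bytes, stage: Stage::Verify }
    }
}

struct PacketHandler {
    queue: Vec<Packet>,
}

impl PacketHandler {
    // One pass of the "event loop": every packet is advanced in place,
    // by mutable reference, so nothing is moved between containers.
    fn poll_once(&mut self) {
        for packet in self.queue.iter_mut() {
            match packet.stage {
                Stage::Verify => verify_layout(packet),
                Stage::Classify => classify(packet),
                Stage::Dispatch => dispatch(packet),
                Stage::Done => {}
            }
        }
        // Completed packets are compacted out in bulk.
        self.queue.retain(|p| p.stage != Stage::Done);
    }
}

fn verify_layout(p: &mut Packet) { p.stage = Stage::Classify; }
fn classify(p: &mut Packet) { p.stage = Stage::Dispatch; }
fn dispatch(p: &mut Packet) { p.stage = Stage::Done; }
```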

What do y'all think of this idea over having multiple stages of Sinks?

Are any of the stages multi-threaded? Can the vec grow and need to be reallocated? How much ratiometric variance (i.e., largest / smallest) is there in packet size? Perhaps you want a logical deque of references or indices to packet buffers allocated from an arena.

For an old comms guy like me, a number of approaches come to mind. More details are needed to select among them.

Possibly, and in fact likely, since this uses a futures 0.3 ThreadPool to drive the futures to completion.
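Roughly this kind of setup (again, simplified; not my actual code, and ThreadPool sits behind the futures crate's "thread-pool" feature):

```rust
use futures::channel::mpsc::unbounded;
use futures::executor::ThreadPool;
use futures::StreamExt;

fn main() {
    let pool = ThreadPool::new().expect("failed to build thread pool");
    let (tx, mut rx) = unbounded::<Vec<u8>>();

    // One stage task; the pool decides which worker thread runs it.
    pool.spawn_ok(async move {
        while let Some(raw) = rx.next().await {
            // ... stage work ...
            let _ = raw;
        }
    });

    // Any thread holding a sender clone can feed the stage.
    tx.unbounded_send(vec![0u8; 80]).unwrap();

    // A real program would keep running and join on shutdown here; this
    // sketch just drops everything and exits.
}
```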

Indeed. I am using VecDeque<Packet> for the storage type of each sink. I use "first in, first out" handling of the data.

I set a custom MTU of 555 bytes to accommodate the system across many different network types (apparently some networks have an MTU this low). However, I may raise this depending on the experience of others. The packet header is 76 bytes per packet, which is one motivation for raising the custom MTU. So, to answer your question: the variance is quite high, but currently bounded between 80 bytes and 555 bytes.
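To make the numbers concrete (the struct is only an illustration of the bounds, not real code):

```rust
const CUSTOM_MTU: usize = 555; // upper bound on a whole packet
const HEADER_LEN: usize = 76;  // fixed header, carried by every packet
const MIN_PACKET: usize = 80;  // smallest packets observed so far

// If packets were stored in fixed MTU-sized buffers, the worst case is
// roughly 555 - 80 = 475 bytes of slack per small packet.
struct PacketBuf {
    len: usize,              // actual size, MIN_PACKET..=CUSTOM_MTU
    bytes: [u8; CUSTOM_MTU],
}
```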

So the packets are relatively small (< 600B), with about a 7:1 size variance (560B:80B), and you move them between each sink stage (since each sink has its own VecDeque). Is there a reason why you move the packets between stages rather than passing them by some kind of reference or index?


Is this a better idea, then?

That makes more sense to me. Less data copying usually means higher throughput. Reuse rather than copying should also reduce cache-line thrashing, particularly when you're on a small number of cores. If the max packet size really is < 1 kB, then unless you are really constrained on memory, you can initialize a large array of not-in-use packets and just pass indices between the stages. You do want a Stage enum at the start of each packet, with NotInUse (bikeshed the name) as one possible value of the enum. That way you are not passing references, thus avoiding fights with the borrow checker, and you can pass the indices between threads using simple channels.

You will need a strategy to respond to input overrun, but that is the case no matter what solution approach you choose.
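Something along these lines, purely to illustrate (the names, pool size, and choice of channels are all placeholders): a fixed pool of MTU-sized slots, a Stage tag per slot with NotInUse, and plain indices traveling between threads, with overrun surfacing naturally as "no free slot".

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

const MAX_MTU: usize = 555;
const POOL_SIZE: usize = 4096; // tune to memory budget / expected backlog

#[derive(Clone, Copy, PartialEq)]
enum Stage {
    NotInUse,
    Classify,
    // further stages (dispatch, etc.) elided
}

struct Slot {
    stage: Stage,
    len: usize,
    bytes: [u8; MAX_MTU],
}

fn main() {
    // One Mutex per slot: handing an index to another stage hands over
    // who locks it, so contention is per packet rather than per pool.
    let pool: Arc<Vec<Mutex<Slot>>> = Arc::new(
        (0..POOL_SIZE)
            .map(|_| {
                Mutex::new(Slot {
                    stage: Stage::NotInUse,
                    len: 0,
                    bytes: [0u8; MAX_MTU],
                })
            })
            .collect(),
    );

    // Indices, not packets, travel between stages.
    let (to_classify, classify_rx) = mpsc::channel::<usize>();

    let pool_for_worker = Arc::clone(&pool);
    let worker = thread::spawn(move || {
        for idx in classify_rx {
            let mut slot = pool_for_worker[idx].lock().unwrap();
            // ... inspect slot.bytes[..slot.len], decide the action ...
            slot.stage = Stage::NotInUse; // done: slot returns to the pool
        }
    });

    // Ingress: claim a free slot. If none is free, that is the input
    // overrun case: drop, block, or push back on the source.
    let claimed = (0..POOL_SIZE).find(|&i| {
        let mut slot = pool[i].lock().unwrap();
        if slot.stage == Stage::NotInUse {
            slot.stage = Stage::Classify;
            slot.len = 80; // pretend an 80-byte packet just arrived
            true
        } else {
            false
        }
    });
    if let Some(idx) = claimed {
        to_classify.send(idx).unwrap();
    }

    drop(to_classify); // closing the channel lets the worker exit
    worker.join().unwrap();
}
```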
