Conditional channel receive with multiple receivers

(pseudo-code follows after description)

Hello, I am hoping to create a "signer pool" (like a thread pool) for web3. The basic idea is that the user of the signer pool initializes it with a set of signers (a signer being a web3 wallet that holds some ETH for sending transactions). From that point on, the signer pool struct does all the managing of the signers, and the user only needs to send requests to the signer pool for broadcasting transactions.

The way I am thinking of implementing the signer pool is with channels: when the signer pool gets initialized, we create a channel and spawn a task for each signer that listens indefinitely for transactions sent down the channel. This would be a multiple-producer, multiple-consumer scenario: transactions would be queued up into the channel whenever the user interfaces with the signer pool, and those transactions would be received by whichever signer tasks are not currently busy with other work (like waiting for a broadcast transaction to confirm).

I'm sure this all sounds quite standard up to this point, but where I am stuck is that I'd like the channel receive to be conditional: not every signer would be able to take on every transaction. Each transaction has a different size and therefore consumes a different amount of ETH to broadcast and confirm, so only signers (wallets) whose ETH balance is big enough to broadcast a given transaction should take it on. Here are some ideas I've thought of:

  • If a signer receives a transaction from the channel that it immediately realizes it cannot actually process, it can just queue it back up into the channel. But there's no guarantee that the same signer won't receive the transaction from the channel again, potentially creating an infinite loop. I'd like the same signer to not double-try the same transaction, and I'd also like some way of determining that none of the signers in the pool can handle a particular transaction, which would be reported back to the user of the signer pool as an error for that transaction (a rough sketch of this bookkeeping follows the list).
  • Inside each signer task, instead of just calling recv() on the receiver, calling iter() followed by peekable() to get a peekable iterator over the receiver, so the signer can check that it can actually process the transaction before recv()-ing it. This idea was based on this discussion. BUT, I am not sure how that approach works in terms of synchronization when multiple tasks treat the receiver as a peekable iterator; a transaction must only be broadcast once. And the problem of knowing whether none of the signers can handle the transaction still persists.
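
For concreteness, here's roughly what the bookkeeping for the first idea might look like. This is only a sketch: SignerId and can_afford() are hypothetical, and it assumes an MPMC channel (e.g. the async-channel crate) whose sender and receiver can both be cloned into each signer task.

use std::collections::HashSet;
use tokio::sync::oneshot;

// Hypothetical message type: the transaction plus the set of signers that
// have already declined it, so a signer never double-tries it and the pool
// can tell when nobody can handle it.
struct QueuedTransaction {
    tx: Transaction,
    result: oneshot::Sender<SomeResult>,
    declined_by: HashSet<SignerId>,
}

// Inside each signer task (signer_id, signer, sender, receiver, total_signers
// are all captured by the task):
while let Ok(mut queued) = receiver.recv().await {
    if queued.declined_by.contains(&signer_id) || !signer.can_afford(&queued.tx).await {
        queued.declined_by.insert(signer_id);
        if queued.declined_by.len() == total_signers {
            // every signer has declined: report an error through the oneshot
            // instead of re-queueing forever
            // let _ = queued.result.send(/* "no signer can afford this" error */);
            continue;
        }
        // put it back on the channel for another signer to pick up
        let _ = sender.send(queued).await;
        continue;
    }
    // otherwise: broadcast queued.tx, wait for confirmation, and send the
    // outcome back through queued.result
}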

Here's some pseudocode for the overall pool in case it helps. Apologies, I don't have real code yet as I am still trying to finalize the architecture before jumping into implementation. I have put !!COMMENTS!! emphasizing the parts of the code that I can't quite get right:

use tokio::sync::oneshot;

struct SignerPool {
    tx_sender: async_channel::Sender<SignerPoolTransaction>,
}

struct SignerPoolTransaction {
    tx: Transaction,
    result: oneshot::Sender<SomeResult>,
}

impl SignerPool {
    fn new(signers: Vec<Signer>) -> SignerPool {
        // An MPMC channel (e.g. async-channel) so every signer task can hold
        // a clone of the receiver; tokio's own mpsc receiver can't be cloned.
        let (sender, receiver) = async_channel::unbounded::<SignerPoolTransaction>();
        for signer in signers {
            let receiver = receiver.clone();
            tokio::spawn(async move {
                while let Ok(pool_tx) = receiver.recv().await {
                    // !!ENSURE SIGNER CAN HANDLE TRANSACTION BEFORE RECV-ING!!
                    // broadcast pool_tx.tx with `signer` and wait for confirmation
                    // use the one-shot channel (pool_tx.result) to send back the result
                }
            });
        }
        SignerPool { tx_sender: sender }
    }

    async fn broadcast_transactions(&self, transactions: Vec<Transaction>) -> Vec<SomeResult> {
        let mut oneshot_recvs = vec![];

        for transaction in transactions {
            let (sender, receiver) = oneshot::channel::<SomeResult>();
            oneshot_recvs.push(receiver);
            self.tx_sender
                .send(SignerPoolTransaction { tx: transaction, result: sender })
                .await
                .ok(); // TODO: handle the case where all signer tasks have shut down
        }

        // await all oneshot receivers and return the contained results to the user
        // !!NEED SOME WAY TO DETERMINE THAT A TRANSACTION COULDN'T BE FULFILLED BY ANY SIGNER!!
        let mut results = vec![];
        for receiver in oneshot_recvs {
            results.push(receiver.await.expect("signer task dropped the result sender"));
        }
        results
    }
}

If I understand correctly:

  • SignerPool knows the amount of currency in each signer's wallet
  • All messages go through this centralized SignerPool
  • Any signer worker can undertake any task as long as it is idle and its currency balance is big enough

In that case, it would make sense for SignerPool to handle dispatch centrally: maintain a vec of all the signers, including each signer's idle status and currency balance, and when a new task comes in, iterate through this vec and find the first idle signer with a big enough balance. If no such signer exists, return an error. If a signer is found, send a message to that signer, mark it as busy, and when the signer returns with a result, re-mark it as idle and notify the user.
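
A minimal sketch of that dispatch bookkeeping, assuming placeholder Transaction/DispatchError types and a hypothetical estimated_cost() helper (each signer's task would get its own work channel):

use tokio::sync::mpsc;

struct SignerEntry {
    idle: bool,
    balance: u128,                       // in wei
    work_tx: mpsc::Sender<Transaction>,  // channel into this signer's task
}

enum DispatchError {
    NoEligibleSigner,
    SignerGone,
}

struct Dispatcher {
    signers: Vec<SignerEntry>,
}

impl Dispatcher {
    async fn dispatch(&mut self, tx: Transaction) -> Result<(), DispatchError> {
        let cost = estimated_cost(&tx); // hypothetical cost estimate in wei
        match self
            .signers
            .iter_mut()
            .find(|s| s.idle && s.balance >= cost)
        {
            Some(entry) => {
                entry.idle = false; // stays busy until its task reports back
                entry.work_tx
                    .send(tx)
                    .await
                    .map_err(|_| DispatchError::SignerGone)
            }
            None => Err(DispatchError::NoEligibleSigner),
        }
    }
}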

Thanks so much for taking the time to read, understand and respond, @hax10!

Your suggestion definitely takes me closer to a working model. Ideally we'd want to fetch the balance of each signer's wallet directly through the signer right before attempting to send a transaction with that signer; this ensures we're not using a stale value in case external transactions also happen through that signer. That should be easy to accommodate: the SignerPool could query each signer's balance as it iterates through them looking for a suitable one for the transaction at hand. We could also have the signers return their latest balance whenever they finish processing a transaction, and use some sort of timestamp-based caching to decide whether we should fetch the latest balance while iterating/finding a suitable signer.
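
The timestamp-based caching could be as simple as something like this (fetch_balance() is a stand-in for however the signer queries its on-chain balance; I haven't settled on the real API):

use std::time::{Duration, Instant};

// Last known balance for one signer, refreshed only once it has gotten stale.
struct CachedBalance {
    value: u128,          // in wei
    fetched_at: Instant,
}

impl CachedBalance {
    async fn get(&mut self, signer: &Signer, max_age: Duration) -> u128 {
        if self.fetched_at.elapsed() > max_age {
            self.value = signer.fetch_balance().await; // hypothetical RPC call
            self.fetched_at = Instant::now();
        }
        self.value
    }
}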

It's also possible that no idle signer can handle the transaction at the moment, but a currently busy signer would be able to once it's available again. A signer's real balance wouldn't update until its transaction is fully processed and the signer is idle again, but we can estimate how much a transaction will cost. So I suppose, for busy signers, the SignerPool could store a predicted_next_idle_balance and use that value to queue up a transaction for a qualifying busy signer in case none of the idle signers have enough balance. In this case the SignerPool would have an mpsc channel for each of the signers (instead of one channel shared among all the signers, as in my initial design). It does increase complexity, but it seems to handle all the requirements.
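
Roughly, the per-signer state and selection I have in mind could look like this (the field types and the selection order are just my assumptions at this point; SignerPoolTransaction is the message type from my earlier pseudocode):

use tokio::sync::mpsc;

// Dispatcher-side state for one signer, with its own work channel.
struct SignerState {
    idle: bool,
    balance: u128,                      // last known on-chain balance (wei)
    predicted_next_idle_balance: u128,  // balance minus estimated cost of in-flight work
    work_tx: mpsc::Sender<SignerPoolTransaction>,
}

// Prefer an idle signer that can afford the transaction now; otherwise fall
// back to a busy signer that is predicted to afford it once it's idle again.
fn pick_signer(signers: &mut [SignerState], cost: u128) -> Option<&mut SignerState> {
    if let Some(i) = signers.iter().position(|s| s.idle && s.balance >= cost) {
        return Some(&mut signers[i]);
    }
    signers
        .iter_mut()
        .find(|s| !s.idle && s.predicted_next_idle_balance >= cost)
}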

Yeah, I think there are pros and cons of a shared message queue vs one-queue-per-signer. If workers are very likely to have enough money to handle transactions, then a shared queue would work well and you could just auto-retry on the small number of failed cases. But if the probability of not having enough credits is significant, then centralized dispatch makes more sense.

You could consider having each worker periodically send its bank balance to the SignerPool (maybe every few seconds? Not sure how much pressure this would put on the system), and then always work off that. But this delay in updating information does introduce the possibility of workers taking on jobs they can't afford or refusing jobs that they actually can process. So for jobs that fail or get refused for whatever reason, you could auto-retry a few times before sending a failure message back to the client.
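
For the retry part, a bounded retry loop on top of the dispatch sketch above could look something like this (the delay and attempt count are arbitrary, and it assumes the Transaction type is cheap to clone):

use std::time::Duration;

const MAX_ATTEMPTS: u32 = 3;

async fn dispatch_with_retry(
    dispatcher: &mut Dispatcher,
    tx: Transaction,
) -> Result<(), DispatchError> {
    let mut last_err = None;
    for _ in 0..MAX_ATTEMPTS {
        match dispatcher.dispatch(tx.clone()).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = Some(e);
                // back off briefly so balances and idle flags have a chance to change
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}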

Okay, so your program interacts with and must acknowledge state that is being mutated by other blockchain transactions. Given that such transactions are inevitably slow to confirm (compared to the speed of single-machine computation), this suggests that your program does not need to be multithreaded: even a single thread will be able to handle the full throughput of signing decisions that need to be made. Therefore, it is an option to not use any channels at all, and instead express your problem as a single-threaded algorithm that makes decisions about what to do using all the information at hand, rather than partitioning the state of the overall algorithm into multiple threads/tasks separated by channels.

The cryptographic and networking operations involved in issuing a transaction might still benefit from additional threads, but those tasks can be spawned on an ordinary thread pool or async executor. I'm proposing that it's a reasonable assumption that the decisions don't need to also be made by those tasks; they can instead be made by a single thread that doesn't need throughput-enhancing design choices like multiple queues. That single thread can afford to look at all the latest data (incoming requests and success or failure of previous requests to the actual signers) and perform whatever it finds to be the current optimal next step.
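
Concretely, that could be a single scheduling loop along these lines (a rough sketch only; it reuses your SignerPoolTransaction type, and SignerCompletion is just a made-up name for whatever the spawned broadcast tasks report back):

use tokio::sync::mpsc;

async fn scheduler(
    mut requests: mpsc::Receiver<SignerPoolTransaction>,
    mut completions: mpsc::Receiver<SignerCompletion>,
) {
    // requests that no signer can currently afford
    let mut pending: Vec<SignerPoolTransaction> = Vec::new();
    loop {
        tokio::select! {
            Some(request) = requests.recv() => {
                // pick a signer using the latest known state, then either spawn
                // the broadcast (which reports back on the completions channel)
                // or park the request until a signer frees up
                pending.push(request); // placeholder for the real decision
            }
            Some(_completion) = completions.recv() => {
                // a broadcast finished: update that signer's balance/idle state
                // and re-check whether anything in `pending` is now affordable
            }
            else => break, // both channels closed: shut down
        }
    }
}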

More great suggestions, thank you! I'm leaning towards the centralized dispatch solution for now to ensure correctness as much as we can, but I'll definitely see if there is room for improvement with the auto-retry mechanism!

Hi @kpreid, thanks for the feedback. I agree that blockchain transactions can be slow to confirm, but this varies greatly between specific blockchains; L2s and alt-L1s can relay back confirmation in as little as 1-2s. Also, since we have multiple signers in our signer pool, we'd want to take advantage of landing multiple transactions in the same block if we can. This could either be a one-transaction-per-signer model, or perhaps even a multiple-transactions-per-signer model if we manage each signer's nonces carefully. Since we do have multiple signers, though, a one-transaction-per-signer-per-block model seems like the way to go.

As such, I'm only planning to rely on async tasks and green threads, not OS threads. I would like the user of the signer pool to be able to decide whether the executor uses a single thread or multiple threads. But I do think having concurrency would go a long way in speeding up the confirmation of transactions, especially when the user sends a batch of transactions to the pool.

That's my point: that's an extremely long time compared to the microseconds it should take to run your pool-scheduling decisions. You can certainly use concurrency for issuing transactions to the chain, but you do not need concurrency within the algorithm that decides which signer will get which transactions.

Aah, thanks for clarifying. I understand you better now; I think we may actually be aligned. By using a central dispatch task within the SignerPool we're effectively running a single, sequential scheduling algorithm. This central "manager" will coordinate with concurrent tasks (one per signer) so that we can leverage concurrency for sending transactions and waiting for their confirmations within the scope of each signer.
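
For the signer side of that split, each signer task could be spawned roughly like this (broadcast_and_confirm(), balance(), and the SignerCompletion type are placeholders for whatever the signer ends up exposing):

use tokio::sync::mpsc;

// Placeholder for whatever a finished signer reports back to the manager.
struct SignerCompletion {
    signer_id: usize,
    balance: u128,
}

// Spawn one worker task per signer; the manager keeps the returned sender and
// hears back on the shared completions channel when the signer is idle again.
fn spawn_signer_task(
    signer_id: usize,
    signer: Signer,
    completions: mpsc::Sender<SignerCompletion>,
) -> mpsc::Sender<SignerPoolTransaction> {
    let (work_tx, mut work_rx) = mpsc::channel::<SignerPoolTransaction>(16);
    tokio::spawn(async move {
        while let Some(job) = work_rx.recv().await {
            // broadcast the transaction and wait for confirmation inside this task
            let outcome = signer.broadcast_and_confirm(job.tx).await;
            // report the result to the original caller...
            let _ = job.result.send(outcome);
            // ...and tell the manager this signer is idle again, with its fresh balance
            let new_balance = signer.balance().await;
            let _ = completions
                .send(SignerCompletion { signer_id, balance: new_balance })
                .await;
        }
    });
    work_tx
}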

I will hopefully be able to start working on this later this week and I will share code links as I make meaningful progress :slightly_smiling_face:

I was quite excited to start working on the implementation and managed to get a first compiling draft written. There are several TODOs, mostly around error handling and enhancements/configurability, but the core logic is all there. Here it is in case anyone would like to take a peek (feel free to comment on the commit directly if you feel so inclined :slightly_smiling_face:): GitHub - shaurya947/ethers-signer-pool: Like a thread pool, but for signing and broadcasting transactions
