How to realize http Handle

fn handle_submissions(
    client: Client,
    rx: mpsc::UnboundedReceiver<SubmissionParameters>,
    tx_submit_data: mpsc::UnboundedSender<SubmissionParameters>,
    executor: &Handle,
) {
    async move {
        let stream = PrioRetry::new(rx, Duration::from_secs(3));
        while let Some(submission_params) = stream.next().await {
            let tx_submit_data = tx_submit_data.clone();
            client
                .clone()
                .submit_nounce(&submission_params)
                .await
                .map(|res| {
                    match res {
                        Ok(res) => {
                            if submission_params.deadline != res.deadline {
                                do_some();
                            } else {
                                do_some();
                            }
                        }
                        Err(FetchError::Pool(e)) => {
                            // Very intuitive, if some pools send an empty message they are
                            // experiencing too much load expect the submission to be resent later.
                            if e.message.is_empty() || e.message == "limit exceeded" {
                                do_some();
                                let res = tx_submit_data.unbounded_send(submission_params);
                                if let Err(e) = res {
                                    error!("can't send submission params: {}", e);
                                }
                            } else {
                                do_some();
                            }
                        }
                        Err(FetchError::Http(x)) => {
                            do_some();
                            let res = tx_submit_data.unbounded_send(submission_params);
                            if let Err(e) = res {
                                error!("can't send submission params: {}", e);
                            }
                        }
                    }
                })
                .for_each(|_| Ok(()))
                .map_err(|e| error!("can't handle submission params: {:?}", e));
        }
        executor.spawn(stream);
    };
}

Some of the errors are listed below:

error[E0277]: the trait bound `future::prio_retry::PrioRetry<futures_channel::mpsc::UnboundedReceiver<wallet::client::SubmissionParameters>>: core::future::future::Future` is not satisfied
   --> src/requests.rs:136:28
    |
136 |             executor.spawn(stream);
    |                            ^^^^^^ the trait `core::future::future::Future` is not implemented for `future::prio_retry::PrioRetry<futures_channel::mpsc::UnboundedReceiver<wallet::client::SubmissionParameters>>`
error[E0308]: mismatched types
  --> src/requests.rs:95:29
   |
77 |                         match res {
   |                               --- this expression has type `wallet::api::SubmitNonceResponse`
...
95 |                             Err(FetchError::Pool(e)) => {
   |                             ^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `wallet::api::SubmitNonceResponse`, found enum `std::result::Result`

A few things I notice:

  1. You must await or spawn async blocks. Your code currently does something like this:
fn handle_submissions(...) {
    // Create an async block, but don't execute it yet.
    let async_block = async move {
        ...
    };
    // Instead of executing it with spawn/await, destroy the async block.
    drop(async_block);
}
  2. You can't spawn/run a stream. Only a future. To "run" a stream, you use a while let loop like the one you already have.

  3. SubmitNonceResponse is not a Result, so you can't match on it using the Ok or Err variants from Result.

    pub async fn submit_nounce(
        &self,
        submission_data: &SubmissionParameters,
    ) -> Result<SubmitNonceResponse, FetchError> 
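
To illustrate the first point, here is a minimal sketch that uses only the standard library (2018 edition or later). The `ThreadWaker` type and the `block_on` helper are hypothetical stand-ins for a real executor, such as the runtime behind your `executor.spawn`; they are not part of your project or of futures 0.3. The sketch shows that an async block which is dropped never runs its body, while one that is driven to completion does.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical mini-executor: polls `fut` on the current thread until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Creates an async block and drops it without polling: the body never runs.
fn build_and_drop(ran: Arc<AtomicBool>) {
    let fut = async move {
        ran.store(true, Ordering::SeqCst);
    };
    drop(fut);
}

/// Creates the same async block, but drives it to completion.
fn build_and_run(ran: Arc<AtomicBool>) {
    let fut = async move {
        ran.store(true, Ordering::SeqCst);
    };
    block_on(fut);
}
```

After `build_and_drop` the flag is still false; after `build_and_run` it is true. In your function the equivalent fix would be to hand the async block itself to the executor, or to make `handle_submissions` an async fn and await it from the caller.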

I am confused, because submit_nounce already returns Result<SubmitNonceResponse, FetchError>. When using .await, how do I match on the error type? If I use .map().for_each().map_err(), the error type is lost.
I am not familiar with futures 0.3. Many thanks.

The submit_nounce method returns a future whose output is Result<SubmitNonceResponse, FetchError>. Since .await runs the future to completion, the value of the expression below is Result<SubmitNonceResponse, FetchError>:

client
    .clone()
    .submit_nounce(&submission_params)
    .await

So the map method you are calling is actually Result::map, which transforms the Ok value of a Result. The variable you called res is not a Result; it is the Ok value inside it.
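
A small synchronous sketch of what Result::map does, in case it helps. The type definitions here are simplified, hypothetical stand-ins for your `SubmitNonceResponse` and `FetchError` (I don't know their real fields), and `describe` is a made-up helper. The closure passed to `map` only ever receives the Ok value; an Err passes through untouched, which is why matching on `Err(...)` inside the closure cannot type-check.

```rust
/// Hypothetical, simplified stand-in for the response type in the thread.
#[derive(Debug, PartialEq)]
struct SubmitNonceResponse {
    deadline: u64,
}

/// Hypothetical, simplified stand-in for the error type in the thread.
#[derive(Debug, PartialEq)]
enum FetchError {
    Pool(String),
    Http(String),
}

/// The closure given to `Result::map` sees a `SubmitNonceResponse`, not a `Result`.
fn describe(res: Result<SubmitNonceResponse, FetchError>) -> Result<String, FetchError> {
    res.map(|response| format!("deadline {}", response.deadline))
}
```

Passing an `Err` value to `describe` returns that same error unchanged, because the closure is never called for it.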

How can I get the Result<> type? Use then?
Or use something like await { xxx }, since futures has been updated? Please help!

You can match on it directly.

let res = client
    .clone()
    .submit_nounce(&submission_params)
    .await;

match res {
    Ok(res) => { ... },
    Err(FetchError::Pool(e)) => { ... },
    Err(FetchError::Http(x)) => { ... },
}
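
Filled in a bit more, the match could look like the sketch below. This is a synchronous stand-in under several assumptions: the types are simplified, hypothetical versions of yours, `std::sync::mpsc::Sender` replaces the futures `mpsc::UnboundedSender` from your code, and the `Outcome` enum and `handle_result` function are names I made up in place of your `do_some()` calls. In your code, `res` would be the value you get from `.await`.

```rust
use std::sync::mpsc::Sender;

// Hypothetical, simplified stand-ins for the types in the thread.
#[derive(Debug, Clone, PartialEq)]
struct SubmissionParameters {
    deadline: u64,
}

struct SubmitNonceResponse {
    deadline: u64,
}

struct PoolError {
    message: String,
}

enum FetchError {
    Pool(PoolError),
    Http(String),
}

/// Made-up summary of what happened, standing in for the `do_some()` calls.
#[derive(Debug, PartialEq)]
enum Outcome {
    Accepted,
    DeadlineMismatch,
    Requeued,
    Rejected,
}

/// Matches directly on the `Result`, re-queueing the submission on transient errors.
fn handle_result(
    res: Result<SubmitNonceResponse, FetchError>,
    params: SubmissionParameters,
    retry_tx: &Sender<SubmissionParameters>,
) -> Outcome {
    match res {
        Ok(response) if response.deadline == params.deadline => Outcome::Accepted,
        Ok(_) => Outcome::DeadlineMismatch,
        // An empty message or "limit exceeded" is treated as an overloaded pool.
        Err(FetchError::Pool(e)) if e.message.is_empty() || e.message == "limit exceeded" => {
            if let Err(err) = retry_tx.send(params) {
                eprintln!("can't send submission params: {}", err);
            }
            Outcome::Requeued
        }
        Err(FetchError::Pool(_)) => Outcome::Rejected,
        // HTTP errors are always retried.
        Err(FetchError::Http(_)) => {
            if let Err(err) = retry_tx.send(params) {
                eprintln!("can't send submission params: {}", err);
            }
            Outcome::Requeued
        }
    }
}
```

Because every error variant has its own arm, the error type is not lost anywhere; the compiler also checks that no variant is forgotten.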

But for_each and map_err can be used in a chain.

It seems like they are not necessary in this example. The purpose of for_each is to do something for every item in a stream, but you already have the while let loop for that purpose.

I don't actually know what you wanted for_each to do in that example, since submit_nounce doesn't return a stream.

What about the map_err(e)?

That would be the Err(...) cases in your match.
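
To make the comparison concrete, here is a rough sketch with a simplified, hypothetical `FetchError` and a plain `u64` as the Ok type. Both function names are made up for illustration. `map_err` runs one closure for every error, while the `Err(...)` arms of a match let you treat each variant differently.

```rust
/// Hypothetical, simplified stand-in for the error type in the thread.
#[derive(Debug)]
enum FetchError {
    Pool(String),
    Http(String),
}

/// One closure handles every error the same way.
fn with_map_err(res: Result<u64, FetchError>) -> Result<u64, String> {
    res.map_err(|e| format!("can't handle submission params: {:?}", e))
}

/// Each error variant gets its own arm, so each can be handled differently.
fn with_match(res: Result<u64, FetchError>) -> Result<u64, String> {
    match res {
        Ok(value) => Ok(value),
        Err(FetchError::Pool(msg)) => Err(format!("pool error: {}", msg)),
        Err(FetchError::Http(msg)) => Err(format!("http error: {}", msg)),
    }
}
```

Both versions leave an Ok value untouched; they only differ in how much they can say about the error.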
