Future created by async block is not "Send"

Hi guys, I am new using Rust lang, just started today. I am trying to build something and I have an issue:

yarn install v1.22.17
[1/4] Resolving packages...
success Already up-to-date.
Done in 0.26s.
[0] yarn exited with code 0
[0]     Finished dev [unoptimized + debuginfo] target(s) in 0.13s
[0] cargo build --bin revolt exited with code 0
[0]    Compiling rust-s3 v0.27.0-rc4
[0] error: future cannot be sent between threads safely
[0]    --> /root/.cargo/registry/src/github.com-1ecc6299db9ec823/rust-s3-0.27.0-rc4/src/request.rs:49:50
[0]     |
[0] 49  |       async fn response(&self) -> Result<Response> {
[0]     |  __________________________________________________^
[0] 50  | |         // Build headers
[0] 51  | |         let headers = match self.headers() {
[0] 52  | |             Ok(headers) => headers,
[0] ...   |
[0] 103 | |         Ok(response)
[0] 104 | |     }
[0]     | |_____^ future created by async block is not `Send`
[0]     |
[0]     = help: the trait `Sync` is not implemented for `core::fmt::Opaque`
[0]     = note: required for the cast to the object type `dyn Future<Output = Result<reqwest::Response, anyhow::Error>> + Send`
[0]
[0] error: could not compile `rust-s3` due to previous error
[0] cargo build exited with code 101

Any ideas how can I fix it? I am using rustc 1.60.0-nightly (84322efad 2022-01-23).

extern crate base64;
extern crate md5;

use std::io::Write;

use chrono::{DateTime, Utc};
use maybe_async::maybe_async;
use reqwest::{Client, Response};

use crate::bucket::Bucket;
use crate::command::Command;
use crate::command::HttpMethod;
use crate::request_trait::Request;
use anyhow::anyhow;
use anyhow::Result;

use tokio_stream::StreamExt;

// Temporary structure for making a request
pub struct Reqwest<'a> {
    pub bucket: &'a Bucket,
    pub path: &'a str,
    pub command: Command<'a>,
    pub datetime: DateTime<Utc>,
    pub sync: bool,
}

#[maybe_async]
impl<'a> Request for Reqwest<'a> {
    type Response = reqwest::Response;
    type HeaderMap = reqwest::header::HeaderMap;

    fn command(&self) -> Command {
        self.command.clone()
    }

    fn path(&self) -> String {
        self.path.to_string()
    }

    fn datetime(&self) -> DateTime<Utc> {
        self.datetime
    }

    fn bucket(&self) -> Bucket {
        self.bucket.clone()
    }

    async fn response(&self) -> Result<Response> {
        // Build headers
        let headers = match self.headers() {
            Ok(headers) => headers,
            Err(e) => return Err(e),
        };

        let client = if cfg!(feature = "no-verify-ssl") {
            let client = Client::builder();

            cfg_if::cfg_if! {
                if #[cfg(feature = "tokio-native-tls")]
                {
                    let client = client.danger_accept_invalid_hostnames(true);
                }

            }

            cfg_if::cfg_if! {
                if #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
                {
                    let client = client.danger_accept_invalid_certs(true);
                }

            }

            client.build().expect("Could not build dangerous client!")
        } else {
            Client::new()
        };

        let method = match self.command.http_verb() {
            HttpMethod::Delete => reqwest::Method::DELETE,
            HttpMethod::Get => reqwest::Method::GET,
            HttpMethod::Post => reqwest::Method::POST,
            HttpMethod::Put => reqwest::Method::PUT,
            HttpMethod::Head => reqwest::Method::HEAD,
        };

        let request = client
            .request(method, self.url().as_str())
            .headers(headers)
            .body(self.request_body());

        let response = request.send().await?;

        if cfg!(feature = "fail-on-err") && response.status().as_u16() >= 400 {
            return Err(anyhow!(
                "Request failed with code {}\n{}",
                response.status().as_u16(),
                response.text().await?
            ));
        }

        Ok(response)
    }

    async fn response_data(&self, etag: bool) -> Result<(Vec<u8>, u16)> {
        let response = self.response().await?;
        let status_code = response.status().as_u16();
        let headers = response.headers().clone();
        let etag_header = headers.get("ETag");
        let body = response.bytes().await?;
        let mut body_vec = Vec::new();
        body_vec.extend_from_slice(&body[..]);
        if etag {
            if let Some(etag) = etag_header {
                body_vec = etag.to_str()?.as_bytes().to_vec();
            }
        }
        Ok((body_vec, status_code))
    }

    async fn response_data_to_writer<T: Write + Send>(&self, writer: &mut T) -> Result<u16> {
        let response = self.response().await?;

        let status_code = response.status();
        let mut stream = response.bytes_stream();

        while let Some(item) = stream.next().await {
            writer.write_all(&item?)?;
        }

        Ok(status_code.as_u16())
    }

    async fn response_header(&self) -> Result<(Self::HeaderMap, u16)> {
        let response = self.response().await?;
        let status_code = response.status().as_u16();
        let headers = response.headers().clone();
        Ok((headers, status_code))
    }
}

impl<'a> Reqwest<'a> {
    pub fn new<'b>(bucket: &'b Bucket, path: &'b str, command: Command<'b>) -> Reqwest<'b> {
        Reqwest {
            bucket,
            path,
            command,
            datetime: Utc::now(),
            sync: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::bucket::Bucket;
    use crate::command::Command;
    use crate::request::Reqwest;
    use crate::request_trait::Request;
    use anyhow::Result;
    use awscreds::Credentials;
    use http::header::{HOST, RANGE};

    // Fake keys - otherwise using Credentials::default will use actual user
    // credentials if they exist.
    fn fake_credentials() -> Credentials {
        let access_key = "AKIAIOSFODNN7EXAMPLE";
        let secert_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
        Credentials::new(Some(access_key), Some(secert_key), None, None, None).unwrap()
    }

    #[test]
    fn url_uses_https_by_default() -> Result<()> {
        let region = "custom-region".parse()?;
        let bucket = Bucket::new("my-first-bucket", region, fake_credentials())?;
        let path = "/my-first/path";
        let request = Reqwest::new(&bucket, path, Command::GetObject);

        assert_eq!(request.url().scheme(), "https");

        let headers = request.headers().unwrap();
        let host = headers.get(HOST).unwrap();

        assert_eq!(*host, "my-first-bucket.custom-region".to_string());
        Ok(())
    }

    #[test]
    fn url_uses_https_by_default_path_style() -> Result<()> {
        let region = "custom-region".parse()?;
        let bucket = Bucket::new_with_path_style("my-first-bucket", region, fake_credentials())?;
        let path = "/my-first/path";
        let request = Reqwest::new(&bucket, path, Command::GetObject);

        assert_eq!(request.url().scheme(), "https");

        let headers = request.headers().unwrap();
        let host = headers.get(HOST).unwrap();

        assert_eq!(*host, "custom-region".to_string());
        Ok(())
    }

    #[test]
    fn url_uses_scheme_from_custom_region_if_defined() -> Result<()> {
        let region = "http://custom-region".parse()?;
        let bucket = Bucket::new("my-second-bucket", region, fake_credentials())?;
        let path = "/my-second/path";
        let request = Reqwest::new(&bucket, path, Command::GetObject);

        assert_eq!(request.url().scheme(), "http");

        let headers = request.headers().unwrap();
        let host = headers.get(HOST).unwrap();
        assert_eq!(*host, "my-second-bucket.custom-region".to_string());
        Ok(())
    }

    #[test]
    fn url_uses_scheme_from_custom_region_if_defined_with_path_style() -> Result<()> {
        let region = "http://custom-region".parse()?;
        let bucket = Bucket::new_with_path_style("my-second-bucket", region, fake_credentials())?;
        let path = "/my-second/path";
        let request = Reqwest::new(&bucket, path, Command::GetObject);

        assert_eq!(request.url().scheme(), "http");

        let headers = request.headers().unwrap();
        let host = headers.get(HOST).unwrap();
        assert_eq!(*host, "custom-region".to_string());

        Ok(())
    }

    #[test]
    fn test_get_object_range_header() -> Result<()> {
        let region = "http://custom-region".parse()?;
        let bucket = Bucket::new_with_path_style("my-second-bucket", region, fake_credentials())?;
        let path = "/my-second/path";

        let request = Reqwest::new(
            &bucket,
            path,
            Command::GetObjectRange {
                start: 0,
                end: None,
            },
        );
        let headers = request.headers().unwrap();
        let range = headers.get(RANGE).unwrap();
        assert_eq!(range, "bytes=0-");

        let request = Reqwest::new(
            &bucket,
            path,
            Command::GetObjectRange {
                start: 0,
                end: Some(1),
            },
        );
        let headers = request.headers().unwrap();
        let range = headers.get(RANGE).unwrap();
        assert_eq!(range, "bytes=0-1");

        Ok(())
    }
}

You'd need to post your code for someone to be able to help you. It is not obvious from the error message what the problem is.

Hi @Aeoris, and welcome :wave:

Indeed, don't forget to try and share more of your code the next time you hit such an issue, it's easier for us to help that way :slightly_smiling_face:

This instance, it turns out that fmt::Opaque mention was enough of a lead for me to follow it and find the culprit:

Interestingly, this is a nightly regression w.r.t. formatting internals:

async
fn foo ()
{
    println!("{} {:?} {}", "", async {}.await, "");
}

const _ASSERT_SEND: fn() = || {
    let _: &dyn Send = &foo();
};
error: future cannot be sent between threads safely
 --> src/lib.rs:8:24
  |
8 |     let _: &dyn Send = &foo();
  |                        ^^^^^^ future returned by `foo` is not `Send`
  |
  = help: the trait `Sync` is not implemented for `core::fmt::Opaque`
  = note: required for the cast to the object type `dyn Send`

@Aeoris, the issue thus stems from performing an .await right inside a formatting macro, such as format! or println!; the workaround is thus to hoist that .await outside the macro, by binding the result to its own variable:

- println!("{:?}", async_fn().await());
+ let thing = async_fn.await;
+ println!("{thing:?}");
3 Likes

Filed

Hello, thank you! Right, I forgot to add the code, I already added it in the first message. I will try the workaround right now.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.