Stream signature anomaly

I'm making a stream to send over an http connection. Look for PROBLEM SECTION below.

Basically the issue is I have a function that has a return type of
impl Stream<Item = Result<Bytes, Box<dyn std::error::Error + 'static>>>. I can pass the resulting stream into

  let res = awc::Client::new()
    .post("meh")
    .cookie(cookie)
    .send_body(awc::body::BodyStream::new(ch));

And it compiles.

But if I do the same thing in a second function, sync_stream, then I can't pass it into BodyStream::new.

A fuller code listing (look for PROBLEM SECTION ):

pub async fn sync_to_remote(
  conn: Arc<Connection>,
  user: &User,
  callbacks: &mut Callbacks,
  // ) -> Result<PrivateReplyMessage, Box<dyn std::error::Error>> {
) -> Result<PrivateReplyMessage, zkerr::Error> {

  // BOILERPLATE, question section is below:
  let extra_login_data = sqldata::read_user_by_id(&conn, user.id)?;

  // get previous sync.
  let after = prev_sync(&conn, &user, &extra_login_data.zknote)
    .await?
    .map(|cs| cs.now);

  println!("\n\n start sync, prev_sync {:?} \n\n", after);

  let (c, url) = match (user.cookie.clone(), user.remote_url.clone()) {
    (Some(c), Some(url)) => (c, url),
    _ => return Err("can't remote sync".into()),
  };

  let now = now()?;
  // let user_url  = Into::<awc::http::Uri>::into(url)?;
  let user_url = awc::http::Uri::try_from(url).map_err(|x| zkerr::Error::String(x.to_string()))?;
  let mut parts = awc::http::uri::Parts::default();
  parts.scheme = user_url.scheme().cloned();
  parts.authority = user_url.authority().cloned();
  parts.path_and_query = Some(awc::http::uri::PathAndQuery::from_static("/stream"));
  let url = awc::http::Uri::from_parts(parts).map_err(|x| zkerr::Error::String(x.to_string()))?;

  let client = awc::Client::new();
  let cookie = cookie::Cookie::parse(c)?;

  // PROBLEM SECTION
  // This code doesn't compile!!
  // let ss = sync_stream(conn, user.id, after, callbacks);
  // let res = awc::Client::new()
  //   .post(url)
  //   .cookie(cookie)
  //   .send_body(awc::body::BodyStream::new(ss));
  // END PROBLEM SECTION

  // The code below compiles!
  // Building the exact same thing, but here in this function instead of sync_stream
  let zns = ZkNoteSearch {
    tagsearch: TagSearch::SearchTerm {
      mods: Vec::new(),
      term: "".to_string(),
    },
    offset: 0,
    limit: None,
    what: "".to_string(),
    resulttype: ResultType::RtNote,
    archives: false,
    created_after: after,
    created_before: None,
    changed_after: after,
    changed_before: None,
    synced_after: after,
    synced_before: None,
    ordering: None,
  };

  let uid = user.id;

  let znsstream = search_zknotes_stream(conn.clone(), uid, zns);
  let azkls = sqldata::read_archivezklinks_stream(conn, uid, after);
  let ch = znsstream.chain(azkls);

  let res = awc::Client::new()
    .post("meh")
    .cookie(cookie)
    .send_body(awc::body::BodyStream::new(ch));

  Ok(PrivateReplyMessage {
    what: PrivateReplies::SyncComplete,
    content: serde_json::Value::Null,
  })
}

// Make a stream of all the records needed to sync the remote.
pub async fn sync_stream(
  conn: Arc<Connection>,
  uid: i64,
  after: Option<i64>,
  callbacks: &mut Callbacks,
) -> impl Stream<Item = Result<Bytes, Box<dyn std::error::Error + 'static>>> {
  let getnotes = true;
  let getlinks = true;
  let getarchivenotes = true;
  let getarchivelinks = true;

  // TODO: get time on remote system, bail if too far out.

  let zns = ZkNoteSearch {
    tagsearch: TagSearch::SearchTerm {
      mods: Vec::new(),
      term: "".to_string(),
    },
    offset: 0,
    limit: None,
    what: "".to_string(),
    resulttype: ResultType::RtNote,
    archives: false,
    created_after: after,
    created_before: None,
    changed_after: after,
    changed_before: None,
    synced_after: after,
    synced_before: None,
    ordering: None,
  };

  // Can send from here.
  // let znsstream = search_zknotes_stream(conn.clone(), 12, zns.clone());
  // let res = awc::Client::new()
  //   .post("meh")
  //   .send_body(awc::body::BodyStream::new(znsstream));

  search_zknotes_stream(conn, uid, zns)
}

The search_zknotes_stream function is here

Its not the end of the world, but it would be nice to have a second function where I build the stream, since I want to chain several streams together. That doesn't seem possible at this point but I don't see why. Probably some invisible attribute that isn't specified in the function signature but is actually there from the compiler's point of view.

Last but not least, the error message if I uncomment the PROBLEM SECTION above:


error[E0277]: the trait bound `impl futures_util::Future<Output = impl futures_util::Stream<Item = Result<bytes::Bytes, Box<(dyn StdError + 'static)>>>>: futures_util::Stream` is not satisfied
   --> server-lib/src/sync.rs:753:43
    |
753 |     .send_body(awc::body::BodyStream::new(ss));
    |                -------------------------- ^^ the trait `futures_util::Stream` is not implemented for `impl futures_util::Future<Output = impl futures_util::Stream<Item = Result<bytes::Bytes, Box<(dyn StdError + 'static)>>>>`
    |                |
    |                required by a bound introduced by this call
    |
    = help: the following other types implement trait `futures_util::Stream`:
              Multipart
              local_channel::mpsc::Receiver<T>
              AsyncStream<T, U>
              actix_multipart::Field
              futures::futures_channel::mpsc::Receiver<T>
              futures::futures_channel::mpsc::UnboundedReceiver<T>
              PollSemaphore
              actix_codec::framed::Framed<T, U>
            and 98 others
note: required by a bound in `BodyStream::<S>::new`
   --> /home/bburdette/.cargo/registry/src/index.crates.io-6f17d22bba15001f/actix-http-3.5.1/src/body/body_stream.rs:27:8
    |
27  |     S: Stream<Item = Result<Bytes, E>>,
    |        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `BodyStream::<S>::new`
...
31  |     pub fn new(stream: S) -> Self {
    |            --- required by a bound in this associated function

I think[1] you want something like

  let ss = sync_stream(conn, user.id, after, callbacks)
        .await // <---
        .chain( sqldata::read_archivezklinks_stream(conn, user.id, after) );

Note that

pub async fn sync_stream(...) -> impl Stream<...>

Is roughly

// no `async fn`, returns a `Future`
pub fn sync_stream(...) -> impl Future<Output = impl Stream<...>>

so if you want the actual impl Stream, you need to .await the Future (Futures don't .await themselves automatically).


  1. there's a bit of missing information and 3 edits while I wrote this up ↩︎

Ok right! Its a function that returns a future that returns a stream, not a function that returns the stream itself. Removing the 'async' from sync_stream fixed the compile error. Nice!