OK, I've attempted an implementation that uses two channels to send to and receive from my RPCClient... I'm pretty close, but a lot of this is just guessing. Again, I get confused about what's actually going on because the types are all synthetic, so the error messages are nearly useless... at least to me.
HTTP server: https://github.com/wspeirs/logstore/blob/03b7166c49a37c3b512b4ad98a7b2a3f9ceb5290/src/http_server.rs#L49
I'm struggling to handle the send/recv to the client.
    let result = clients.values().map(move |rpc_client| {
        let (tx, rx) = rpc_client.get_connection();

        // make a bogus request, would come from GET request
        let req = RequestMessage::Get(
            String::from("method"),
            LogValue::String(String::from("GET")),
        );

        tx.send(req).and_then(|_| rx.into_future())
    });
For some reason the compiler thinks there should be a tuple. I have no idea where that would be coming from:
    error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<rpc_codec::ResponseMessage>> as futures::IntoFuture>::Error == futures::sync::mpsc::SendError<rpc_codec::RequestMessage>`
      --> src/http_server.rs:58:34
       |
    58 |         tx.send(req).and_then(|_| rx.into_future())
       |                      ^^^^^^^^ expected tuple, found struct `futures::sync::mpsc::SendError`
       |
       = note: expected type `((), futures::sync::mpsc::Receiver<rpc_codec::ResponseMessage>)`
                  found type `futures::sync::mpsc::SendError<rpc_codec::RequestMessage>`
For some reason it also believes `result` to be a `Map` instead of a future, even though the last thing that's returned is `rx.into_future()` from `tx.send(...).and_then(...)`:
    error[E0277]: the trait bound `futures::stream::Map<futures::stream::MapErr<futures::stream::IterOk<std::iter::Map<std::collections::hash_map::Values<'_, u32, rpc_server::RPCClient>, [closure@src/http_server.rs:50:51: 59:18]>, std::io::Error>, [closure@src/http_server.rs:77:34: 77:57]>, [closure@src/http_server.rs:78:30: 82:26]>: futures::Future` is not satisfied
      --> src/http_server.rs:75:17
       |
    75 | /                 Box::new(
    76 | |                     stream::iter_ok(result)
    77 | |                         .map_err(|e| hyper::Error::Io(e))
    78 | |                         .map(|rpc_rsp| {
    ...  |
    82 | |                         }),
    83 | |                 )
       | |_________________^ the trait `futures::Future` is not implemented for `futures::stream::Map<futures::stream::MapErr<futures::stream::IterOk<std::iter::Map<std::collections::hash_map::Values<'_, u32, rpc_server::RPCClient>, [closure@src/http_server.rs:50:51: 59:18]>, std::io::Error>, [closure@src/http_server.rs:77:34: 77:57]>, [closure@src/http_server.rs:78:30: 82:26]>`
       |
       = note: required for the cast to the object type `futures::Future<Error=hyper::Error, Item=hyper::Response<std::boxed::Box<futures::Stream<Item=hyper::Chunk, Error=hyper::Error>>>>`
Conceptually I understand what a `Future` is, and what a `Stream`/`Sink` is... but I'm blind to what's actually going on because of all these stacked and synthesized types. Besides just continuing to bang my head on this, how can I learn to understand what's going on here? I feel like more than 50% of the time I'm just randomly trying combinations of things in the hope that one will compile. In the rest of my Rust code I have a pretty solid understanding of what's going on at each line, but this Tokio stuff just has me lost.