Closure annotation response


#1

Hello, I am trying to create an example with tokio-zmq and I have a problem annotating the return type.
This is my function:

type Sink = (Map<SplitStream<MultipartSinkStream<Rep>>, Fn() -> Multipart>, SplitSink<MultipartSinkStream<Rep>>);

fn run_client(ctx: Arc<zmq::Context>, addr: &str) -> impl Future<Item = Sink, Error = Error> + Send {
    let client = Rep::builder(ctx).connect(addr).build();

    client.and_then(|rep| {
        let (sink, stream) = rep.sink_stream(25).split();

        stream
            .map(|multipart|{
                for msg in &multipart {
                    if let Some(msg) = msg.as_str() {
                        println!("{:?}", msg);
                    }
                }

                rep.send(multipart).and_then(|item| {
                    Ok(item)
                });

                multipart
            })
            .forward(sink)
    })
}

The compiler reports this error:

error[E0271]: type mismatch resolving `<futures::AndThen<std::boxed::Box<dyn futures::Future<Item=tokio_zmq::Rep, Error=tokio_zmq::Error> + std::marker::Send>, std::boxed::Box<futures::stream::Forward<futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:41:18: 53:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>>>, [closure@src/main.rs:37:21: 55:6]> as futures::Future>::Item == (futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, std::boxed::Box<(dyn std::ops::Fn() -> tokio_zmq::Multipart + 'static)>>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
  --> src/main.rs:34:54                                                                                                                 
   |                                                                                                                                    
34 | fn run_client(ctx: Arc<zmq::Context>, addr: &str) -> impl Future<Item = Sink, Error = Error> + Send {                              
   |                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected closure, found struct `std::boxed::Box`
   |                                                                                                                                    
   = note: expected type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:41:18: 53:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
              found type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, std::boxed::Box<(dyn std::ops::Fn() -> tokio_zmq::Multipart + 'static)>>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
   = note: the return type of a function must have a statically known size                                                              
                                                                                                                                        
error: aborting due to previous error                                                                                                   
                                                                                                                                        
For more information about this error, try `rustc --explain E0271`.                                                                     
error: Could not compile `zeromq_client`.

Any suggestions?


#2

In this case, change Fn() -> Multipart to fn() -> Multipart, and that should solve it.
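
For context, a closure can only be used as a plain fn pointer when it captures nothing, and the coercion needs an explicit fn-pointer target type. The sketch below illustrates that rule with the standard library only; the Multipart alias is a hypothetical stand-in for tokio_zmq::Multipart, and the function names are made up for illustration.

```rust
// Hypothetical stand-in for tokio_zmq::Multipart (assumption, not the real type).
type Multipart = Vec<String>;

// An ordinary function: its name can be used as a `fn() -> Multipart` pointer.
fn empty_multipart() -> Multipart {
    Vec::new()
}

fn fn_pointer_examples() -> (Multipart, Multipart) {
    // A named function coerces to a fn pointer.
    let from_fn: fn() -> Multipart = empty_multipart;

    // A closure that captures nothing also coerces to a fn pointer.
    let from_closure: fn() -> Multipart = || vec![String::from("ping")];

    // A closure that captures a variable does NOT coerce; uncommenting the
    // two lines below fails to compile with a mismatched-types error.
    // let greeting = String::from("hi");
    // let bad: fn() -> Multipart = move || vec![greeting.clone()];

    (from_fn(), from_closure())
}
```

The closure in the question captures rep, so by this rule it would not fit a fn pointer type.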


#3

In that case I get this error:

error[E0271]: type mismatch resolving `<futures::AndThen<std::boxed::Box<dyn futures::Future<Item=tokio_zmq::Rep, Error=tokio_zmq::Error> + std::marker::Send>, futures::stream::Forward<futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:40:18: 52:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>>, [closure@src/main.rs:36:21: 54:6]> as futures::Future>::Item == (futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, fn() -> tokio_zmq::Multipart>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
  --> src/main.rs:33:54                                                         
   |                                                                            
33 | fn run_client(ctx: Arc<zmq::Context>, addr: &str) -> impl Future<Item = Sink, Error = Error> + Send {
   |                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected closure, found fn pointer
   |                                                                            
   = note: expected type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:40:18: 52:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
              found type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, fn() -> tokio_zmq::Multipart>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
   = note: the return type of a function must have a statically known size

#4

This should solve it.

- type Sink = (Map<SplitStream<MultipartSinkStream<Rep>>, Fn() -> Multipart>, SplitSink<MultipartSinkStream<Rep>>);
+ type Sink = (Map<SplitStream<MultipartSinkStream<Rep>>, Box<Fn() -> Multipart + 'static>>, SplitSink<MultipartSinkStream<Rep>>);

#5

With your solution I still get this error:

type mismatch resolving `<futures::AndThen<std::boxed::Box<dyn futures::Future<Item=tokio_zmq::Rep, Error=tokio_zmq::Error> + std::marker::Send>, futures::stream::Forward<futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:41:18: 53:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>>, [closure@src/main.rs:37:21: 55:6]> as futures::Future>::Item == (futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, std::boxed::Box<(dyn std::ops::Fn() -> tokio_zmq::Multipart + 'static)>>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
  --> src/main.rs:34:54                                                                                                                                                                                                                       
   |                                                                                                                                                                                                                                          
34 | fn run_client(ctx: Arc<zmq::Context>, addr: &str) -> impl Future<Item = Sink, Error = Error> + Send {                                                                                                                                    
   |                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected closure, found struct `std::boxed::Box`                                                                                     
   |                                                                                                                                                                                                                                          
   = note: expected type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, [closure@src/main.rs:41:18: 53:14 rep:_]>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
              found type `(futures::stream::Map<futures::stream::SplitStream<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>, std::boxed::Box<(dyn std::ops::Fn() -> tokio_zmq::Multipart + 'static)>>, futures::stream::SplitSink<tokio_zmq::async_types::MultipartSinkStream<tokio_zmq::Rep>>)`
   = note: the return type of a function must have a statically known size                                                                                                                                                                    
                                                                                                                                                                                                                                              
error: aborting due to previous error                                                                                                                                                                                                         
                                                                                                                                                                                                                                              
For more information about this error, try `rustc --explain E0271`.                                                                                                                                                                           
error: Could not compile `zeromq_client`.

In any case, I have realized that tokio-zmq uses the GPL3 license rather than MIT, so I cannot use it in the project I want to work on, which will also be MIT-licensed. I found this better option: https://github.com/cetra3/tmq


#6

The error is about the Sink type (alias).

Fn is a trait, so Fn() -> Multipart used as a type is a trait object type (dyn Fn() -> Multipart) with no statically known size, because each closure implementing Fn can capture a different set of variables. Rust functions can’t return values of unknown size, because the calling function couldn’t know how much space to reserve for the result.

Box<dyn Fn() -> Multipart> has a fixed size, because Box is a fixed-sized pointer. That works as a return type.
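
The size argument can be checked with a small sketch that uses only the standard library. Multipart here is a hypothetical stand-in for tokio_zmq::Multipart, and closure_sizes is a made-up helper name; the exact byte counts depend on the target, so none are stated.

```rust
use std::mem::{size_of, size_of_val};

// Hypothetical stand-in for tokio_zmq::Multipart (assumption, not the real type).
type Multipart = Vec<String>;

// Returns (size of a non-capturing closure,
//          size of a capturing closure,
//          size of Box<dyn Fn() -> Multipart>).
fn closure_sizes() -> (usize, usize, usize) {
    // Captures nothing, so the closure value carries no data.
    let small = || -> Multipart { Vec::new() };

    // Captures a String and a usize by value, so the closure value
    // must be at least as large as those two fields together.
    let prefix = String::from("hello");
    let count = 3usize;
    let large = move || -> Multipart { vec![prefix.clone(); count] };

    (
        size_of_val(&small),
        size_of_val(&large),
        // The box is a pointer to the data plus a pointer to the vtable,
        // regardless of which closure ends up inside it.
        size_of::<Box<dyn Fn() -> Multipart>>(),
    )
}
```

The two closures have different sizes even though both implement Fn() -> Multipart, while the boxed form always has the same size.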

But now, since you started requiring Box in the Sink type, you actually have to make that box.

So wrap your closure in Box::new().
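
As a rough sketch of what that looks like, the example below uses std::iter::Map in place of the futures stream Map, since both take the closure type as a generic parameter. Multipart, Handler, Logged and log_all are hypothetical names for illustration, not tokio-zmq API. Note two assumptions: the closure passed to map receives each item, so the boxed type is written as Fn(Multipart) -> Multipart rather than Fn() -> Multipart, and the box is given an explicit type so that it becomes Box<dyn Fn...> instead of a box of the anonymous closure type.

```rust
use std::iter::Map;
use std::vec::IntoIter;

// Hypothetical stand-in for tokio_zmq::Multipart (assumption, not the real type).
type Multipart = Vec<String>;

// The boxed closure type that appears in the nameable return type.
type Handler = Box<dyn Fn(Multipart) -> Multipart>;

// A return type that can be written out, because the closure is boxed.
type Logged = Map<IntoIter<Multipart>, Handler>;

fn log_all(messages: Vec<Multipart>) -> Logged {
    // The annotation makes Box::new(closure) coerce to Box<dyn Fn(..) -> ..>;
    // without it the box would hold the anonymous closure type, and the
    // result would not match the Logged alias.
    let handler: Handler = Box::new(|multipart: Multipart| {
        for msg in &multipart {
            println!("{:?}", msg);
        }
        // Pass the message through unchanged.
        multipart
    });

    messages.into_iter().map(handler)
}
```

Calling log_all(vec![vec!["a".to_string()]]).collect::<Vec<_>>() prints each part and returns the messages unchanged. Whether the same pattern compiles against the futures 0.1 Map used in the question has not been verified here.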