Implementing Sink and lack of trait function visibility

I am getting this error:

error[E0599]: no method named `start_send` found for type `packet::inbound::sink_stage1::Stage1Sink<'stage1, 'driver, 'packet>` in the current scope
  --> hyxewave_net\src\packet\inbound\stage_driver.rs:38:33
   |
38 |                     self.stage1.start_send(packet);
   |                                 ^^^^^^^^^^
   | 
  ::: hyxewave_net\src\packet\inbound\sink_stage1\mod.rs:12:1
   |
12 | pub struct Stage1Sink<'stage1, 'driver: 'stage1, 'packet: 'driver> where Self: 'stage1 {
   | -------------------------------------------------------------------------------------- method `start_send` not found for this
   |
   = help: items from traits can only be used if the trait is implemented and in scope
   = note: the following traits define an item `start_send`, perhaps you need to implement one of them:
           candidate #1: `futures::sink::Sink`
           candidate #2: `futures_sink::Sink`

To make sense of the lifetimes: stage1 <= driver <= packet (each lifetime is outlived by the one to its right).

Okay, so this is the calling function's code:

impl<'stage1, 'driver: 'stage1, 'packet: 'driver> StageDriver<'stage1, 'driver, 'packet> {
    /// Creates a new Stage1Inner. There should only be 1 in existence during runtime
    pub fn new<'a>() -> Self {
        Self {raw_inbound: VecDeque::new(), processed_inbound: VecDeque::new(), stage1: Stage1Sink::new(), _phantom: Default::default()}
    }

    /// drives mutable references up one stage
    fn drive(mut self: Pin<&mut Self>, cx: &mut Context) {
        //let mut to_stage1 = Vec::new();
        //let mut to_stage2 = Vec::new();
        for (idx, packet) in self.processed_inbound.iter_mut().enumerate() {
            match packet.stage {
                PacketStage::Stage1 => {
                    self.stage1.start_send(packet);
                },

                PacketStage::Stage2 => {
                    //to_stage2.push(packet);
                }

                PacketStage::NeedsDelete => {
                    self.processed_inbound.remove(idx);
                }
                _ => {
                    panic!("Invalid stage!")
                }
            }
        }

        //self.to_stage1_tx.send_all(&mut futures::stream::iter_ok(to_stage1));
        //self.stage1.send_all(&mut futures2::stream::iter(to_stage1));
        self.stage1.poll_flush(cx);
        //self.stage1.send_all(to_stage1);
    }
}

Within the StageDriver struct, stage1 is defined as: stage1: Stage1Sink<'stage1, 'driver, 'packet>

And finally, the implementation of Sink for Stage1Sink:

impl<'stage1, 'driver: 'stage1, 'packet: 'driver> Sink<&'stage1 mut RawInboundItem<'packet>> for Stage1Sink<'stage1, 'driver, 'packet> {
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: &'stage1 mut RawInboundItem<'packet>) -> Result<(), Self::Error> {
        Ok(self.fifo.push_back(item))
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        let len = self.fifo.len();
        self.fifo.drain(0..len).map(|_| {
            // determine packet action
            println!("Packet made it to stage 2.. END (for now)");
        });

        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

The error occurs in the drive function posted above. Why am I getting it?

This is very likely because impl StageDriver and the Sink trait are in separate modules, and you forgot to bring the trait into scope (use it) in the module that contains the impl.
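To illustrate what "the trait must be in scope" means, here is a minimal sketch using only std. The names sinks, driver, Push, Queue and fill are hypothetical and are not taken from the code in this thread; they only show the general rule that a trait method can be called with method syntax only where the trait has been imported.

```rust
mod sinks {
    // Hypothetical trait, standing in for a trait defined in another module or crate.
    pub trait Push {
        fn push_item(&mut self, item: u32);
    }

    // Hypothetical type that implements the trait.
    pub struct Queue(pub Vec<u32>);

    impl Push for Queue {
        fn push_item(&mut self, item: u32) {
            self.0.push(item);
        }
    }
}

mod driver {
    use super::sinks::Queue;
    // Removing this import makes `queue.push_item(..)` below fail to compile,
    // because the method lives on the trait, not on the struct itself.
    use super::sinks::Push;

    pub fn fill(queue: &mut Queue) {
        queue.push_item(7);
    }
}

fn main() {
    let mut q = sinks::Queue(Vec::new());
    driver::fill(&mut q);
    println!("{:?}", q.0); // prints [7]
}
```

If the import is already present, as in the question's import lists, the cause has to be something else, such as the receiver type the method expects.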

I tried moving the two into the same module, and I get the same error. Here are my imports for Stage1Sink's module:

use std::collections::VecDeque;
use futures2::Sink; // futures2 = futures 0.3
use async_std::task::{Context, Poll};
use std::pin::Pin;
use crate::connection::stream_wrappers::old::RawInboundItem;
use std::marker::PhantomData;

And for StageDriver:

use std::collections::VecDeque;
use crate::connection::stream_wrappers::old::RawInboundItem;
use async_std::task::{Context, Poll};
use std::pin::Pin;
use hyxe_netdata::packet::{ProcessedPacketHeader, PacketStage};
use crate::packet::inbound::sink_stage1::Stage1Sink;
use std::marker::PhantomData;
use futures2::Sink;

Can you show the definition of Sink trait? Or, better yet, give me a link to some online repo?

This might be because of the trait's own restrictions, such as a supertrait bound like trait Foo: Clone {}

Is this the trait you're using?


Yes, this is the trait I am using.

@CreepySkeleton, as requested, here is the full impl for Stage1Sink:

use std::collections::VecDeque;
use futures2::Sink;
use async_std::task::{Context, Poll};
use std::pin::Pin;
use crate::connection::stream_wrappers::old::RawInboundItem;
use std::marker::PhantomData;

/// All packets that have either a DO_CONNECT, CONNECT_ALIVE, DO_DISCONNECT, etc, will be forwarded here
pub mod stage1_login_sink;

/// Stage 1: At this point, we have received packets which have a valid layout, and can now begin pushing each into the appropriate subsink
pub struct Stage1Sink<'stage1, 'driver: 'stage1, 'packet: 'driver>  {
    fifo: VecDeque<&'stage1 mut RawInboundItem<'packet>>,
    _phantom: PhantomData<&'driver Self>
}

impl<'stage1, 'driver: 'stage1, 'packet: 'driver> Stage1Sink<'stage1, 'driver, 'packet> {
    /// Creates a new stage 1 sink
    pub fn new() -> Self {
        Self { fifo: VecDeque::new(), _phantom: Default::default() }
    }
}

impl<'stage1, 'driver: 'stage1, 'packet: 'driver> Sink<&'stage1 mut RawInboundItem<'packet>> for Stage1Sink<'stage1, 'driver, 'packet> {
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: &'stage1 mut RawInboundItem<'packet>) -> Result<(), Self::Error> {
        Ok(self.fifo.push_back(item))
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        let len = self.fifo.len();
        self.fifo.drain(0..len).map(|_| {
            // determine packet action
            println!("Packet made it to stage 1.. END (for now)");
        });

        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

For the record, this problem doesn't occur when I switch to Sink from futures 0.1. However, I am not allowed to specify lifetimes on that trait, so the program doesn't work as needed.

The error occurs because the receiver you are calling poll_flush and start_send on does not match the receiver type in the trait's method signatures: the Sink methods take self: Pin<&mut Self>, while self.stage1 is a plain Stage1Sink.

In general, when you get a confusing error about a trait method not being found, the first thing to try is fully qualified syntax (UFCS): <Stage1Sink<'stage1, 'driver, 'packet> as Sink<_>>::poll_flush(&mut self.stage1, cx). This will give you an error message explaining that Pin<&mut Self> was expected, not &mut Self.

Assuming Stage1Sink: Unpin, Pin::new(&mut self.stage1).poll_flush(cx) will do what you need, and the same applies to start_send.
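The receiver mismatch can be reproduced without the futures crate. The following is a minimal sketch, not the original code: MiniSink is a hypothetical stand-in for the futures 0.3 Sink trait that keeps the Pin<&mut Self> receivers but drops Context and Poll so it builds with std alone, and QueueSink and Driver are hypothetical simplifications of Stage1Sink and StageDriver.

```rust
use std::collections::VecDeque;
use std::pin::Pin;

// Hypothetical stand-in for futures 0.3 `Sink`: same `Pin<&mut Self>` receivers,
// without the `Context`/`Poll` plumbing.
trait MiniSink<Item> {
    type Error;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn flush(self: Pin<&mut Self>) -> Result<usize, Self::Error>;
}

// Hypothetical sink. It only holds a VecDeque, so it is `Unpin` automatically.
struct QueueSink {
    fifo: VecDeque<u32>,
}

impl MiniSink<u32> for QueueSink {
    type Error = ();

    fn start_send(mut self: Pin<&mut Self>, item: u32) -> Result<(), ()> {
        self.fifo.push_back(item);
        Ok(())
    }

    fn flush(mut self: Pin<&mut Self>) -> Result<usize, ()> {
        // Iterator adapters are lazy, so the drain is consumed with `count`,
        // which also reports how many items were flushed.
        Ok(self.fifo.drain(..).count())
    }
}

struct Driver {
    stage1: QueueSink,
}

impl Driver {
    fn drive(mut self: Pin<&mut Self>, items: &[u32]) -> usize {
        for &item in items {
            // `self.stage1.start_send(item)` is not found by method lookup:
            // the receiver must be `Pin<&mut QueueSink>`, not `QueueSink`.
            Pin::new(&mut self.stage1).start_send(item).unwrap();
        }
        Pin::new(&mut self.stage1).flush().unwrap()
    }
}

fn main() {
    let mut driver = Driver { stage1: QueueSink { fifo: VecDeque::new() } };
    let flushed = Pin::new(&mut driver).drive(&[1, 2, 3]);
    println!("flushed {} items", flushed); // prints "flushed 3 items"
}
```

Pin::new is only available here because both hypothetical types are Unpin; a type that is not Unpin would need a different way of obtaining the pinned reference.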
