I am getting this error:
error[E0599]: no method named `start_send` found for type `packet::inbound::sink_stage1::Stage1Sink<'stage1, 'driver, 'packet>` in the current scope
--> hyxewave_net\src\packet\inbound\stage_driver.rs:38:33
|
38 | self.stage1.start_send(packet);
| ^^^^^^^^^^
|
::: hyxewave_net\src\packet\inbound\sink_stage1\mod.rs:12:1
|
12 | pub struct Stage1Sink<'stage1, 'driver: 'stage1, 'packet: 'driver> where Self: 'stage1 {
| -------------------------------------------------------------------------------------- method `start_send` not found for this
|
= help: items from traits can only be used if the trait is implemented and in scope
= note: the following traits define an item `start_send`, perhaps you need to implement one of them:
candidate #1: `futures::sink::Sink`
candidate #2: `futures_sink::Sink`
To make sense of the lifetimes: stage1 <= driver <= packet, i.e. 'packet outlives 'driver, which in turn outlives 'stage1.
This is the calling function's code:
impl<'stage1, 'driver: 'stage1, 'packet: 'driver> StageDriver<'stage1, 'driver, 'packet> {
    /// Creates a new Stage1Inner. There should only be 1 in existence during runtime
    pub fn new<'a>() -> Self {
        Self {raw_inbound: VecDeque::new(), processed_inbound: VecDeque::new(), stage1: Stage1Sink::new(), _phantom: Default::default()}
    }

    /// drives mutable references up one stage
    fn drive(mut self: Pin<&mut Self>, cx: &mut Context) {
        //let mut to_stage1 = Vec::new();
        //let mut to_stage2 = Vec::new();
        for (idx, packet) in self.processed_inbound.iter_mut().enumerate() {
            match packet.stage {
                PacketStage::Stage1 => {
                    self.stage1.start_send(packet);
                },
                PacketStage::Stage2 => {
                    //to_stage2.push(packet);
                }
                PacketStage::NeedsDelete => {
                    self.processed_inbound.remove(idx);
                }
                _ => {
                    panic!("Invalid stage!")
                }
            }
        }
        //self.to_stage1_tx.send_all(&mut futures::stream::iter_ok(to_stage1));
        //self.stage1.send_all(&mut futures2::stream::iter(to_stage1));
        self.stage1.poll_flush(cx);
        //self.stage1.send_all(to_stage1);
    }
}
Within StageDriver's structure, stage1 is defined as: stage1: Stage1Sink<'stage1, 'driver, 'packet>
And finally, the implementation of Sink for Stage1Sink:
impl<'stage1, 'driver: 'stage1, 'packet: 'driver> Sink<&'stage1 mut RawInboundItem<'packet>> for Stage1Sink<'stage1, 'driver, 'packet> {
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: &'stage1 mut RawInboundItem<'packet>) -> Result<(), Self::Error> {
        Ok(self.fifo.push_back(item))
    }

    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        let len = self.fifo.len();
        self.fifo.drain(0..len).map(|_| {
            // determine packet action
            println!("Packet made it to stage 2.. END (for now)");
        });
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
The error occurs in the drive function posted above, on the call to self.stage1.start_send(packet). Why am I getting this error?
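For reference, here is a reduced sketch of what may be the same situation, written without the futures crate so it compiles on its own. It assumes the relevant detail is that the trait methods take self: Pin<&mut Self> while the call site uses a plain struct field. MiniSink, Stage, Driver, and queued_after_drive are hypothetical stand-ins and not the real types from my project, and the Pin::new call only works here because the stand-in Stage is Unpin, which may not hold for the real Stage1Sink.

```rust
use std::collections::VecDeque;
use std::pin::Pin;

// Hypothetical stand-in for futures::Sink; only the receiver shape matters here.
trait MiniSink<Item> {
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), ()>;
}

// Hypothetical stand-in for Stage1Sink. It is Unpin because VecDeque<u32> is.
struct Stage {
    fifo: VecDeque<u32>,
}

impl MiniSink<u32> for Stage {
    fn start_send(mut self: Pin<&mut Self>, item: u32) -> Result<(), ()> {
        self.fifo.push_back(item);
        Ok(())
    }
}

// Hypothetical stand-in for StageDriver.
struct Driver {
    stage1: Stage,
}

impl Driver {
    fn drive(mut self: Pin<&mut Self>, item: u32) -> Result<(), ()> {
        // `self.stage1.start_send(item)` does not compile: the receiver would be
        // a plain `Stage`, while the method expects `Pin<&mut Stage>`.
        // Pinning the field explicitly makes the method resolvable.
        Pin::new(&mut self.stage1).start_send(item)
    }
}

// Pushes every item through the driver and returns what ended up queued.
fn queued_after_drive(items: &[u32]) -> Vec<u32> {
    let mut driver = Driver { stage1: Stage { fifo: VecDeque::new() } };
    for &item in items {
        Pin::new(&mut driver).drive(item).unwrap();
    }
    driver.stage1.fifo.into_iter().collect()
}
```

Calling queued_after_drive(&[1, 2, 3]) returns the items in the order they were sent. If the receiver type is indeed the cause, the same reasoning would apply to the poll_flush call in drive.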