Rust Iced Subscription Stream i.e. Understanding mpsc better

I need some helping understanding how to properly create a subscription stream that will accept a multitude of subscriptions from various parts of my application and backend network logic.
My backend is using libp2p and so contains a network_event_stream that I need my application to keep track of, on top of potential application specific events.
I believe this implies I create an mpsc channel between the main app bodies fn subscription, and the nested page fn subscription(), instead of just propagating back. Is this assumption correct? Or is there a better solution.
The example given by the docs of a subsciption channel is: (not sure how to modify this to work with my program structure)

use iced_native::subscription::{self, Subscription};
use iced_native::futures::channel::mpsc;
use iced_native::futures::sink::SinkExt;

pub enum Event {
    Ready(mpsc::Sender<Input>),
    WorkFinished,
    // ...
}

enum Input {
    DoSomeWork,
    // ...
}

enum State {
    Starting,
    Ready(mpsc::Receiver<Input>),
}

fn some_worker() -> Subscription<Event> {
    struct SomeWorker;

    subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move {
        let mut state = State::Starting;

        loop {
            match &mut state {
                State::Starting => {
                    // Create channel
                    let (sender, receiver) = mpsc::channel(100);

                    // Send the sender back to the application
                    output.send(Event::Ready(sender)).await;

                    // We are ready to receive messages
                    state = State::Ready(receiver);
                }
                State::Ready(receiver) => {
                    use iced_native::futures::StreamExt;

                    // Read next input sent from `Application`
                    let input = receiver.select_next_some().await;

                    match input {
                        Input::DoSomeWork => {
                            // Do some async work...

                            // Finally, we can optionally produce a message to tell the
                            // `Application` the work is done
                            output.send(Event::WorkFinished).await;
                        }
                    }
                }
            }
        }
    })
}

My program is currently split into several nested parts and my current subscription method is invalid as it only subscribes to subscriptions on pages that are in focus, instead of running constantly throughout the lifetime of the application.
My implementation looks like this:

#[derive(Debug)]
pub struct ProjectGui {
    pages: Pages,
}

#[derive(Debug, Clone)]
pub enum Message {
    PageMessage(PageMessage),
    // skipped code
}

impl Application for ProjectGui {
    type Executor = executor::Default; // Can I expand on this
    type Message = Message;
    type Theme = Theme;
    type Flags = ();
    // skipped code
    fn update(&mut self, message: Self::Message) -> Command<Message> {
        match message {
            Message::PageMessage(message) => {
                return self.pages.update(message);
            },
        }
        Command::none()
    }
    // skipped code
    fn subscription(&self) -> Subscription<Message> {
        self.pages.subscription().map(|message| Message::PageMessage(message))
    }
}
#[derive(Debug)]
pub struct Pages {
    pub pages: Vec<Page>,
    pub current: usize,
    pub debug: bool,
    pub theme: Theme,
}

impl<'a> Pages {
    // skipped code
    pub fn update(&mut self, message: PageMessage) -> Command<Message> {
        return self
            .pages[self.current]
            .update(message, &mut self.debug, &mut self.theme)
            .map(|message| Message::PageMessage(message));
    }
    // skipped code
    pub fn subscription(&self) -> Subscription<PageMessage>{
        self.pages[self.current].subscription()
    }
}
#[derive(Debug)]
pub enum Page {
    SoftDemo(SoftDemo),
    // skipped code
}

#[derive(Debug, Clone)]
pub enum PageMessage {
    SoftDemo(SoftDemoMessage),
    // skipped code
}

impl<'a> Page {
    pub fn update(&mut self, message: PageMessage, debug: &mut bool, theme: &mut Theme) -> Command<PageMessage> {
        match message {
            PageMessage::SoftDemo(message) => {
                if let Page::SoftDemo(soft_demo) = self {
                    return soft_demo
                        .update(message)
                        .map(|message| PageMessage::SoftDemo(message));
                }
            },
        }
        Command::none()
    }
    // skipped  code
    pub fn subscription(&self) -> Subscription<PageMessage> {
        match self {
            Page::SoftDemo(soft_demo) =>{
                soft_demo
                    .subscription()
                    .map(|message| PageMessage::SoftDemo(message))
                },
            _ => Subscription::none()
        }
    }
}
// skipped code
#[derive(Debug)]
pub struct SoftDemo {
    pub simulation: Simulation,
    pub is_playing: bool,
    pub queued_ticks: usize,
    pub speed: usize,
    pub next_speed: Option<usize>,
    pub version: usize,
}

#[derive(Debug, Clone)]
pub enum SoftDemoMessage {
    Tick(Instant),
    // skipped code
}

impl<'a> SoftDemo {
    pub fn new() -> Self {
        Self {
            simulation: Simulation::new(),
            is_playing: Default::default(),
            queued_ticks: Default::default(),
            speed: 5,
            next_speed: Default::default(),
            version: Default::default()
        }
    }
    
    pub fn update(&mut self, message: SoftDemoMessage) -> Command<SoftDemoMessage> {
        match message {
            SoftDemoMessage::Tick(_) | SoftDemoMessage::Next => {
                self.queued_ticks = (self.queued_ticks + 1).min(self.speed);

                if let Some(task) = self.simulation.tick(self.queued_ticks) {
                    if let Some(speed) = self.next_speed.take() {
                        self.speed = speed;
                    }

                    self.queued_ticks = 0;

                    let version = self.version;

                    return Command::perform(task, move |message| {
                        SoftDemoMessage::Simulation(message, version)
                    });
                }
            },
        }
        Command::none()
    }
    // skipped code
    pub fn subscription(&self) -> Subscription<SoftDemoMessage> {
        if self.is_playing {time::every(Duration::from_millis(1000 / self.speed as u64))
                .map(|instant| SoftDemoMessage::Tick(instant))
        } else {
            Subscription::none()
        }
    }
}
// skipped code

There is a lot of code here, but it mostly looks like noise rather than useful information. I don't mean it's useless, I just don't have any context for what a Page is in your application. Or a Pages, or a SoftDemo. I think ProjectGui is pretty self-explanatory, since it's the top-level application.

I can read what the code does, but it isn't at all clear what you are trying to do with it.

This sounds reasonable. The iced::Application trait only gives you a single message type, so every potential message needs to be accounted for here.

I am unsure of why you qualify the statement with "instead of just propagating back". Isn't the whole purpose of the channel to propagate messages back to the event loop? In other words, you need some method to produce the events that become Application::Message values, and the channel is that method. Where is the "instead of..."? FWIW, the Subscription<T> is the channel in question.

The example shows how to create a bidirectional channel with Subscription. Don't let that confuse you. If you don't need to send messages back to SoftDemo, then you don't need bidirectional communications. If you do need the bidi channel, then the example code is a good reference for how to do that. The websocket demo also uses the same pattern.

Is this the actual question? "How to create a subscription that aggregates messages from multiple sources?"

You do this by creating a SubScription<Message> at the top-level (i.e. in ProgramGui itself, not at the leaf like SoftDemo) and the closure given to subscription::channel() needs to create channels for all sources of interest (one for each "page"). And because it's at the top-level, it should trivially have access to every leaf below it.

That closure is responsible for not only dispatching messages back to the event loop, but it is also responsible for establishing the communication paths to each source. This is exactly how the websocket demo works. But instead of the closure creating a socket, it delegates that to some other "page" by using a channel. And it also delegates SoftDemo ticks through a second channel. And so on.

2 Likes

There is a lot of code here, but it mostly looks like noise rather than useful information. I don't mean it's useless, I just don't have any context for what a Page is in your application. Or a Pages, or a SoftDemo. I think ProjectGui is pretty self-explanatory, since it's the top-level application.

I can read what the code does, but it isn't at all clear what you are trying to do with it.

  • Apologies, I am essentially trying to replicate the logic used in the Iced tour example, and include subscriptions. The top-level application state, struct ProjectGui{} is simply pages: Pages. Pages is what contains the shared state of all windows of my application, and is debug: bool, theme: Theme, current: usize, and a vector containing all of my pages. So Page is in essence a window, or think of the top-level as the descriptor of a book, Pages as the book itself, and Page as each individual page. The app is currently traversed by incrementing current and going to that position in the pages: Vec[Page].

I am unsure of why you qualify the statement with "instead of just propagating back". Isn't the whole purpose of the channel to propagate messages back to the event loop? In other words, you need some method to produce the events that become Application::Message values, and the channel is that method. Where is the "instead of..."? FWIW, the Subscription<T> is the channel in question.

  • Yes it is, the question more surrounds how to do this properly. In my current message you see that my propagation of the subscription is handled by consecutive fn subscription() that return and map the output to the correct top-level Message format (i.e. SoftDemoMessage -> PageMessage -> Message). With my current setup I am only subscribing to the page in focus as the subscription fn refers to the current page that is in focus.
  • I am going to try your suggestion at the bottom. This was roughly my thoughts on how to make it work correctly but is obviously more complicated and Rust hasn't been the easiest learning curve. I will do my best and probably be back with another question shortly!

P.S. How do you highlight specific parts of a persons message as you have?

1 Like

Use your mouse to highlight a passage of text, and a small popup should appear over the highlighted section with the word "Quote". Click on that.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.