Code Review Request: a drop-in replacement for Logstash

Hi, could anyone take a look at this? I would greatly appreciate feedback on anything, but I’m mainly interested in whether my use of Tokio / Futures / MPSC channels is correct.

Currently partially working:

  • HttpPollerInput, GeneratorInput, ExecInput
  • GeoipFilter, MutateFilter, CloneFilter
  • StdoutOutput

and I’m working on implementing the other Logstash plugins whenever I can. I have a pattern for implementing each, but I’d like to make sure that pattern is the right way to go about it.

Thank you!

Anyone? Any recommendations on where I can get some feedback? I haven’t been able to find anyone to comment on this yet.

I took a (very) quick look, because I have an interest in not using Logstash. A config-compatible drop-in replacement is an interesting challenge; I keep writing small, purpose-specific loaders directly for particular formats.

The code I found looked nicely written at first glance (i.e. within each file), but I got a little lost digging further: there are lots of files containing lots of config-template structs, and I had trouble finding how things hang together and where the functionality that connects them lives.

So, my first suggestions:

  • Write some notes on an architecture overview and data flow model (in code terms). I see lots of config structs but was looking for how actual log entries are passed and mutated; it’s (presumably) something more dynamic? In other words, describe the pattern you want to validate / establish.
  • move */mod.rs to the parent dir as (e.g.) input.rs; they’re too hard to spot among all the other files, and in fact I only just noticed them while writing this.

If you post some specific snippets about which you are wondering most or give a more complete description of the project, more people might chime in.


Thanks for your feedback!

move */mod.rs to the parent dir as (e.g.) input.rs; they’re too hard to spot among all the other files, and in fact I only just noticed them while writing this.

Makes sense! I’ll do that.

Write some notes on an architecture overview and data flow model (in code terms). I see lots of config structs but was looking for how actual log entries are passed and mutated; it’s (presumably) something more dynamic? In other words, describe the pattern you want to validate / establish.

The current pattern is:

  1. I create a vector of input plugins and run them all. Each of these implements Stream, and the InputBlock struct is a Future which polls all input plugins and sends all messages to a futures mpsc channel.
  2. Filters were a little more complicated. I wanted to handle all messages asynchronously, but the way Logstash filter blocks work is that the first filter plugin sends each message to the second, the second sends it to the third, and so on. Filter execution is linear, but it should still be handled asynchronously, since each filter plugin can take an unknown amount of time. Because of this, I take the vector of filter plugins and use fold to wire a futures mpsc channel between each pair. In the end, it looks like:

All Input plugins -> First Filter Plugin -> Second Filter Plugin (etc) -> All Output Plugins

  3. The last filter plugin sends all messages to the OutputBlock.
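The fold described above can be sketched with std::sync::mpsc channels and threads standing in for the futures mpsc channels and Tokio tasks (a simplified, synchronous analogue of the pattern, not the actual async code):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// One filter stage: consume events from the previous stage, transform them,
// and forward them to the next stage. Dropping `tx` when the loop ends
// closes the downstream channel so the whole chain shuts down cleanly.
fn spawn_filter(rx: Receiver<String>, tx: Sender<String>, tag: &'static str) {
    thread::spawn(move || {
        for event in rx {
            let _ = tx.send(format!("{} -> {}", event, tag));
        }
    });
}

// Fold the filter list into a chain of channels: inputs write to the
// returned Sender, outputs read from the returned Receiver.
fn build_pipeline(filters: &[&'static str]) -> (Sender<String>, Receiver<String>) {
    let (input_tx, input_rx) = channel();
    let output_rx = filters.iter().fold(input_rx, |rx, &tag| {
        let (tx, next_rx) = channel();
        spawn_filter(rx, tx, tag);
        next_rx
    });
    (input_tx, output_rx)
}

fn main() {
    let (tx, outputs) = build_pipeline(&["geoip", "mutate"]);
    tx.send("event".to_string()).unwrap();
    drop(tx); // close the inputs so the pipeline drains and exits
    let results: Vec<String> = outputs.iter().collect();
    assert_eq!(results, vec!["event -> geoip -> mutate".to_string()]);
}
```

The filter names here are placeholders; the point is that the fold threads one receiver through the whole chain and hands back the two channel ends the inputs and outputs need.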

Every plugin has all the standard options of its Logstash counterpart. For example:

impl<'a> Default for HttpPoller<'a> {
    fn default() -> Self {
        Self {
            user: None,
            password: None,
            automatic_retries: Some(1),
            cacert: None,
            client_cert: None,
            client_key: None,
            connect_timeout: Some(10),
            cookies: Some(true),
            follow_redirects: Some(true),
            keepalive: Some(true),
            keystore: None,
            keystore_password: None,
            keystore_type: Some("JKS"),
            metadata_target: Some("@metadata"),
            pool_max: Some(50),
            pool_max_per_route: Some(25),
            proxy: None,
            request_timeout: Some(60),
            retry_non_idempotent: Some(false),
            schedule: Interval::new_interval(Duration::from_secs(5)),
            socket_timeout: Some(10),
            target: None,
            truststore: None,
            truststore_password: None,
            truststore_type: Some("JKS"),
            urls: vec![],
            validate_after_inactivity: Some(200),
            _sender: None,
        }
    }
}

The only difference is that all Input plugins have a _sender field (a channel sender), all Filter plugins have both _sender and _receiver fields, and all Output plugins have a _receiver field.

Questions I have

  1. For each Filter plugin, the implementation of Stream consumes messages from the previous plugin’s mpsc channel. Example: https://gitlab.com/andrewbanchich/emmett/blob/master/src/filters/geoip.rs. I don’t know if I’m using channels correctly. Is it a good idea to put a channel inside a Stream like that? Returning Ok(Async::Ready(Some(Value))) would be useless for my pattern, since the message just needs to be sent to the output channel, so I always return Ok(Async::Ready(None)) from the Stream and send the message to the _sender channel before that.
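For comparison, here is a std Iterator analogue (not the futures 0.1 Stream API) of the alternative design: the filter yields the transformed item itself and lets whoever drives the pipeline forward it, so the stage needs no channel of its own. The `TagFilter` name and transformation are made up for illustration:

```rust
// Std-Iterator stand-in for a filter Stream: instead of always returning
// Ready(None) and side-sending into a channel, yield the transformed event.
struct TagFilter<I> {
    upstream: I,       // the previous stage (plays the role of _receiver)
    tag: &'static str, // hypothetical transformation for illustration
}

impl<I: Iterator<Item = String>> Iterator for TagFilter<I> {
    type Item = String;
    fn next(&mut self) -> Option<String> {
        // `None` here means "upstream is exhausted", not "nothing yet"
        self.upstream.next().map(|e| format!("{} [{}]", e, self.tag))
    }
}

fn main() {
    let events = vec!["a".to_string(), "b".to_string()].into_iter();
    let filtered: Vec<String> =
        TagFilter { upstream: events, tag: "geoip" }.collect();
    assert_eq!(filtered, vec!["a [geoip]".to_string(), "b [geoip]".to_string()]);
}
```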

  2. This is repeated in all my current filters, but it feels like there should be a better way to do it:

if let Some(message) = try_ready!(process.poll()) {
    if let Some(sender) = self._sender.to_owned() {
        let mut send = sender.send(message.clone());
        try_ready!(send.poll().map_err(|_| ()));
    }
}
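One way to cut the repetition is to factor the forward step into a shared helper that every filter calls. A std::sync::mpsc sketch of the idea (the futures 0.1 version would differ, since its send returns a future that has to be polled; the `forward` helper here is hypothetical):

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical shared helper: forward a message to the next stage if a
// sender is attached, treating a disconnected channel as an error.
fn forward<T: Clone>(sender: &Option<Sender<T>>, message: &T) -> Result<(), ()> {
    match sender {
        Some(tx) => tx.send(message.clone()).map_err(|_| ()),
        None => Ok(()), // no downstream stage attached; drop silently
    }
}

fn main() {
    let (tx, rx) = channel();
    let sender = Some(tx);
    forward(&sender, &"event".to_string()).unwrap();
    assert_eq!(rx.recv().unwrap(), "event");
    // With no sender attached, forwarding is a no-op.
    assert!(forward(&None::<Sender<String>>, &"event".to_string()).is_ok());
}
```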
  3. I couldn’t figure out how to have all Output plugins consume messages from futures mpsc channels, since those are single-consumer and I couldn’t clone or share the receivers given my program’s structure. I used crossbeam channels instead, which work, but they aren’t async. It feels like a workaround, and I’d prefer everything to be async. Any suggestions?
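One async-friendly alternative to a multi-consumer channel is a single fan-out stage: one consumer reads the filter output and clones each event into a dedicated channel per output plugin. A thread-based std sketch of the idea (an async version would spawn a task instead of a thread):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Single consumer that fans each event out to every output's own channel,
// sidestepping the "mpsc receivers can't be cloned" problem.
fn spawn_fanout<T: Clone + Send + 'static>(rx: Receiver<T>, outputs: Vec<Sender<T>>) {
    thread::spawn(move || {
        for event in rx {
            for out in &outputs {
                let _ = out.send(event.clone());
            }
        }
    });
}

fn main() {
    let (tx, rx) = channel();
    let (stdout_tx, stdout_rx) = channel();
    let (file_tx, file_rx) = channel();
    spawn_fanout(rx, vec![stdout_tx, file_tx]);
    tx.send("event".to_string()).unwrap();
    drop(tx); // close the pipeline so the fan-out thread exits
    assert_eq!(stdout_rx.recv().unwrap(), "event");
    assert_eq!(file_rx.recv().unwrap(), "event");
}
```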

  4. When I run the program, I’ve noticed that my MacBook Pro’s fan turns on and that its memory use (per Activity Monitor) climbs from about 3 MB to 3.5 MB to 4 MB, and so on, without stopping. It grows slowly, but I see it regularly, and it’s more noticeable if I set the ExecInput / GeneratorInput plugins to create an input message once per millisecond. I haven’t been able to figure out what could cause a memory leak. Any thoughts?
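Not a diagnosis, but one common cause of slowly growing memory in channel pipelines is an unbounded channel whose producer outpaces its consumer: queued messages accumulate indefinitely. A bounded channel applies backpressure instead, illustrated here with std::sync::mpsc::sync_channel:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A bounded channel blocks the sender once 8 messages are queued, so a
    // fast producer (e.g. one event per millisecond) can't grow the queue,
    // and therefore the process's memory, without bound.
    let (tx, rx) = sync_channel::<u64>(8);
    let producer = thread::spawn(move || {
        for i in 0..1_000 {
            tx.send(i).unwrap(); // blocks while the buffer is full
        }
    });
    let received: Vec<u64> = rx.iter().collect();
    producer.join().unwrap();
    assert_eq!(received.len(), 1_000);
    assert_eq!(received[0], 0);
}
```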

  5. Are there any anti-patterns you see in the plugin implementations I do have?

ExecInput
GeneratorInput
HttpPollerInput

CloneFilter
GeoipFilter
MutateFilter

StdoutOutput
