How to Check and Prevent Redundant Data in Redis Streams?

Hello everyone!

I am developing an application that uses Redis Streams to store large volumes of data collected through web scraping. One of the challenges I'm facing is how to check for and prevent the insertion of redundant data into the Stream without having to reach for a SQL-style solution. Is there something simple that already exists in Redis?

A simple example of adding to the stream:

use redis::AsyncCommands;
use redis::streams::StreamMaxlen;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    let title = "BULLONERIE GALVIT2";
    let group_name = "raworld2";
    let description = "[AI generated] BULLONERIE GALVIT is a company specializing in the production and distribution of fasteners and metal components. Known for its high-quality standards, the company offers a wide range of products including bolts, nuts, and screws, catering to various industries such as construction and manufacturing. Their focus on innovation and customer service has established them as a trusted name in the sector.";

    // Create a unique key based on the data
    let mut hasher = Sha256::new();
    hasher.update(format!("{}{}{}", title, group_name, description));
    let hash = hasher.finalize();

    // Check whether it already exists
    let exists: bool = conn.sismember("exploit_hashes", hash.as_slice()).await?;

    if !exists {
        // Add the hash to the control set
        let _: () = conn.sadd("exploit_hashes", hash.as_slice()).await?;

        // Add the data to the stream
        let _: () = conn.xadd_maxlen(
            "queue_exploit",
            StreamMaxlen::Approx(1000),
            "*",
            &[
                ("title", title),
                ("group_name", group_name),
                ("description", description)
            ]
        ).await?;
        
        println!("Dados adicionados ao stream");
    } else {
        println!("Dados duplicados encontrados");
    }

    Ok(())
}

I would like to know whether this approach is the most efficient, or whether there are other techniques that could be used to manage data duplication in Redis Streams. Additionally, how can I ensure that the verification logic holds up under the high volume of information I am collecting?

Thank you in advance for any suggestions and guidance!

Let me first say I don't have any experience with Redis Streams (just old school Redis, long before all of the licensing rigmarole). I'll let someone more familiar with the topic comment on that.

I have a few observations to share regarding the sample code you provided.

First, I personally like to be able to build the code locally (and maybe other forum users do as well), but this sample is missing some critical information. The declaration for the Sha256 type is absent, which is the biggest obstacle to building it. I was able to track it down (I think) to an old version of a public crate: sha2@0.9.9. I could be wrong, but it looks like the best candidate, so I had to make an assumption and hope for the best. Not to mention the time it took to actually find a compatible type! I suggest including all of the information needed to build and test sample code when posting it.
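
For anyone else who wants to try building it, this is the declaration I ended up assuming (the crate version in particular is my guess, not something the original post states):

    // Assumed: sha2 = "0.9.9" in Cargo.toml; the Digest trait
    // provides the new()/update()/finalize() calls used above.
    use sha2::{Digest, Sha256};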

Second, the update(format!("{}{}{}", ...)) call sticks out as an antipattern. It copies a potentially large amount of data into a new heap allocation, and in this specific case that overhead is entirely unnecessary (bear with me; I'm basing this on the belief that my assumption of the old sha2 is correct). Sha256::update() accepts any impl AsRef<[u8]>, and according to the docs:

This method can be called repeatedly for use with streaming messages.

Thus, you can replace the format string with:

    hasher.update(title);
    hasher.update(group_name);
    hasher.update(description);

And it will save you some CPU by avoiding an allocation and all of the copying. I was able to verify this with a simple test:

    let mut hasher = Sha256::new();
    hasher.update(format!("{}{}{}", title, group_name, description));
    let h1 = hasher.finalize();

    let mut hasher = Sha256::new();
    hasher.update(title);
    hasher.update(group_name);
    hasher.update(description);
    let h2 = hasher.finalize();

    assert_eq!(h1, h2);

Third, you are using the Redis SET type (the sadd command, I remember that much from my Redis days) to deduplicate hashes. While you probably have your reasons for using Redis to do this, it should be noted that Rust's standard library also has HashSet for doing this kind of filtering on the local host. For a large number of hashes, I would instead recommend a bloom filter [1] to probabilistically deduplicate hashes locally before using Redis for distributed hash deduplication (assuming you have multiple clients doing all of this hashing work concurrently). Whether you choose to deduplicate locally is up to you, but it could be a nice win depending on the shape of your data.
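
To make the idea concrete, here is a minimal hand-rolled sketch of the technique (the crates in the footnote are better tuned and tested; the sizes and probe counts below are arbitrary placeholders):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Minimal Bloom filter: k hash probes over an m-bit array.
    struct Bloom {
        bits: Vec<u64>,
        m: usize, // total number of bits
        k: u64,   // number of probes per item
    }

    impl Bloom {
        fn new(m: usize, k: u64) -> Self {
            Bloom { bits: vec![0; (m + 63) / 64], m, k }
        }

        fn bit_index(&self, item: &[u8], seed: u64) -> usize {
            let mut h = DefaultHasher::new();
            seed.hash(&mut h); // vary the hash per probe
            item.hash(&mut h);
            (h.finish() % self.m as u64) as usize
        }

        /// Marks `item` as seen; returns true if it was (probably) seen before.
        fn check_and_set(&mut self, item: &[u8]) -> bool {
            let mut seen = true;
            for seed in 0..self.k {
                let bit = self.bit_index(item, seed);
                let (word, mask) = (bit / 64, 1u64 << (bit % 64));
                if self.bits[word] & mask == 0 {
                    seen = false;
                    self.bits[word] |= mask;
                }
            }
            seen
        }
    }

With something like Bloom::new(1 << 24, 4), you would call check_and_set(hash.as_slice()) before touching Redis: a false result is always trustworthy (this client has definitely not seen the hash, so you proceed to the distributed check), while a true result lets you skip the network round trip at the cost of a small false-positive rate, meaning a genuinely new item is occasionally treated as a duplicate.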

Fourth, the Redis sadd command returns the number of elements that were actually added, i.e. the ones that were not already present. That return value should be the conditional for the Redis Streams part, not the combination of sismember + sadd. The two-step check-then-set is non-atomic: when two clients race on the same hash, both can observe "not a member" before either one runs sadd, so both will execute the conditional body. It also adds extra network traffic and CPU load for no good reason. Do this instead:

    let added: i64 = conn.sadd("exploit_hashes", hash.as_slice()).await?;

    if added == 1 {
        conn.xadd_maxlen("queue_exploit", ...).await?;

        println!("Data added to the stream");
    } else {
        println!("Duplicate data found");
    }
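
One caveat, since I brought up atomicity: even with this fix, the sadd and the xadd_maxlen are still two separate commands, so a client that dies between them leaves a hash registered with no stream entry. If that ever matters for your workload, the usual Redis answer is a small Lua script, which the server executes atomically. A sketch, reusing the key names and the approximate MAXLEN of 1000 from your example:

    // Atomically add the hash to the set and, only if it was new,
    // append the entry to the stream. Returns 1 if added, 0 if duplicate.
    let script = redis::Script::new(
        r#"
        if redis.call('SADD', KEYS[1], ARGV[1]) == 1 then
            redis.call('XADD', KEYS[2], 'MAXLEN', '~', '1000', '*',
                       'title', ARGV[2], 'group_name', ARGV[3], 'description', ARGV[4])
            return 1
        end
        return 0
        "#,
    );

    let added: i64 = script
        .key("exploit_hashes")
        .key("queue_exploit")
        .arg(hash.as_slice())
        .arg(title)
        .arg(group_name)
        .arg(description)
        .invoke_async(&mut conn)
        .await?;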

  1. I do not have a specific recommendation for a bloom filter crate. A quick search turns up some good-looking candidates: fastbloom for a SIMD implementation, and the more popular bloomfilter.
