Window joins and window functions are implemented by Flink, Spark, and other big data tools. I give a use case involving two streams, “metrics” and “events”, below.

In an ideal situation, machine-generated data would automatically alert the right set of people when a problem occurs. To do so, we need to do various “aggregations” and “joins” to connect a metric to the relevant employee.

In my experience, window computations can make big data pipelines CPU bound or memory bound. Below is a quick investigation of why that is.

Fewer cache misses would make the code more efficient; however, that is difficult to achieve with a hash map. Not having a garbage collector may make a C++/Rust implementation better in terms of both CPU and memory.

On large data streams, some joins are not worth the CPU cost. Sometimes it is not worth the I/O cost to even ingest the data.

// Scenario:
// I have a large time series table. It contains metrics.
// The metrics are points that describe a 1 minute window.
//
// (timestamp, metric1, { "name": "avg_cpu_utilization", "device_id": "<uuid>", "location_id": 123123 })
// (timestamp, metric2, { "name": "io_bytes", "device_id": "<uuid>", "location_id": 123123 })
//
// I have a second time series table. It contains events.
//
// (timestamp, {"device_state": "INITIALIZING"}, { "device_id": "<uuid>" })
// (timestamp, {"device_state": "WAITING"}, { "device_id": "<uuid>" })
// (timestamp, {"device_state": "ENCODING"}, { "device_id": "<uuid>" })
//
// When CPU utilization is high, either the initialization code or the encoding code
// may be at fault. The initialization code and encoding code are handled by separate teams.
// In addition, there is an operations team that can simply replace the device.
//
// Can I create an alert that alerts the right teams?
//
// First, I want to extract fields from the JSON. This is a projection.
// Then, I want to assign an interval for the device state.
// The final schema will look like this.
// (12341134233, 12341234233, "INITIALIZING", "<uuid>")
// (12341234233, 12342234233, "ENCODING", "<uuid>")
//
// Now, we want to "join" these intervals together.
//
// Finally, we want to "join" again by alerting the right teams.
//
// What is the cheapest way to support building such a query?
// - Some parts of the query are OLAP and involve a column scan.
// - Some parts of the query can be done in MySQL or Postgres.
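The projection and interval-assignment steps described above can be sketched as follows. This is a minimal sketch, assuming events arrive in timestamp order per device; `events_to_intervals` and its field layout are illustrative names, not an existing API.

```rust
use std::collections::HashMap;

type TimestampNs = u128;

/// Turn point-in-time state events into state intervals per device.
/// Each event closes the previous interval for the same device and opens a
/// new one. The last event per device has no end yet; here we simply drop
/// it (a real pipeline would hold it until the next event or a watermark).
fn events_to_intervals(
    events: Vec<(TimestampNs, String, String)>, // (timestamp, device_state, device_id)
) -> Vec<(TimestampNs, TimestampNs, String, String)> {
    let mut open: HashMap<String, (TimestampNs, String)> = HashMap::new();
    let mut out = Vec::new();
    for (ts, state, device_id) in events {
        // `insert` returns the previously open (start_ts, state) for this
        // device, if any; that interval ends at the current timestamp.
        if let Some((start_ts, prev_state)) = open.insert(device_id.clone(), (ts, state)) {
            out.push((start_ts, ts, prev_state, device_id));
        }
    }
    out
}
```

Keeping one open interval per device in a hash map makes this a single streaming pass over the events.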

use std::collections::{BTreeMap, HashMap};

type TimestampNs = u128;

fn sliding_window_join(
    interval_table1: Vec<(TimestampNs, TimestampNs, f64, String)>,
    interval_table2: Vec<(TimestampNs, TimestampNs, String, String)>,
) -> Vec<(
    TimestampNs,
    TimestampNs,
    (TimestampNs, TimestampNs, f64, String),
    (TimestampNs, TimestampNs, String, String),
)> {
    // 1. Scanning partition files. The device IDs should be dictionary encoded.
    //  - For an alert, we select partition files for a 30 minute interval.
    //    So, we can avoid scanning all partition files.
    //
    // This can be done in parallel and is I/O bound.
    let window_start: u128 = 200; // placeholder lookback start (e.g. now minus 30 minutes)
    let mut t1_filtered: Vec<(TimestampNs, TimestampNs, f64, String)> = interval_table1
        .into_iter()
        .filter(|(start_ts, _, _, _)| *start_ts >= window_start)
        .collect();
    let mut t2_filtered: Vec<(TimestampNs, TimestampNs, String, String)> = interval_table2
        .into_iter()
        .filter(|(start_ts, _, _, _)| *start_ts >= window_start)
        .collect();

    // 2. We do a modified sort-merge join. It is also called a window join.
    // There is a chance the query will OOM.
    //
    // We can hash by (or group by) device id, making both the sort and merge steps parallelizable.
    t1_filtered.sort_by_key(|(start_ts, _, _, _)| *start_ts);
    t2_filtered.sort_by_key(|(start_ts, _, _, _)| *start_ts);

    let mut out: Vec<(
        TimestampNs,
        TimestampNs,
        (TimestampNs, TimestampNs, f64, String),
        (TimestampNs, TimestampNs, String, String),
    )> = Vec::new();
    // The naive join can be quite costly with default data structures.
    // It can easily OOM and does not have good cache locality.
    //
    // For this data, if we pay one main memory reference (~100ns) per row and
    // everything else stays in L1+L2, we get ~1e7 rows/s per core. At
    // (8+8+8+16) = 40 bytes per row, that is 40 bytes * 10M rows/s = 400MB/s.
    // We want 200MB/s/core.
    //
    // So we can allow ourselves at most 2 L2 cache misses per row. However,
    // there are probably many more than 2 L2 cache misses in this code.
    //
    // Using a hash map introduces roughly 3 L2 misses per probe; since the
    // value is a vector, possibly 4.
    //
    // The wider the rows, the less these misses matter. Nevertheless, scanning
    // the data generally performs far better: 200MB/s is much easier to reach
    // with a sequential scan. A standard sort-merge join is also OK, but is
    // probably 2x-5x slower.
    //
    // Arithmetic is rarely the problem; L2 cache misses are.
    //
    // So right away we are in a situation where CPU is the bottleneck.
    // NOTE: several r1 rows can share an end_ts, so the expiry index maps
    // end_ts -> Vec of rows; a plain BTreeMap<TimestampNs, Row> would silently
    // drop rows on key collisions and leave them in window_hash forever.
    let mut window_hash: HashMap<String, Vec<(TimestampNs, TimestampNs, f64, String)>> =
        HashMap::new();
    let mut window_btree: BTreeMap<TimestampNs, Vec<(TimestampNs, TimestampNs, f64, String)>> =
        BTreeMap::new();
    let mut i = 0;
    for r2 in t2_filtered {
        // Add new rows that could overlap: r1.start_ts <= r2.end_ts.
        while i < t1_filtered.len() {
            let r1 = t1_filtered[i].clone();
            if r1.0 > r2.1 {
                break;
            }
            window_btree.entry(r1.1).or_default().push(r1.clone());
            window_hash.entry(r1.3.clone()).or_default().push(r1);
            i += 1;
        }

        // Expire rows in the window with r1.end_ts < r2.start_ts.
        let expired: Vec<TimestampNs> = window_btree.range(..r2.0).map(|(k, _)| *k).collect();
        for end_ts in expired {
            if let Some(rows) = window_btree.remove(&end_ts) {
                for old_r1 in rows {
                    if let Some(v) = window_hash.get_mut(&old_r1.3) {
                        v.retain(|row| row.1 != end_ts);
                    }
                }
            }
        }

        // Produce one output row per r1 that overlaps r2 on the same device.
        if let Some(v) = window_hash.get(&r2.3) {
            for r1 in v {
                let start_ts = std::cmp::max(r1.0, r2.0);
                let end_ts = std::cmp::min(r1.1, r2.1);
                out.push((start_ts, end_ts, r1.clone(), r2.clone()));
            }
        }
    }
    out
}
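The final "join" of the scenario, routing each joined row to the right teams, can be as small as a lookup into a dimension table. A sketch, with made-up team names and a made-up 80% CPU threshold; in practice this mapping could live in MySQL/Postgres as noted above:

```rust
/// Route a joined (metric ∩ device_state) row to the teams to alert.
/// The threshold and team names are hypothetical, for illustration only.
fn teams_to_alert(avg_cpu_utilization: f64, device_state: &str) -> Vec<&'static str> {
    if avg_cpu_utilization < 80.0 {
        return Vec::new(); // below threshold: no alert
    }
    match device_state {
        // High CPU while initializing points at the initialization code;
        // operations can also simply replace the device.
        "INITIALIZING" => vec!["init-team", "ops-team"],
        // High CPU while encoding points at the encoding code.
        "ENCODING" => vec!["encoding-team", "ops-team"],
        // Unexpected state under high CPU: let operations triage.
        _ => vec!["ops-team"],
    }
}
```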

// Window joins
//
// We want to saturate the 10 Gbps Ethernet links.
// We want to run queries in parallel, but also share the memory in the node.
//
// To avoid I/O being the bottleneck we want to process multiple alerts in
// a single scan, avoiding unnecessary scans. If I/O is the bottleneck, then
// there is not much we can do.
//
// To avoid CPU being the bottleneck, we want to be efficient with aggregations,
// and other hot paths. If CPU is the bottleneck, then run queries in parallel.
//
// To avoid memory being the bottleneck, we need to be careful about
// hash joins, windows, counters, etc. If memory is the bottleneck, then we
// need to duplicate I/O on separate nodes.
//
// Ideally, each CPU is at 100% utilization and we are constantly
// pulling new data into SSD buffers.
//
// CPU utilization can become the bottleneck.
//
//
// In a cloud environment, you typically get 10 Gbps per 32 cores. Assuming a
// 5x compression ratio, that is 10/8 * 5 = 6.25 GB/s of decompressed data,
// or roughly 200MB/s per core.
// Each core needs to process 200MB/s for the pipeline to be I/O bound.
// For some workloads, CPU becomes the bottleneck.
// For example, JSON parsing often runs at ~20MB/s/core.
//
// When we can cache the data in memory, we now have a reason to go
// from 200MB/s to 1GB/s+. We are obviously CPU bound.
//
// For some time series data, we just let the hot data sit in memory and
// create a huge system of alerts from it? No, we are usually working with
// new data. So 200MB/s is still the goal.
//
// Writing 200MB/s code is not easy.
// A main memory reference costs ~100ns, so paying one dependent memory
// reference per element while scanning a vector of 1e7 elements already
// takes ~1 second.

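The ~100ns figure can be checked with a crude microbenchmark (a sketch; absolute numbers depend on the machine): summing 1e7 u64s with a sequential scan, where the hardware prefetcher hides most of the latency, versus pulling the same values out of a HashMap one dependent lookup at a time.

```rust
use std::collections::HashMap;
use std::time::Instant;

fn scan_sum(v: &[u64]) -> u64 {
    // Sequential access: the prefetcher keeps the pipeline fed.
    v.iter().sum()
}

fn hash_sum(m: &HashMap<u64, u64>, n: u64) -> u64 {
    // Each probe is an effectively random, dependent memory access.
    (0..n).map(|i| m[&i]).sum()
}

fn main() {
    const N: u64 = 10_000_000;
    let v: Vec<u64> = (0..N).collect();
    let m: HashMap<u64, u64> = (0..N).map(|i| (i, i)).collect();

    let t = Instant::now();
    let a = scan_sum(&v);
    println!("scan: {:?}", t.elapsed());

    let t = Instant::now();
    let b = hash_sum(&m, N);
    println!("hash: {:?}", t.elapsed());

    assert_eq!(a, b);
}
```

On typical hardware the hash version is an order of magnitude slower, which is the gap the join code above is fighting.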
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn overlapping_intervals() {
        let interval_table1 = vec![(
            1000,
            2000,
            82.7,
            "d2e095e2-a547-494c-847f-147bf10ca78c".into(),
        )];

        let interval_table2 = vec![
            (
                500,
                2500,
                "initializing".into(),
                "d2e095e2-a547-494c-847f-147bf10ca78c".into(),
            ),
            (
                2500,
                3500,
                "waiting".into(),
                "d2e095e2-a547-494c-847f-147bf10ca78c".into(),
            ),
        ];

        let out = sliding_window_join(interval_table1, interval_table2);
        println!("{:?}", out);
        // Only the "initializing" interval overlaps the metric row;
        // "waiting" starts after the metric interval ends.
        assert_eq!(out.len(), 1);
        assert_eq!((out[0].0, out[0].1), (1000, 2000));
    }
}