Window joins and window functions are implemented by Flink, Spark, and other big data tools. Below, I give a use case involving two streams, “metrics” and “events”.
In an ideal situation, machine-generated data would automatically alert the right set of people when a problem occurs. To do so, we need to do various “aggregations” and “joins” to connect a metric to the relevant employee.
In my experience, window computations can make big data pipelines CPU bound or memory bound. Here I take a quick look at why that is.
Having fewer cache misses would make the code more efficient, but that is hard to achieve with a hash map. Not having a garbage collector may make a C++/Rust implementation better in terms of both CPU and memory.
On large data streams, some joins are not worth the CPU cost. Sometimes it is not worth the I/O cost to even ingest the data.
// Scenario:
// I have a large time series table. It contains metrics.
// The metrics are points that describe a 1 minute window.
//
// (timestamp, metric1, { "name": "avg_cpu_utilization", "device_id": "<uuid>", "location_id": 123123 })
// (timestamp, metric2, { "name": "io_bytes", "device_id": "<uuid>", "location_id": 123123 })
//
// I have a second time series table. It contains events.
//
// (timestamp, {"device_state": "INITIALIZING"}, { "device_id": "<uuid>" })
// (timestamp, {"device_state": "WAITING"}, { "device_id": "<uuid>" })
// (timestamp, {"device_state": "ENCODING"}, { "device_id": "<uuid>" })
//
// When CPU utilization is high, either the initialization code or the encoding code
// may be at fault. The initialization code and encoding code are handled by separate teams.
// In addition, there is an operations team that can simply replace the device.
//
// Can I create an alert that alerts the right teams?
//
// First, I want to extract fields from the JSON. This is a projection.
// Then, I want to assign an interval for the device state.
// The final schema will look like this.
// (12341134233, 12341234233, "INITIALIZING", "<uuid>")
// (12341234233, 12342234233, "ENCODING", "<uuid>")
//
// Now, we want to "join" these intervals together.
//
// Finally, we want to "join" again by alerting the right teams.
//
// What is the cheapest way to support building such a query?
// - Some parts of the query are OLAP and involve a column scan.
// - Some parts of the query can be done in MySQL or Postgres.
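The projection and interval-assignment step described above could be sketched as follows. `events_to_intervals` is a hypothetical helper, not part of any library: it turns a stream of `(timestamp, device_state, device_id)` events into `(start_ts, end_ts, device_state, device_id)` intervals, where a state lasts until the next event for the same device. The final event of each device has no end timestamp yet and is dropped here.

```rust
// Hypothetical sketch of the projection + interval-assignment step.
fn events_to_intervals(
    mut events: Vec<(u128, String, String)>, // (timestamp, device_state, device_id)
) -> Vec<(u128, u128, String, String)> {
    // Sort by (device_id, timestamp) so consecutive events of a device are adjacent.
    events.sort_by_key(|e| (e.2.clone(), e.0));
    let mut out = Vec::new();
    for pair in events.windows(2) {
        let (ts0, state0, dev0) = &pair[0];
        let (ts1, _, dev1) = &pair[1];
        if dev0 == dev1 {
            // The state interval runs until the next event on the same device.
            out.push((*ts0, *ts1, state0.clone(), dev0.clone()));
        }
    }
    out
}
```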
use std::collections::{BTreeMap, HashMap};
type TimestampNs = u128;
fn sliding_window_join(
interval_table1: Vec<(TimestampNs, TimestampNs, f64, String)>,
interval_table2: Vec<(TimestampNs, TimestampNs, String, String)>,
) -> Vec<(
    TimestampNs,
    TimestampNs,
    (TimestampNs, TimestampNs, f64, String),
    (TimestampNs, TimestampNs, String, String),
)> {
// 1. Scanning partition files. The device IDs should be dictionary encoded.
// - For an alert, we select partition files for a 30 minute interval.
// So, we can avoid scanning all partition files.
//
// This can be done in parallel and is I/O bound.
let before: TimestampNs = 200; // placeholder cutoff; in practice, "now" minus the 30 minute alert window
let mut t1_filtered: Vec<(TimestampNs, TimestampNs, f64, String)> = interval_table1
.into_iter()
.filter(|(start_ts, _, _, _)| *start_ts >= before)
.collect();
let mut t2_filtered: Vec<(TimestampNs, TimestampNs, String, String)> = interval_table2
.into_iter()
.filter(|(start_ts, _, _, _)| *start_ts >= before)
.collect();
// 2. We do a modified sort-merge join. It is also called a window join.
// There is a chance the query will OOM.
//
// We can hash by (or group by) device id, making both the sort and merge steps parallelizable.
t1_filtered.sort_by_key(|(start_ts, _, _, _)| *start_ts);
t2_filtered.sort_by_key(|(start_ts, _, _, _)| *start_ts);
let mut out: Vec<(
    TimestampNs,
    TimestampNs,
    (TimestampNs, TimestampNs, f64, String),
    (TimestampNs, TimestampNs, String, String),
)> = Vec::new();
// The naive join can be quite costly with default data structures.
// It can easily OOM and does not have good cache locality.
//
// For this data, if each row costs one main-memory reference (~100 ns, so
// ~1e7 rows/s/core) and everything else stays in L1/L2, throughput is
// (8+8+8+16) bytes * 1e7 rows/s = 400 MB/s.
// We want 200 MB/s/core, so the budget is at most 2 main-memory references
// (L2 misses) per row. This code probably incurs far more than that.
//
// A hash map lookup alone can introduce ~3 L2 misses; since each value here
// is a vector, perhaps 4.
//
// The wider the rows, the less these misses matter relative to the bytes
// moved. Nevertheless, plainly scanning the data generally performs far
// better: 200 MB/s/core is much easier to reach there. A standard sort-merge
// join is also acceptable, but is probably 2x-5x slower.
//
// Arithmetic is rarely the problem; L2 cache misses are. So right away we
// are in a situation where CPU (really, memory latency) is the bottleneck.
let mut window_hash: HashMap<String, Vec<(TimestampNs, TimestampNs, f64, String)>> =
    HashMap::new();
// Keyed by (end_ts, row index) so that rows sharing an end_ts do not
// overwrite each other.
let mut window_btree: BTreeMap<(TimestampNs, usize), (TimestampNs, TimestampNs, f64, String)> =
    BTreeMap::new();
let mut i = 0;
for r2 in t2_filtered {
    // Add new rows that could overlap: r1.start_ts <= r2.end_ts
    while i < t1_filtered.len() {
        let r1 = t1_filtered[i].clone();
        if r1.0 > r2.1 {
            break;
        }
        window_btree.insert((r1.1, i), r1.clone());
        window_hash.entry(r1.3.clone()).or_default().push(r1);
        i += 1;
    }
    // Expire rows in the window with r1.end_ts < r2.start_ts
    let expired: Vec<_> = window_btree
        .range(..(r2.0, 0))
        .map(|(k, v)| (*k, v.clone()))
        .collect();
    for (key, old_r1) in expired {
        window_btree.remove(&key);
        if let Some(v) = window_hash.get_mut(&old_r1.3) {
            v.retain(|r| r.1 >= r2.0);
        }
    }
    // Produce one output row per live r1 on the same device.
    if let Some(v) = window_hash.get(&r2.3) {
        for r1 in v {
            let start_ts = std::cmp::max(r1.0, r2.0);
            let end_ts = std::cmp::min(r1.1, r2.1);
            out.push((start_ts, end_ts, r1.clone(), r2.clone()));
        }
    }
}
out
}
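The comments above note that hashing (or grouping) by device id makes both the sort and the merge steps parallelizable. A minimal sketch of that partitioning step, under the same row schema as `interval_table1`; `partition_by_device` is a hypothetical helper, and `DefaultHasher` stands in for whatever hash the engine uses:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

type TimestampNs = u128;

// Shard rows by a hash of the device id (the last tuple field) so each shard
// can be sorted and merged independently, e.g. on its own thread.
fn partition_by_device(
    rows: Vec<(TimestampNs, TimestampNs, f64, String)>,
    shards: usize,
) -> Vec<Vec<(TimestampNs, TimestampNs, f64, String)>> {
    let mut out = vec![Vec::new(); shards];
    for row in rows {
        let mut h = DefaultHasher::new();
        row.3.hash(&mut h);
        out[(h.finish() as usize) % shards].push(row);
    }
    out
}
```

All rows for a given device land in the same shard, so a per-shard join sees every row it needs and the shard outputs can simply be concatenated.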
// Window joins
//
// We want to keep the 10 Gbps Ethernet links fully utilized.
// We want to run queries in parallel, but also share the memory in the node.
//
// To avoid I/O being the bottleneck we want to process multiple alerts in
// a single scan, avoiding unnecessary scans. If I/O is the bottleneck, then
// there is not much we can do.
//
// To avoid CPU being the bottleneck, we want to be efficient with aggregations,
// and other hot paths. If CPU is the bottleneck, then run queries in parallel.
//
// To avoid memory being the bottleneck, we need to be careful about
// hash joins, windows, counters, etc. If memory is the bottleneck, then we
// need to duplicate I/O on separate nodes.
//
// Ideally, each CPU is at 100% utilization and we are constantly
// pulling new data into SSD buffers.
//
// In a cloud environment, you get roughly 10 Gbps per 32 cores. Assuming ~5x
// compression, that is 10/8 * 5 = 6.25 GB/s of decompressed data.
// Each core then needs to process ~200 MB/s for the pipeline to be I/O bound.
// For some workloads, CPU becomes the bottleneck instead.
// For example, JSON parsing often runs at ~20 MB/s/core.
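Writing out that arithmetic; the 10 Gbps link, 32 cores, and ~5x compression ratio are the assumptions from the text, not measured values:

```rust
// Back-of-envelope bandwidth budget.
fn bandwidth_budget() -> (f64, f64) {
    let link_gbps = 10.0; // 10 Gbps NIC (assumed)
    let compression_ratio = 5.0; // assumed on-disk/on-wire compression
    let cores = 32.0;
    // 10 Gbps = 1.25 GB/s raw; ~5x compression yields 6.25 GB/s decompressed.
    let decompressed_gb_s = link_gbps / 8.0 * compression_ratio;
    // Split across 32 cores: ~195 MB/s/core, i.e. roughly 200 MB/s.
    let per_core_mb_s = decompressed_gb_s * 1000.0 / cores;
    (decompressed_gb_s, per_core_mb_s)
}
```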
//
// When we can cache the data in memory, we have a reason to push from
// 200 MB/s to 1 GB/s+ per core; at that point we are clearly CPU bound.
//
// For some time series data, could we just let the hot data sit in memory
// and build a huge system of alerts on top of it? Usually not: alerts run on
// newly arriving data, so 200 MB/s/core is still the goal.
//
// Writing 200 MB/s/core code is not easy. A main memory reference costs
// ~100 ns, so 1e7 dependent random accesses take a full second, while
// sequentially scanning a vector with 1e7 elements takes only a few
// milliseconds.
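That latency gap is easy to demonstrate: summing the same vector sequentially (where the hardware prefetcher keeps the core near memory bandwidth) versus with a large stride (roughly one cache miss per element). When the stride is coprime to the length, both loops visit every element exactly once, so the sums must match while the timings diverge.

```rust
// Sequential scan: the prefetcher keeps this near memory bandwidth.
fn sequential_sum(data: &[u64]) -> u64 {
    data.iter().copied().fold(0u64, u64::wrapping_add)
}

// Large-stride scan: defeats the prefetcher, paying roughly one cache miss
// per element. Visits each index exactly once if the stride is coprime to n.
fn strided_sum(data: &[u64]) -> u64 {
    let n = data.len();
    let mut sum = 0u64;
    let mut idx = 0usize;
    for _ in 0..n {
        sum = sum.wrapping_add(data[idx]);
        idx = (idx + 1_000_003) % n; // large prime stride
    }
    sum
}
```

Wrapping each call in `std::time::Instant::now()` / `elapsed()` on a vector of 1e7 elements shows the difference directly.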
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn overlapping_intervals_join() {
let interval_table1 = vec![(
1000,
2000,
82.7,
"d2e095e2-a547-494c-847f-147bf10ca78c".into(),
)];
let interval_table2 = vec![
(
500,
2500,
"initializing".into(),
"d2e095e2-a547-494c-847f-147bf10ca78c".into(),
),
(
2500,
3500,
"waiting".into(),
"d2e095e2-a547-494c-847f-147bf10ca78c".into(),
),
];
let out = sliding_window_join(interval_table1, interval_table2);
// The metric interval (1000, 2000) overlaps only the "initializing" event.
assert_eq!(out.len(), 1);
assert_eq!((out[0].0, out[0].1), (1000, 2000));
}
}