Here is a ~100-line implementation of storing JSON as columns. This is one of the small projects I did while learning Rust.

This implementation is a simpler, unoptimized version based on the Snowflake Data Warehouse paper.

In certain real-world cases, both insert and query performance on JSON data can approach that of inserting and querying native int64/float64 columns.

The C++ simdjson library parses JSON at more than 1 GB/s per core, and the Rust crate used here should have similar performance. So, in cloud environments where we persist data to block or object storage, network bandwidth, not CPU, may be the bottleneck for writes.

Query performance should be similar once schema inference is implemented: numeric JSON fields can be stored and encoded as int64/float64 columns.
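As a rough sketch of what schema inference could look like, a column's type can be inferred by widening across the values observed in a batch, following the ordering the code's TODO comment suggests (bool -> int64 -> float64 -> string). The names here (`ColumnType`, `widen`) are illustrative, not part of the implementation below.

```rust
// Sketch of schema inference by type widening: each observed value can
// only move a column "wider" along bool -> i64 -> f64 -> string.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
enum ColumnType {
    Bool,
    Int64,
    Float64,
    Str,
}

// A column's inferred type is the widest type seen so far.
// Derived PartialOrd compares variants in declaration order.
fn widen(current: ColumnType, observed: ColumnType) -> ColumnType {
    if observed > current { observed } else { current }
}

fn main() {
    // A column that sees 1, 2, then 2.5 is inferred as Float64,
    // so it can still be stored and encoded as a numeric column.
    let mut t = ColumnType::Bool;
    for observed in [ColumnType::Int64, ColumnType::Int64, ColumnType::Float64] {
        t = widen(t, observed);
    }
    assert_eq!(t, ColumnType::Float64);
    println!("inferred: {:?}", t);
}
```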

Storing log and metric data as JSON in Snowflake yields 5-10x compression ratios. This is not as good as some time-series databases, but it is acceptable for many kinds of data, since the cost of transforming data after insertion is typically much higher than the cost of inserting it.

There are a few optimization opportunities, useful for time-series data, that Snowflake probably does not implement:

  1. Efficiently placing JSON data with similar schemas in the same partitions. For Prometheus data, we want metrics with the same tags clustered in the same partition files. Ideally, good partitioning happens at insertion time rather than being fixed up later.
  2. Efficient alerting. When we detect a schema change, a data-volume change, or a data anomaly, we can notify a human. Ideally, we can define cheap alerts that trigger at insertion time, compaction time, or post-processing time. These alerts are usually just approximate aggregations.
  3. Alert test cases. Alerts are easy to write incorrectly. Ideally it is possible for a user to test beforehand that an alert would trigger at the right time, at the right interval, and with the right context.
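To make point 2 concrete, here is a minimal sketch of a cheap insertion-time alert on data volume: track an exponential moving average of batch sizes and flag any batch that deviates from it by more than a fixed factor. All names here (`VolumeAlert`, `observe`) are hypothetical, not part of the implementation below.

```rust
// Sketch of a cheap insertion-time data-volume alert.
struct VolumeAlert {
    ema: f64,    // exponential moving average of batch row counts
    alpha: f64,  // EMA smoothing factor
    factor: f64, // alert when a batch is > factor * ema or < ema / factor
}

impl VolumeAlert {
    fn new(alpha: f64, factor: f64) -> Self {
        VolumeAlert { ema: 0.0, alpha, factor }
    }

    // Returns true if this batch's size should trigger an alert.
    fn observe(&mut self, batch_rows: usize) -> bool {
        let n = batch_rows as f64;
        let fired = self.ema > 0.0 && (n > self.ema * self.factor || n < self.ema / self.factor);
        // Update the average even when the alert fires, so a sustained
        // change in volume stops alerting once it becomes the new normal.
        self.ema = if self.ema == 0.0 { n } else { self.alpha * n + (1.0 - self.alpha) * self.ema };
        fired
    }
}

fn main() {
    let mut alert = VolumeAlert::new(0.2, 3.0);
    for rows in [100, 110, 95, 105] {
        assert!(!alert.observe(rows)); // normal volume: no alert
    }
    assert!(alert.observe(1000)); // ~10x spike: alert fires
    println!("alert fired on the spike");
}
```

This is the "approximate aggregation" flavor of alert: it needs only O(1) state per alert, so it is cheap enough to evaluate on every insert or compaction.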
use std::collections::HashMap;

use simd_json::{OwnedValue, StaticNode};

const DEFAULT_MAX_COLUMNS: u32 = 50;

struct SparseJsonColumnStore {
    deltas: Vec<(DeltaOperation, RowId, String)>,
    data: Data,
}

// Assume these are immutable or append-only files.
// Updates to data involve re-creating it.
#[derive(Clone, Debug)]
struct Data {
    // TODO: enforce max_columns. To support it, split the recursive insert into
    // two steps. The first step recursively scans through the owned values to
    // identify the columns present in the batch of data.
    // - Keep only ~max_columns columns: the most common columns in the data.
    // - Infer each column's data type (bool -> int64 -> float64 -> string -> variant).
    // - Support a variant type, which is basically a string because it has fewer encoding options.
    max_columns: u32,
    static_values_sparse: Vec<Vec<(RowId, StaticNode)>>,
    string_values_sparse: Vec<Vec<(RowId, String)>>,
    static_columns: HashMap<String, usize>,
    string_columns: HashMap<String, usize>,
}

#[derive(Clone, Debug)]
enum Column {
    Static(Vec<(RowId, StaticNode)>),
    String(Vec<(RowId, String)>),
}

impl Data {
    fn new() -> Self {
        Data {
            max_columns: DEFAULT_MAX_COLUMNS,
            static_values_sparse: Vec::new(),
            string_values_sparse: Vec::new(),
            static_columns: HashMap::new(),
            string_columns: HashMap::new(),
        }
    }

    fn insert(&mut self, id: RowId, value: &OwnedValue, json_path: String) {
        self.insert_recursive_entrypoint(value, json_path, id);
    }

    fn insert_recursive_entrypoint(&mut self, value: &OwnedValue, json_path: String, id: RowId) {
        match value {
            simd_json::OwnedValue::Static(sv) => {
                if !self.static_columns.contains_key(&json_path) {
                    self.static_columns
                        .insert(json_path.clone(), self.static_columns.len());
                    self.static_values_sparse.push(Vec::new());
                }

                let i = *self.static_columns.get(&json_path).unwrap();
                self.static_values_sparse[i].push((id, sv.clone()));
            }
            simd_json::OwnedValue::String(s) => {
                if !self.string_columns.contains_key(&json_path) {
                    self.string_columns
                        .insert(json_path.clone(), self.string_columns.len());
                    self.string_values_sparse.push(Vec::new());
                }

                let i = *self.string_columns.get(&json_path).unwrap();
                self.string_values_sparse[i].push((id, s.clone()));
            }
            simd_json::OwnedValue::Array(v) => {
                for (i, av) in v.iter().enumerate() {
                    let new_path = format!("{}.[{}]", json_path, i);
                    self.insert(id, av, new_path)
                }
            }
            simd_json::OwnedValue::Object(v) => {
                let o = v.as_ref();
                for (k, ov) in o.iter() {
                    let k_escaped = k.replace("'", "\\'");
                    let key_path = format!("{}.['{}']", json_path, k_escaped); // jsonpath
                    self.insert(id, ov, key_path);
                }
            }
        }
    }
}

#[derive(Clone, PartialEq, Debug)]
enum DeltaOperation {
    Insert,
    Update,
    Delete,
}

impl SparseJsonColumnStore {
    fn new() -> Self {
        SparseJsonColumnStore {
            deltas: Vec::new(),
            data: Data::new(),
        }
    }
    fn insert(&mut self, k: RowId, value: String) {
        self.deltas.push((DeltaOperation::Insert, k, value));
    }
    fn update(&mut self, k: RowId, value: String) {
        self.deltas.push((DeltaOperation::Update, k, value));
    }
    fn delete(&mut self, k: RowId) {
        self.deltas.push((DeltaOperation::Delete, k, String::new()));
    }

    fn compact(&mut self) {
        // Move data from deltas to columns
        let delta_map: HashMap<RowId, (DeltaOperation, String)> = {
            let mut m: HashMap<RowId, (DeltaOperation, String)> = HashMap::new();
            for (op, k, v) in self.deltas.iter() {
                // The last operation recorded for a key wins.
                // TODO: what about partial updates? Merge the rows?
                m.insert(*k, (op.clone(), v.clone()));
            }
            m
        };
        self.deltas = Vec::new();

        let mut temp = Data::new();
        // Insert existing data, ignoring deleted data and data to be updated
        temp.static_columns = self.data.static_columns.clone();
        temp.string_columns = self.data.string_columns.clone();
        temp.static_values_sparse = vec![Vec::new(); self.data.static_values_sparse.len()];
        temp.string_values_sparse = vec![Vec::new(); self.data.string_values_sparse.len()];

        for (i, c) in self.data.static_values_sparse.iter().enumerate() {
            for (id, v) in c.iter() {
                if delta_map.contains_key(id) {
                    // ignore: this row is superseded by a delta
                    // TODO: what if you want to preserve clustering?
                } else {
                    temp.static_values_sparse[i].push((*id, v.clone()));
                }
            }
        }

        // The same carry-over is needed for string columns; without it,
        // existing string data would be lost on every compaction.
        for (i, c) in self.data.string_values_sparse.iter().enumerate() {
            for (id, v) in c.iter() {
                if delta_map.contains_key(id) {
                    // ignore: this row is superseded by a delta
                } else {
                    temp.string_values_sparse[i].push((*id, v.clone()));
                }
            }
        }

        // Insert deltas as columns
        for (id, (op, value_string)) in delta_map {
            match op {
                DeltaOperation::Insert | DeltaOperation::Update => {
                    // simd_json parses in place, so hand it an owned, mutable
                    // byte buffer instead of unsafely mutating the String.
                    let mut b = value_string.clone().into_bytes();
                    let vr = simd_json::to_owned_value(&mut b);
                    match vr {
                        Ok(v) => {
                            match v {
                                OwnedValue::Static(_) | OwnedValue::String(_) => {
                                    temp.insert(id, &v, "$".into())
                                }
                                OwnedValue::Array(_) | OwnedValue::Object(_) => {
                                    temp.insert(id, &v, "$".into());
                                    temp.insert(id, &OwnedValue::String(value_string), "$".into());
                                    // TODO: collisions?
                                }
                            }
                        }
                        Err(_) => {} // TODO: surface the parse error instead of silently dropping the row
                    }
                }
                DeltaOperation::Delete => {
                    // ignore the row
                }
            }
        }

        self.data = temp;
    }

    fn scan(&mut self, columns: Vec<String>) -> Vec<Column> {
        if !self.deltas.is_empty() {
            self.compact();
        }

        let mut result = Vec::new();
        for k in columns {
            if let Some(&i) = self.data.static_columns.get(&k) {
                let c = self.data.static_values_sparse[i].clone();
                result.push(Column::Static(c));
            } else if let Some(&i) = self.data.string_columns.get(&k) {
                // TODO: what if a column is both a string and a fixed?
                // Then we need a layer of indirection to deal with it.
                // Expose a variant type?
                let c = self.data.string_values_sparse[i].clone();
                result.push(Column::String(c));
            } else {
                // TODO: error?
            }
        }

        result
    }

    fn inspect(&self) {
        println!("{:?}", self.data);
    }
}

type RowId = i64;

fn main() {
    println!("Hello, world!");

    let mut j = SparseJsonColumnStore::new();
    let v = r#"{"b": true, "z": 1234}"#;
    let v2 =
        r#"{"b": true, "z": 5677, "s": "hello \"bob\"", "o": {"nested o": {"record": 34234} }}"#;
    j.insert(1, v.to_string());
    j.insert(2, v.to_string());
    j.update(1, v2.to_string());
    j.delete(2);

    let result = j.scan(vec!["$.['b']".into(), "$.['z']".into(), "$.['s']".into()]);
    println!("result: {:?}", result);

    j.inspect();
}