Skip to content
Merged
114 changes: 94 additions & 20 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.
use crate::decoder::{VariantBasicType, VariantPrimitiveType};
use crate::{ShortString, Variant};
use std::collections::HashMap;
use std::collections::BTreeMap;

const BASIC_TYPE_BITS: u8 = 2;
const UNIX_EPOCH_DATE: chrono::NaiveDate = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
Expand Down Expand Up @@ -166,15 +166,15 @@ fn make_room_for_header(buffer: &mut Vec<u8>, start_pos: usize, header_size: usi
///
pub struct VariantBuilder {
// Raw bytes of the value being built; object headers are written directly into it.
buffer: Vec<u8>,
// NOTE(review): this rendered diff shows the removed `HashMap` declaration next to its
// `BTreeMap` replacement; presumably only the `BTreeMap` line exists in the merged file.
dict: HashMap<String, u32>,
// Field name -> dictionary id. A `BTreeMap` iterates in ascending key (field name) order,
// which `ObjectBuilder::finish` relies on to list field ids sorted by field name.
dict: BTreeMap<String, u32>,
// Field names indexed by dictionary id (the reverse lookup of `dict`).
dict_keys: Vec<String>,
}

impl VariantBuilder {
/// Creates a builder with an empty byte buffer, an empty key dictionary,
/// and an empty list of dictionary keys.
pub fn new() -> Self {
Self {
buffer: Vec::new(),
// NOTE(review): the next two lines are the removed/added pair from the rendered
// diff; presumably only the `BTreeMap` initialiser exists in the merged file.
dict: HashMap::new(),
dict: BTreeMap::new(),
dict_keys: Vec::new(),
}
}
Expand Down Expand Up @@ -296,7 +296,7 @@ impl VariantBuilder {

/// Add key to dictionary, return its ID
fn add_key(&mut self, key: &str) -> u32 {
use std::collections::hash_map::Entry;
use std::collections::btree_map::Entry;
match self.dict.entry(key.to_string()) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
Expand Down Expand Up @@ -482,7 +482,7 @@ impl<'a> ListBuilder<'a> {
// Accumulates the fields of a single object and writes them through `parent`.
pub struct ObjectBuilder<'a> {
// Builder that owns the output buffer and the key dictionary.
parent: &'a mut VariantBuilder,
// Value of `parent.offset()` at which this object's data starts; field offsets and the
// object's data size are measured relative to it.
start_pos: usize,
// NOTE(review): this rendered diff shows the removed `Vec` declaration next to its
// `BTreeMap` replacement; presumably only the `BTreeMap` line exists in the merged file.
fields: Vec<(u32, usize)>, // (field_id, offset), in append order
fields: BTreeMap<u32, usize>, // field_id -> offset; iterates in ascending field_id order
}

impl<'a> ObjectBuilder<'a> {
Expand All @@ -491,7 +491,7 @@ impl<'a> ObjectBuilder<'a> {
Self {
parent,
start_pos,
fields: Vec::new(),
fields: BTreeMap::new(),
}
}

Expand All @@ -500,25 +500,27 @@ impl<'a> ObjectBuilder<'a> {
let id = self.parent.add_key(key);
let field_start = self.parent.offset() - self.start_pos;
self.parent.append_value(value);
self.fields.push((id, field_start));
let res = self.fields.insert(id, field_start);
debug_assert!(res.is_none());
}

/// Finalize object with sorted fields
pub fn finish(mut self) {
// Sort fields by key name
self.fields.sort_by(|a, b| {
let key_a = &self.parent.dict_keys[a.0 as usize];
let key_b = &self.parent.dict_keys[b.0 as usize];
key_a.cmp(key_b)
});

pub fn finish(self) {
let data_size = self.parent.offset() - self.start_pos;
let num_fields = self.fields.len();
let is_large = num_fields > u8::MAX as usize;
let size_bytes = if is_large { 4 } else { 1 };

let max_id = self.fields.iter().map(|&(id, _)| id).max().unwrap_or(0);
let id_size = int_size(max_id as usize);
let field_ids_by_sorted_field_name = self
.parent
.dict
.iter()
.filter_map(|(_, id)| self.fields.contains_key(id).then_some(*id))
.collect::<Vec<_>>();

let max_id = self.fields.keys().last().copied().unwrap_or(0) as usize;

let id_size = int_size(max_id);
let offset_size = int_size(data_size);

let header_size = 1
Expand All @@ -542,17 +544,18 @@ impl<'a> ObjectBuilder<'a> {
}

// Write field IDs (sorted order)
for &(id, _) in &self.fields {
for id in &field_ids_by_sorted_field_name {
write_offset(
&mut self.parent.buffer[pos..pos + id_size as usize],
id as usize,
*id as usize,
id_size,
);
pos += id_size as usize;
}

// Write field offsets
for &(_, offset) in &self.fields {
for id in &field_ids_by_sorted_field_name {
let &offset = self.fields.get(id).unwrap();
write_offset(
&mut self.parent.buffer[pos..pos + offset_size as usize],
offset,
Expand Down Expand Up @@ -749,6 +752,77 @@ mod tests {
assert_eq!(field_ids, vec![1, 2, 0]);
}

#[test]
fn test_object_and_metadata_ordering() {
    // Verifies the three orderings kept while fields are appended to an object:
    //   * `fields`    - `BTreeMap` keyed by field id: keys iterate in ascending id order,
    //                   which matches insertion order here because every key is new
    //   * `dict`      - `BTreeMap` keyed by field name: entries iterate sorted by name
    //   * `dict_keys` - `Vec` of field names in insertion order (index == field id)
    let mut builder = VariantBuilder::new();
    let mut obj = builder.new_object();

    obj.append_value("zebra", "stripes"); // ID = 0
    obj.append_value("apple", "red"); // ID = 1

    {
        // Field ids recorded on the object, ascending.
        let field_ids: Vec<u32> = obj.fields.keys().copied().collect();
        assert_eq!(field_ids, vec![0, 1]);

        // Dictionary entries, sorted by field name.
        // NOTE: when we support nested objects, we'll want to perform a filter by fields_map field ids
        let name_to_id: Vec<(&str, u32)> = obj
            .parent
            .dict
            .iter()
            .map(|(name, id)| (name.as_str(), *id))
            .collect();
        assert_eq!(name_to_id, vec![("apple", 1), ("zebra", 0)]);

        // Dictionary keys, in insertion order.
        let names_by_id: Vec<&str> = obj.parent.dict_keys.iter().map(String::as_str).collect();
        assert_eq!(names_by_id, vec!["zebra", "apple"]);
    }

    obj.append_value("banana", "yellow"); // ID = 2

    {
        // Field ids recorded on the object, ascending.
        let field_ids: Vec<u32> = obj.fields.keys().copied().collect();
        assert_eq!(field_ids, vec![0, 1, 2]);

        // Dictionary entries, sorted by field name: "banana" lands between the other two.
        // NOTE: when we support nested objects, we'll want to perform a filter by fields_map field ids
        let name_to_id: Vec<(&str, u32)> = obj
            .parent
            .dict
            .iter()
            .map(|(name, id)| (name.as_str(), *id))
            .collect();
        assert_eq!(
            name_to_id,
            vec![("apple", 1), ("banana", 2), ("zebra", 0)]
        );

        // Dictionary keys, in insertion order: the new name is appended at the end.
        let names_by_id: Vec<&str> = obj.parent.dict_keys.iter().map(String::as_str).collect();
        assert_eq!(names_by_id, vec!["zebra", "apple", "banana"]);
    }

    obj.finish();

    builder.finish();
}

#[test]
fn test_append_object() {
let (object_metadata, object_value) = {
Expand Down
Loading