use std::collections::HashMap;
use std::fmt;
use std::rc::Rc;
use basic::{LogicalType, Repetition};
use errors::{ParquetError, Result};
use file::reader::{FileReader, RowGroupReader};
use schema::types::{ColumnPath, SchemaDescriptor, SchemaDescPtr, Type, TypePtr};
use record::api::{Row, Field, make_row, make_list, make_map};
use record::triplet::TripletIter;
/// Batch size a `TreeBuilder` starts with; it is handed to every `TripletIter` the builder
/// creates unless `with_batch_size` overrides it.
const DEFAULT_BATCH_SIZE: usize = 1024;
/// Builder that assembles a tree of `Reader`s for a schema on top of a row group.
pub struct TreeBuilder {
// Batch size passed to `TripletIter::new` for every primitive column.
batch_size: usize
}
impl TreeBuilder {
    /// Creates a builder that uses `DEFAULT_BATCH_SIZE`.
    pub fn new() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE
        }
    }

    /// Replaces the batch size handed to each `TripletIter` and returns the builder.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Builds the root `Reader` for `descr` on top of the columns of `row_group_reader`.
    ///
    /// The root is a `GroupReader` without a field (`None`) at definition level 0 that
    /// holds one child reader per top-level field of the schema.
    ///
    /// # Panics
    ///
    /// Panics if a primitive field of `descr` has no column with the same path in the
    /// row group, if a column reader cannot be obtained, or if a `LIST`/`MAP` group does
    /// not have the expected shape.
    pub fn build(
        &self,
        descr: SchemaDescPtr,
        row_group_reader: &RowGroupReader
    ) -> Reader {
        // Index the row group's columns by path so that fields of a (possibly projected)
        // schema can be resolved to the original column index.
        let row_group_metadata = row_group_reader.metadata();
        let num_columns = row_group_reader.num_columns();
        let mut paths: HashMap<ColumnPath, usize> = HashMap::with_capacity(num_columns);
        for col_index in 0..num_columns {
            let col_path = row_group_metadata.column(col_index).column_path().clone();
            paths.insert(col_path, col_index);
        }

        let mut readers = Vec::new();
        let mut path = Vec::new();
        for field in descr.root_schema().get_fields() {
            let reader = self.reader_tree(
                field.clone(), &mut path, 0, 0, &paths, row_group_reader);
            readers.push(reader);
        }

        Reader::GroupReader(None, 0, readers)
    }

    /// Builds the reader tree for `descr` and wraps it in a `ReaderIter` limited to the
    /// number of rows reported by the row group metadata.
    pub fn as_iter(
        &self,
        descr: SchemaDescPtr,
        row_group_reader: &RowGroupReader
    ) -> ReaderIter {
        let num_records = row_group_reader.metadata().num_rows() as usize;
        ReaderIter::new(self.build(descr, row_group_reader), num_records)
    }

    /// Recursively builds the reader for `field`.
    ///
    /// * `path` - names of the enclosing fields; it is restored to its incoming state
    ///   before this function returns.
    /// * `curr_def_level`, `curr_rep_level` - levels of the parent; they are incremented
    ///   here according to the repetition of `field`.
    /// * `paths` - column path to column index mapping of the row group.
    ///
    /// # Panics
    ///
    /// Panics if `field` has no repetition, if a primitive field cannot be found in
    /// `paths`, if its column reader cannot be obtained, or if a `LIST`/`MAP` group is
    /// malformed.
    fn reader_tree(
        &self,
        field: TypePtr,
        path: &mut Vec<String>,
        mut curr_def_level: i16,
        mut curr_rep_level: i16,
        paths: &HashMap<ColumnPath, usize>,
        row_group_reader: &RowGroupReader
    ) -> Reader {
        assert!(field.get_basic_info().has_repetition());
        let repetition = field.get_basic_info().repetition();
        match repetition {
            Repetition::OPTIONAL => {
                curr_def_level += 1;
            },
            Repetition::REPEATED => {
                curr_def_level += 1;
                curr_rep_level += 1;
            },
            _ => {}
        }

        path.push(String::from(field.name()));
        let reader = if field.is_primitive() {
            let col_path = ColumnPath::new(path.to_vec());
            let orig_index = match paths.get(&col_path) {
                Some(index) => *index,
                None => panic!(
                    "Column '{}' is not present in the row group", path.join("."))
            };
            let col_descr = row_group_reader.metadata().column(orig_index).column_descr_ptr();
            let col_reader = row_group_reader.get_column_reader(orig_index).unwrap();
            let column = TripletIter::new(col_descr, col_reader, self.batch_size);
            Reader::PrimitiveReader(field, column)
        } else {
            match field.get_basic_info().logical_type() {
                LogicalType::LIST => {
                    assert_eq!(field.get_fields().len(), 1, "Invalid list type {:?}", field);
                    let repeated_field = field.get_fields()[0].clone();
                    assert_eq!(
                        repeated_field.get_basic_info().repetition(),
                        Repetition::REPEATED,
                        "Invalid list type {:?}",
                        field
                    );
                    if Reader::is_element_type(&repeated_field) {
                        // The repeated field is itself the element; it adjusts the
                        // levels and the path in the recursive call.
                        let reader = self.reader_tree(repeated_field.clone(), path,
                            curr_def_level, curr_rep_level, paths, row_group_reader);
                        Reader::RepeatedReader(
                            field, curr_def_level, curr_rep_level, Box::new(reader))
                    } else {
                        // The repeated group only wraps the element: skip it, but
                        // account for its name and its levels.
                        assert!(
                            !repeated_field.get_fields().is_empty(),
                            "Invalid list type {:?}",
                            field
                        );
                        let child_field = repeated_field.get_fields()[0].clone();
                        path.push(String::from(repeated_field.name()));
                        let reader = self.reader_tree(child_field, path,
                            curr_def_level + 1, curr_rep_level + 1, paths, row_group_reader);
                        path.pop();
                        Reader::RepeatedReader(
                            field, curr_def_level, curr_rep_level, Box::new(reader))
                    }
                },
                LogicalType::MAP | LogicalType::MAP_KEY_VALUE => {
                    assert_eq!(field.get_fields().len(), 1, "Invalid map type: {:?}", field);
                    assert!(
                        !field.get_fields()[0].is_primitive(),
                        "Invalid map type: {:?}",
                        field
                    );
                    let key_value_type = field.get_fields()[0].clone();
                    assert_eq!(
                        key_value_type.get_basic_info().repetition(),
                        Repetition::REPEATED,
                        "Invalid map type: {:?}",
                        field
                    );
                    assert_eq!(
                        key_value_type.get_fields().len(),
                        2,
                        "Invalid map type: {:?}",
                        field
                    );
                    path.push(String::from(key_value_type.name()));
                    let key_type = &key_value_type.get_fields()[0];
                    assert!(
                        key_type.is_primitive(),
                        "Map key type is expected to be a primitive type, but found {:?}",
                        key_type
                    );
                    let key_reader = self.reader_tree(key_type.clone(), path,
                        curr_def_level + 1, curr_rep_level + 1, paths, row_group_reader);
                    let value_type = &key_value_type.get_fields()[1];
                    let value_reader = self.reader_tree(value_type.clone(), path,
                        curr_def_level + 1, curr_rep_level + 1, paths, row_group_reader);
                    path.pop();
                    Reader::KeyValueReader(field, curr_def_level, curr_rep_level,
                        Box::new(key_reader), Box::new(value_reader))
                },
                _ if repetition == Repetition::REPEATED => {
                    // A repeated group without LIST/MAP annotation is read as a list
                    // whose element is the same group with REQUIRED repetition.
                    let required_field = Type::group_type_builder(field.name())
                        .with_repetition(Repetition::REQUIRED)
                        .with_logical_type(field.get_basic_info().logical_type())
                        .with_fields(&mut Vec::from(field.get_fields()))
                        .build()
                        .unwrap();
                    // The recursive call pushes (and pops) the very same name, so undo
                    // our own push first.
                    path.pop();
                    let reader = self.reader_tree(Rc::new(required_field), path,
                        curr_def_level, curr_rep_level, paths, row_group_reader);
                    // `path` is already back to the caller's state here. Return directly
                    // instead of falling through to the shared `path.pop()` below, which
                    // would remove the parent's component and break the column lookup of
                    // every following sibling. `Reader::option` is a no-op for REPEATED.
                    return Reader::RepeatedReader(field, curr_def_level - 1,
                        curr_rep_level - 1, Box::new(reader));
                },
                _ => {
                    let mut readers = Vec::new();
                    for child in field.get_fields() {
                        let reader = self.reader_tree(child.clone(), path,
                            curr_def_level, curr_rep_level, paths, row_group_reader);
                        readers.push(reader);
                    }
                    Reader::GroupReader(Some(field), curr_def_level, readers)
                }
            }
        };
        path.pop();

        Reader::option(repetition, curr_def_level, reader)
    }
}

impl Default for TreeBuilder {
    /// Equivalent to `TreeBuilder::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Node of the reader tree that `TreeBuilder` assembles.
pub enum Reader {
// Leaf: the primitive field and the triplet iterator over its column.
PrimitiveReader(TypePtr, TripletIter),
// Optional wrapper: a definition level (`Reader::option` stores `def_level - 1`) and the
// wrapped reader.
OptionReader(i16, Box<Reader>),
// Group: its field (`None` for the root created by `TreeBuilder::build`), its definition
// level and one reader per child.
GroupReader(Option<TypePtr>, i16, Vec<Reader>),
// List: field, definition level, repetition level and the element reader.
RepeatedReader(TypePtr, i16, i16, Box<Reader>),
// Map: field, definition level, repetition level, key reader and value reader.
KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>)
}
impl Reader {
    /// Wraps `reader` in an `OptionReader` holding `def_level - 1` when `repetition` is
    /// `OPTIONAL`; any other repetition returns `reader` untouched.
    fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
        match repetition {
            Repetition::OPTIONAL => Reader::OptionReader(def_level - 1, Box::new(reader)),
            _ => reader
        }
    }

    /// Tells whether the repeated field of a `LIST` group is the list element itself
    /// rather than a wrapper around the element.
    ///
    /// That is the case for a primitive type, a group with more than one field, a type
    /// named `array`, or a type whose name ends with `_tuple`.
    fn is_element_type(repeated_type: &Type) -> bool {
        if repeated_type.is_primitive() {
            return true;
        }
        if repeated_type.is_group() && repeated_type.get_fields().len() > 1 {
            return true;
        }
        let type_name = repeated_type.name();
        type_name == "array" || type_name.ends_with("_tuple")
    }

    /// Reads one record from a group reader, one field per child reader.
    ///
    /// # Panics
    ///
    /// Panics when called on anything but a `GroupReader`.
    fn read(&mut self) -> Row {
        match *self {
            Reader::GroupReader(_, _, ref mut readers) => {
                let fields: Vec<(String, Field)> = readers
                    .iter_mut()
                    .map(|child| (String::from(child.field_name()), child.read_field()))
                    .collect();
                make_row(fields)
            },
            _ => panic!("Cannot call read() on {}", self)
        }
    }

    /// Reads the value at the current position as a `Field` and moves the underlying
    /// column(s) forward.
    fn read_field(&mut self) -> Field {
        match *self {
            Reader::PrimitiveReader(_, ref mut column) => {
                // Grab the value before moving on to the next triplet.
                let current = column.current_value();
                column.read_next().unwrap();
                current
            },
            Reader::OptionReader(def_level, ref mut reader) => {
                if reader.current_def_level() <= def_level {
                    // Not defined at this level: skip the entry and report null.
                    reader.advance_columns();
                    return Field::Null;
                }
                reader.read_field()
            },
            Reader::GroupReader(_, def_level, ref mut readers) => {
                let fields: Vec<(String, Field)> = readers
                    .iter_mut()
                    .map(|child| {
                        let is_present = child.repetition() != Repetition::OPTIONAL
                            || child.current_def_level() > def_level;
                        if is_present {
                            (String::from(child.field_name()), child.read_field())
                        } else {
                            child.advance_columns();
                            (String::from(child.field_name()), Field::Null)
                        }
                    })
                    .collect();
                Field::Group(make_row(fields))
            },
            Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
                let mut elements = Vec::new();
                loop {
                    if reader.current_def_level() <= def_level {
                        // No element at this position: consume the entry and stop.
                        reader.advance_columns();
                        break;
                    }
                    elements.push(reader.read_field());
                    // Continue only while the next entry still belongs to this list.
                    let same_list = reader.has_next()
                        && reader.current_rep_level() > rep_level;
                    if !same_list {
                        break;
                    }
                }
                Field::ListInternal(make_list(elements))
            },
            Reader::KeyValueReader(_, def_level, rep_level,
                    ref mut keys, ref mut values) => {
                let mut pairs = Vec::new();
                loop {
                    if keys.current_def_level() <= def_level {
                        // No entry at this position: consume both columns and stop.
                        keys.advance_columns();
                        values.advance_columns();
                        break;
                    }
                    pairs.push((keys.read_field(), values.read_field()));
                    // The key column decides whether the next entry is part of this map.
                    let same_map = keys.has_next()
                        && keys.current_rep_level() > rep_level;
                    if !same_map {
                        break;
                    }
                }
                Field::MapInternal(make_map(pairs))
            }
        }
    }

    /// Returns the name of the field this reader was built for; an `OptionReader`
    /// forwards to the reader it wraps.
    ///
    /// # Panics
    ///
    /// Panics for a `GroupReader` without a field.
    fn field_name(&self) -> &str {
        match *self {
            Reader::OptionReader(_, ref reader) => reader.field_name(),
            Reader::GroupReader(None, _, _) => panic!("Field is None for group reader"),
            Reader::PrimitiveReader(ref field, _) |
            Reader::GroupReader(Some(ref field), _, _) |
            Reader::RepeatedReader(ref field, _, _, _) |
            Reader::KeyValueReader(ref field, _, _, _, _) => field.name()
        }
    }

    /// Returns the repetition of the field this reader was built for; an `OptionReader`
    /// forwards to the reader it wraps.
    ///
    /// # Panics
    ///
    /// Panics for a `GroupReader` without a field.
    fn repetition(&self) -> Repetition {
        match *self {
            Reader::OptionReader(_, ref reader) => reader.repetition(),
            Reader::GroupReader(None, _, _) => panic!("Field is None for group reader"),
            Reader::PrimitiveReader(ref field, _) |
            Reader::GroupReader(Some(ref field), _, _) |
            Reader::RepeatedReader(ref field, _, _, _) |
            Reader::KeyValueReader(ref field, _, _, _, _) => {
                field.get_basic_info().repetition()
            }
        }
    }

    /// Reports whether more entries are available. Groups ask their first child and maps
    /// ask the key reader.
    ///
    /// # Panics
    ///
    /// Panics for a `GroupReader` without children.
    fn has_next(&self) -> bool {
        match *self {
            Reader::PrimitiveReader(_, ref column) => column.has_next(),
            Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
            Reader::OptionReader(_, ref reader) |
            Reader::RepeatedReader(_, _, _, ref reader) |
            Reader::KeyValueReader(_, _, _, ref reader, _) => reader.has_next()
        }
    }

    /// Returns the definition level at the current position. Groups ask their first child
    /// and maps ask the key reader.
    ///
    /// # Panics
    ///
    /// Panics for a `GroupReader` without children.
    fn current_def_level(&self) -> i16 {
        match *self {
            Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
            Reader::GroupReader(_, _, ref readers) => {
                if let Some(first) = readers.first() {
                    first.current_def_level()
                } else {
                    panic!("Current definition level: empty group reader")
                }
            },
            Reader::OptionReader(_, ref reader) |
            Reader::RepeatedReader(_, _, _, ref reader) |
            Reader::KeyValueReader(_, _, _, ref reader, _) => reader.current_def_level()
        }
    }

    /// Returns the repetition level at the current position. Groups ask their first child
    /// and maps ask the key reader.
    ///
    /// # Panics
    ///
    /// Panics for a `GroupReader` without children.
    fn current_rep_level(&self) -> i16 {
        match *self {
            Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
            Reader::GroupReader(_, _, ref readers) => {
                if let Some(first) = readers.first() {
                    first.current_rep_level()
                } else {
                    panic!("Current repetition level: empty group reader")
                }
            },
            Reader::OptionReader(_, ref reader) |
            Reader::RepeatedReader(_, _, _, ref reader) |
            Reader::KeyValueReader(_, _, _, ref reader, _) => reader.current_rep_level()
        }
    }

    /// Moves every primitive column below this reader to its next entry without
    /// producing a value.
    fn advance_columns(&mut self) {
        match *self {
            Reader::PrimitiveReader(_, ref mut column) => {
                column.read_next().unwrap();
            },
            Reader::OptionReader(_, ref mut reader) |
            Reader::RepeatedReader(_, _, _, ref mut reader) => {
                reader.advance_columns();
            },
            Reader::GroupReader(_, _, ref mut readers) => {
                for child in readers.iter_mut() {
                    child.advance_columns();
                }
            },
            Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
                keys.advance_columns();
                values.advance_columns();
            }
        }
    }
}
impl fmt::Display for Reader {
    /// Writes the name of the variant, without any of its contents.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let variant_name = match *self {
            Reader::PrimitiveReader(..) => "PrimitiveReader",
            Reader::OptionReader(..) => "OptionReader",
            Reader::GroupReader(..) => "GroupReader",
            Reader::RepeatedReader(..) => "RepeatedReader",
            Reader::KeyValueReader(..) => "KeyValueReader"
        };
        f.write_str(variant_name)
    }
}
/// Iterator over rows that is created either from a whole file (`from_file`) or from a
/// single row group (`from_row_group`).
pub struct RowIter<'a> {
// Schema (full or projected) the readers are built for.
descr: SchemaDescPtr,
// Builder used to create a `ReaderIter` per row group.
tree_builder: TreeBuilder,
// Source of further row groups; `None` when created from a single row group.
file_reader: Option<&'a FileReader>,
// Index of the next row group to open.
current_row_group: usize,
// Number of row groups to go through (0 when created from a single row group).
num_row_groups: usize,
// Iterator over the row group currently being read, if any.
row_iter: Option<ReaderIter>
}
impl<'a> RowIter<'a> {
    /// Creates an iterator over all row groups of `reader`.
    ///
    /// `proj` optionally restricts the schema; an error is returned when the file schema
    /// does not contain the projection.
    pub fn from_file(proj: Option<Type>, reader: &'a FileReader) -> Result<Self> {
        let file_descr = reader.metadata().file_metadata().schema_descr_ptr();
        let descr = Self::get_proj_descr(proj, file_descr)?;
        let num_row_groups = reader.num_row_groups();

        // Row groups are opened lazily by `next`, hence no `row_iter` yet.
        Ok(Self {
            descr,
            tree_builder: Self::tree_builder(),
            file_reader: Some(reader),
            current_row_group: 0,
            num_row_groups,
            row_iter: None
        })
    }

    /// Creates an iterator over the rows of a single row group.
    ///
    /// `proj` optionally restricts the schema; an error is returned when the row group
    /// schema does not contain the projection.
    pub fn from_row_group(proj: Option<Type>, reader: &'a RowGroupReader) -> Result<Self> {
        let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
        let tree_builder = Self::tree_builder();
        let row_iter = Some(tree_builder.as_iter(descr.clone(), reader));

        // There is no file reader to fetch further row groups from.
        Ok(Self {
            descr,
            tree_builder,
            file_reader: None,
            current_row_group: 0,
            num_row_groups: 0,
            row_iter
        })
    }

    /// Returns the tree builder used by both constructors.
    #[inline]
    fn tree_builder() -> TreeBuilder {
        TreeBuilder::new()
    }

    /// Returns the descriptor to read with: a new descriptor for the projection when one
    /// is given and contained in the root schema, `root_descr` itself otherwise.
    #[inline]
    fn get_proj_descr(
        proj: Option<Type>,
        root_descr: SchemaDescPtr
    ) -> Result<SchemaDescPtr> {
        let projection = match proj {
            Some(projection) => projection,
            None => return Ok(root_descr)
        };

        if !root_descr.root_schema().check_contains(&projection) {
            return Err(general_err!("Root schema does not contain projection"));
        }
        Ok(Rc::new(SchemaDescriptor::new(Rc::new(projection))))
    }
}
impl<'a> Iterator for RowIter<'a> {
    type Item = Row;

    /// Returns the next row of the current row group, opening following row groups
    /// (and skipping those without rows) once the current one is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if a further row group is needed but there is no file reader, or if the
    /// row group cannot be obtained.
    fn next(&mut self) -> Option<Row> {
        let pending = match self.row_iter {
            Some(ref mut iter) => iter.next(),
            None => None
        };
        if pending.is_some() {
            return pending;
        }

        while self.current_row_group < self.num_row_groups {
            let row_group_reader = &*self.file_reader
                .as_ref()
                .expect("File reader is required to advance row group")
                .get_row_group(self.current_row_group)
                .unwrap();
            self.current_row_group += 1;

            let mut iter = self.tree_builder.as_iter(self.descr.clone(), row_group_reader);
            let first = iter.next();
            // Keep the iterator even when it is already exhausted, as later calls start
            // from it again.
            self.row_iter = Some(iter);
            if first.is_some() {
                return first;
            }
        }
        None
    }
}
/// Iterator that reads a fixed number of records from a root `Reader`.
pub struct ReaderIter {
// Root of the reader tree; `next` calls `read()` on it.
root_reader: Reader,
// Number of records that can still be returned.
records_left: usize
}
impl ReaderIter {
fn new(mut root_reader: Reader, num_records: usize) -> Self {
root_reader.advance_columns();
Self {
root_reader: root_reader,
records_left: num_records
}
}
}
impl Iterator for ReaderIter {
    type Item = Row;

    /// Reads the next record, or returns `None` once the configured number of records
    /// has been produced.
    fn next(&mut self) -> Option<Row> {
        if self.records_left == 0 {
            return None;
        }
        self.records_left -= 1;
        Some(self.root_reader.read())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use errors::{ParquetError, Result};
use file::reader::{FileReader, SerializedFileReader};
use record::api::{Row, Field};
use schema::parser::parse_message_type;
use util::test_common::get_test_file;
// Builds a row from a comma-separated list of fields by passing them, in order, to
// `make_row`.
macro_rules! row {
    ( $( $field:expr ), * ) => {
        make_row(vec![ $( $field ),* ])
    }
}
// Builds a `Field::ListInternal` from a comma-separated list of elements by passing
// them, in order, to `make_list`.
macro_rules! list {
    ( $( $element:expr ), * ) => {
        Field::ListInternal(make_list(vec![ $( $element ),* ]))
    }
}
// Builds a `Field::MapInternal` from a comma-separated list of entries by passing them,
// in order, to `make_map`.
macro_rules! map {
    ( $( $entry:expr ), * ) => {
        Field::MapInternal(make_map(vec![ $( $entry ),* ]))
    }
}
// Builds a `Field::Group` whose row is created by `row!` from the given fields.
macro_rules! group {
    ( $( $field:expr ), * ) => {
        Field::Group(row!( $( $field ),* ))
    }
}
// Every one of the 8 rows of `nulls.snappy.parquet` is expected to be a `b_struct`
// group whose only field `b_c_int` is null.
#[test]
fn test_file_reader_rows_nulls() {
    let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();

    let expected_rows: Vec<Row> = (0..8)
        .map(|_| row![
            ("b_struct".to_string(),
                group![("b_c_int".to_string(), Field::Null)])
        ])
        .collect();

    assert_eq!(rows, expected_rows);
}
// Reads `nonnullable.impala.parquet` without a projection and compares the result with
// the single expected record, which mixes lists, nested lists, maps and nested groups.
#[test]
fn test_file_reader_rows_nonnullable() {
let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
let expected_rows = vec![
row![
("ID".to_string(), Field::Long(8)),
("Int_Array".to_string(), list![Field::Int(-1)]),
("int_array_array".to_string(), list![
list![Field::Int(-1), Field::Int(-2)],
list![]
]),
("Int_Map".to_string(), map![(Field::Str("k1".to_string()), Field::Int(-1))]),
("int_map_array".to_string(), list![
map![],
map![(Field::Str("k1".to_string()), Field::Int(1))],
map![],
map![]
]),
("nested_Struct".to_string(), group![
("a".to_string(), Field::Int(-1)),
("B".to_string(), list![
Field::Int(-1)
]),
("c".to_string(), group![
("D".to_string(), list![
list![
group![
("e".to_string(), Field::Int(-1)),
("f".to_string(), Field::Str("nonnullable".to_string()))
]
]
])
]),
("G".to_string(), map![])
])
]
];
assert_eq!(rows, expected_rows);
}
// Reads `nullable.impala.parquet` without a projection and compares the result with the
// seven expected records (ids 1 to 7), which cover null values at every nesting level:
// null elements, null lists, null maps, null map values and null groups.
#[test]
fn test_file_reader_rows_nullable() {
let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
let expected_rows = vec![
// id = 1: no nulls anywhere.
row![
("id".to_string(), Field::Long(1)),
("int_array".to_string(), list![Field::Int(1), Field::Int(2), Field::Int(3)]),
("int_array_Array".to_string(), list![
list![Field::Int(1), Field::Int(2)],
list![Field::Int(3), Field::Int(4)]
]),
("int_map".to_string(), map![
(Field::Str("k1".to_string()), Field::Int(1)),
(Field::Str("k2".to_string()), Field::Int(100))
]),
("int_Map_Array".to_string(), list![
map![(Field::Str("k1".to_string()), Field::Int(1))]
]),
("nested_struct".to_string(), group![
("A".to_string(), Field::Int(1)),
("b".to_string(), list![Field::Int(1)]),
("C".to_string(), group![
("d".to_string(), list![
list![
group![
("E".to_string(), Field::Int(10)),
("F".to_string(), Field::Str("aaa".to_string()))
],
group![
("E".to_string(), Field::Int(-10)),
("F".to_string(), Field::Str("bbb".to_string()))
]
],
list![
group![
("E".to_string(), Field::Int(11)),
("F".to_string(), Field::Str("c".to_string()))
]
]
])
]),
("g".to_string(), map![
(Field::Str("foo".to_string()), group![
("H".to_string(), group![
("i".to_string(), list![Field::Double(1.1)])
])
])
])
])
],
// id = 2: nulls mixed with values inside lists, maps and groups.
row![
("id".to_string(), Field::Long(2)),
("int_array".to_string(), list![
Field::Null,
Field::Int(1),
Field::Int(2),
Field::Null,
Field::Int(3),
Field::Null
]),
("int_array_Array".to_string(), list![
list![
Field::Null,
Field::Int(1),
Field::Int(2),
Field::Null
],
list![
Field::Int(3),
Field::Null,
Field::Int(4)
],
list![],
Field::Null
]),
("int_map".to_string(), map![
(Field::Str("k1".to_string()), Field::Int(2)),
(Field::Str("k2".to_string()), Field::Null)
]),
("int_Map_Array".to_string(), list![
map![
(Field::Str("k3".to_string()), Field::Null),
(Field::Str("k1".to_string()), Field::Int(1))
],
Field::Null,
map![]
]),
("nested_struct".to_string(), group![
("A".to_string(), Field::Null),
("b".to_string(), list![
Field::Null
]),
("C".to_string(), group![
("d".to_string(), list![
list![
group![
("E".to_string(), Field::Null),
("F".to_string(), Field::Null)
],
group![
("E".to_string(), Field::Int(10)),
("F".to_string(), Field::Str("aaa".to_string()))
],
group![
("E".to_string(), Field::Null),
("F".to_string(), Field::Null)
],
group![
("E".to_string(), Field::Int(-10)),
("F".to_string(), Field::Str("bbb".to_string()))
],
group![
("E".to_string(), Field::Null),
("F".to_string(), Field::Null)
]
],
list![
group![
("E".to_string(), Field::Int(11)),
("F".to_string(), Field::Str("c".to_string()))
],
Field::Null
],
list![],
Field::Null
])
]),
("g".to_string(), map![
(Field::Str("g1".to_string()), group![
("H".to_string(), group![
("i".to_string(), list![
Field::Double(2.2),
Field::Null
])
])
]),
(Field::Str("g2".to_string()), group![
("H".to_string(), group![
("i".to_string(), list![])
])
]),
(Field::Str("g3".to_string()), Field::Null),
(Field::Str("g4".to_string()), group![
("H".to_string(), group![
("i".to_string(), Field::Null)
])
]),
(Field::Str("g5".to_string()), group![
("H".to_string(), Field::Null)
])
])
])
],
// id = 3: empty lists and maps.
row![
("id".to_string(), Field::Long(3)),
("int_array".to_string(), list![]),
("int_array_Array".to_string(), list![Field::Null]),
("int_map".to_string(), map![]),
("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
("nested_struct".to_string(), group![
("A".to_string(), Field::Null),
("b".to_string(), Field::Null),
("C".to_string(), group![("d".to_string(), list![])]),
("g".to_string(), map![])
])
],
// id = 4
row![
("id".to_string(), Field::Long(4)),
("int_array".to_string(), Field::Null),
("int_array_Array".to_string(), list![]),
("int_map".to_string(), map![]),
("int_Map_Array".to_string(), list![]),
("nested_struct".to_string(), group![
("A".to_string(), Field::Null),
("b".to_string(), Field::Null),
("C".to_string(), group![
("d".to_string(), Field::Null)
]),
("g".to_string(), Field::Null)
])
],
// id = 5
row![
("id".to_string(), Field::Long(5)),
("int_array".to_string(), Field::Null),
("int_array_Array".to_string(), Field::Null),
("int_map".to_string(), map![]),
("int_Map_Array".to_string(), Field::Null),
("nested_struct".to_string(), group![
("A".to_string(), Field::Null),
("b".to_string(), Field::Null),
("C".to_string(), Field::Null),
("g".to_string(), map![
(Field::Str("foo".to_string()), group![
("H".to_string(), group![
("i".to_string(), list![
Field::Double(2.2),
Field::Double(3.3)
])
])
])
])
])
],
// id = 6: every field except `id` is null.
row![
("id".to_string(), Field::Long(6)),
("int_array".to_string(), Field::Null),
("int_array_Array".to_string(), Field::Null),
("int_map".to_string(), Field::Null),
("int_Map_Array".to_string(), Field::Null),
("nested_struct".to_string(), Field::Null)
],
// id = 7
row![
("id".to_string(), Field::Long(7)),
("int_array".to_string(), Field::Null),
("int_array_Array".to_string(), list![
Field::Null,
list![
Field::Int(5),
Field::Int(6)
]
]),
("int_map".to_string(), map![
(Field::Str("k1".to_string()), Field::Null),
(Field::Str("k3".to_string()), Field::Null)
]),
("int_Map_Array".to_string(), Field::Null),
("nested_struct".to_string(), group![
("A".to_string(), Field::Int(7)),
("b".to_string(), list![
Field::Int(2),
Field::Int(3), Field::Null
]),
("C".to_string(), group![
("d".to_string(), list![
list![],
list![Field::Null],
Field::Null
])
]),
("g".to_string(), Field::Null)
])
]
];
assert_eq!(rows, expected_rows);
}
// Projects `nested_maps.snappy.parquet` onto the two primitive columns `c` and `b`
// (in that order) and expects six identical rows.
#[test]
fn test_file_reader_rows_projection() {
    let message = "
message spark_schema {
REQUIRED DOUBLE c;
REQUIRED INT32 b;
}
";
    let projection = parse_message_type(&message).unwrap();
    let rows =
        test_file_reader_rows("nested_maps.snappy.parquet", Some(projection)).unwrap();

    let expected_rows: Vec<Row> = (0..6)
        .map(|_| row![
            ("c".to_string(), Field::Double(1.0)),
            ("b".to_string(), Field::Int(1))
        ])
        .collect();

    assert_eq!(rows, expected_rows);
}
// Projects `nested_maps.snappy.parquet` onto the nested map column `a` and compares the
// six resulting rows, including a null inner map (key "c") and an empty inner map
// (key "d").
#[test]
fn test_file_reader_rows_projection_map() {
let schema = "
message spark_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
}
}
}
}
";
let schema = parse_message_type(&schema).unwrap();
let rows =
test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
let expected_rows = vec![
row![
("a".to_string(), map![
(Field::Str("a".to_string()), map![
(Field::Int(1), Field::Bool(true)),
(Field::Int(2), Field::Bool(false))
])
])
],
row![
("a".to_string(), map![
(Field::Str("b".to_string()), map![
(Field::Int(1), Field::Bool(true))
])
])
],
row![
("a".to_string(), map![
(Field::Str("c".to_string()), Field::Null)
])
],
row![
("a".to_string(), map![
(Field::Str("d".to_string()), map![])
])
],
row![
("a".to_string(), map![
(Field::Str("e".to_string()), map![
(Field::Int(1), Field::Bool(true))
])
])
],
row![
("a".to_string(), map![
(Field::Str("f".to_string()), map![
(Field::Int(3), Field::Bool(true)),
(Field::Int(4), Field::Bool(false)),
(Field::Int(5), Field::Bool(true))
])
])
]
];
assert_eq!(rows, expected_rows);
}
// Projects `nested_lists.snappy.parquet` onto the three-level nested list column `a` and
// compares the three resulting rows; each row contains a null inner list.
#[test]
fn test_file_reader_rows_projection_list() {
let schema = "
message spark_schema {
OPTIONAL group a (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL BYTE_ARRAY element (UTF8);
}
}
}
}
}
}
}
";
let schema = parse_message_type(&schema).unwrap();
let rows =
test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap();
let expected_rows = vec![
row![
("a".to_string(), list![
list![
list![
Field::Str("a".to_string()),
Field::Str("b".to_string())
],
list![
Field::Str("c".to_string())
]
],
list![
Field::Null,
list![
Field::Str("d".to_string())
]
]
])
],
row![
("a".to_string(), list![
list![
list![
Field::Str("a".to_string()),
Field::Str("b".to_string())
],
list![
Field::Str("c".to_string()),
Field::Str("d".to_string())
]
],
list![
Field::Null,
list![
Field::Str("e".to_string())
]
]
])
],
row![
("a".to_string(), list![
list![
list![
Field::Str("a".to_string()),
Field::Str("b".to_string())
],
list![
Field::Str("c".to_string()),
Field::Str("d".to_string())
],
list![
Field::Str("e".to_string())
]
],
list![
Field::Null,
list![
Field::Str("f".to_string())
]
]
])
]
];
assert_eq!(rows, expected_rows);
}
// A projection whose fields are not part of the file's root schema must be rejected with
// a general error when reading through the file reader.
#[test]
fn test_file_reader_rows_invalid_projection() {
    let message = "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
";
    let projection = parse_message_type(&message).unwrap();
    let outcome = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection));

    assert!(outcome.is_err());
    let expected_error = general_err!("Root schema does not contain projection");
    assert_eq!(outcome.unwrap_err(), expected_error);
}
// A projection whose fields are not part of the root schema must be rejected with a
// general error when reading through a row group reader.
#[test]
fn test_row_group_rows_invalid_projection() {
    let message = "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
";
    let projection = parse_message_type(&message).unwrap();
    let outcome = test_row_group_rows("nested_maps.snappy.parquet", Some(projection));

    assert!(outcome.is_err());
    let expected_error = general_err!("Root schema does not contain projection");
    assert_eq!(outcome.unwrap_err(), expected_error);
}
// The inner map of the projection has a key but no value field, so reading is expected
// to panic with an "Invalid map type" message.
#[test]
#[should_panic(expected = "Invalid map type")]
fn test_file_reader_rows_invalid_map_type() {
    let message = "
message spark_schema {
OPTIONAL group a (MAP) {
REPEATED group key_value {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
}
}
}
}
}
";
    let projection = parse_message_type(&message).unwrap();
    let outcome = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection));
    outcome.unwrap();
}
// Reads `repeated_no_annotation.parquet` without a projection and compares the six
// resulting rows: `phone` entries of `phoneNumbers` are expected to come back as a list
// of groups, and a missing `phoneNumbers` as null.
#[test]
fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();
let expected_rows = vec![
row![
("id".to_string(), Field::Int(1)),
("phoneNumbers".to_string(), Field::Null)
],
row![
("id".to_string(), Field::Int(2)),
("phoneNumbers".to_string(), Field::Null)
],
row![
("id".to_string(), Field::Int(3)),
("phoneNumbers".to_string(), group![
("phone".to_string(), list![])
])
],
row![
("id".to_string(), Field::Int(4)),
("phoneNumbers".to_string(), group![
("phone".to_string(), list![
group![
("number".to_string(), Field::Long(5555555555)),
("kind".to_string(), Field::Null)
]
])
])
],
row![
("id".to_string(), Field::Int(5)),
("phoneNumbers".to_string(), group![
("phone".to_string(), list![
group![
("number".to_string(), Field::Long(1111111111)),
("kind".to_string(), Field::Str("home".to_string()))
]
])
])
],
row![
("id".to_string(), Field::Int(6)),
("phoneNumbers".to_string(), group![
("phone".to_string(), list![
group![
("number".to_string(), Field::Long(1111111111)),
("kind".to_string(), Field::Str("home".to_string()))
],
group![
("number".to_string(), Field::Long(2222222222)),
("kind".to_string(), Field::Null)
],
group![
("number".to_string(), Field::Long(3333333333)),
("kind".to_string(), Field::Str("mobile".to_string()))
]
])
])
]
];
assert_eq!(rows, expected_rows);
}
// Opens the test file `file_name` and collects all rows produced by the file reader's
// row iterator, optionally restricted to the projection `schema`.
fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
    let source = get_test_file(file_name);
    let file_reader: Box<FileReader> = Box::new(SerializedFileReader::new(source)?);
    let rows: Vec<Row> = file_reader.get_row_iter(schema)?.collect();
    Ok(rows)
}
// Opens the test file `file_name` and collects all rows produced by the row iterator of
// its first row group, optionally restricted to the projection `schema`.
fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
    let source = get_test_file(file_name);
    let file_reader: Box<FileReader> = Box::new(SerializedFileReader::new(source)?);
    // Only the first row group is read.
    let first_row_group = file_reader.get_row_group(0).unwrap();
    let rows: Vec<Row> = first_row_group.get_row_iter(schema)?.collect();
    Ok(rows)
}
}