use std::rc::Rc;
use super::statistics::{self, Statistics};
use basic::{ColumnOrder, Compression, Encoding, Type};
use errors::{ParquetError, Result};
use parquet_format::{ColumnChunk, ColumnMetaData, RowGroup};
use schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Type as SchemaType, TypePtr,
};
/// Reference-counted pointer to [`ParquetMetaData`].
pub type ParquetMetaDataPtr = Rc<ParquetMetaData>;

/// Global Parquet metadata for a single file: the file-level metadata plus
/// metadata for every row group in the file.
pub struct ParquetMetaData {
    // File-level metadata (version, schema, total row count, ...).
    file_metadata: FileMetaDataPtr,
    // Per-row-group metadata, in file order.
    row_groups: Vec<RowGroupMetaDataPtr>,
}
impl ParquetMetaData {
    /// Creates Parquet metadata from file metadata and a list of row group
    /// metadata pointers. Takes ownership of `file_metadata` and wraps it in
    /// an `Rc` so it can be shared cheaply.
    pub fn new(
        file_metadata: FileMetaData,
        row_group_ptrs: Vec<RowGroupMetaDataPtr>,
    ) -> Self {
        ParquetMetaData {
            file_metadata: Rc::new(file_metadata),
            row_groups: row_group_ptrs,
        }
    }

    /// Returns a shared pointer to the file metadata.
    pub fn file_metadata(&self) -> FileMetaDataPtr { self.file_metadata.clone() }

    /// Returns the number of row groups in the file.
    pub fn num_row_groups(&self) -> usize { self.row_groups.len() }

    /// Returns a shared pointer to the metadata of row group `i`.
    /// Panics if `i` is out of bounds.
    pub fn row_group(&self, i: usize) -> RowGroupMetaDataPtr { self.row_groups[i].clone() }

    /// Returns metadata for all row groups.
    // Fix: `&self.row_groups.as_slice()` created a redundant `&&[_]` borrow
    // that only compiled through auto-deref coercion; borrow the Vec directly.
    pub fn row_groups(&self) -> &[RowGroupMetaDataPtr] { &self.row_groups }
}
/// Reference-counted pointer to [`FileMetaData`].
pub type FileMetaDataPtr = Rc<FileMetaData>;

/// File-level metadata: format version, total row count, writer string,
/// the schema (as a type tree and as a flattened leaf-column descriptor),
/// and optional per-column orders.
pub struct FileMetaData {
    // Parquet format version recorded by the writer.
    version: i32,
    // Total number of rows in the file.
    num_rows: i64,
    // Free-form "created by" string, if the writer recorded one.
    created_by: Option<String>,
    // Root of the schema type tree.
    schema: TypePtr,
    // Flattened leaf-column view of `schema`.
    schema_descr: SchemaDescPtr,
    // One order per leaf column; `None` when the file records no orders
    // (see `column_order`, which falls back to `ColumnOrder::UNDEFINED`).
    column_orders: Option<Vec<ColumnOrder>>,
}
impl FileMetaData {
pub fn new(
version: i32,
num_rows: i64,
created_by: Option<String>,
schema: TypePtr,
schema_descr: SchemaDescPtr,
column_orders: Option<Vec<ColumnOrder>>,
) -> Self
{
FileMetaData {
version,
num_rows,
created_by,
schema,
schema_descr,
column_orders,
}
}
pub fn version(&self) -> i32 { self.version }
pub fn num_rows(&self) -> i64 { self.num_rows }
pub fn created_by(&self) -> &Option<String> { &self.created_by }
pub fn schema(&self) -> &SchemaType { self.schema.as_ref() }
pub fn schema_descr(&self) -> &SchemaDescriptor { &self.schema_descr }
pub fn schema_descr_ptr(&self) -> SchemaDescPtr { self.schema_descr.clone() }
pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> { self.column_orders.as_ref() }
pub fn column_order(&self, i: usize) -> ColumnOrder {
self
.column_orders
.as_ref()
.map(|data| data[i])
.unwrap_or(ColumnOrder::UNDEFINED)
}
}
/// Reference-counted pointer to [`RowGroupMetaData`].
pub type RowGroupMetaDataPtr = Rc<RowGroupMetaData>;

/// Metadata for a single row group: its column chunks, row count, total
/// byte size, and the schema descriptor it corresponds to.
pub struct RowGroupMetaData {
    // One column chunk per leaf column, in schema order.
    columns: Vec<ColumnChunkMetaDataPtr>,
    num_rows: i64,
    total_byte_size: i64,
    schema_descr: SchemaDescPtr,
}
impl RowGroupMetaData {
    /// Returns a builder for row group metadata against `schema_descr`.
    pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder::new(schema_descr)
    }

    /// Number of column chunks in this row group.
    pub fn num_columns(&self) -> usize { self.columns.len() }

    /// Metadata for the `i`-th column chunk. Panics if `i` is out of bounds.
    pub fn column(&self, i: usize) -> &ColumnChunkMetaData { &self.columns[i] }

    /// Metadata for all column chunks, in schema order.
    pub fn columns(&self) -> &[ColumnChunkMetaDataPtr] { &self.columns }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> i64 { self.num_rows }

    /// Total byte size of this row group.
    pub fn total_byte_size(&self) -> i64 { self.total_byte_size }

    /// Schema descriptor the row group was written against.
    pub fn schema_descr(&self) -> &SchemaDescriptor { self.schema_descr.as_ref() }

    /// Shared pointer to the schema descriptor.
    pub fn schema_descr_ptr(&self) -> SchemaDescPtr { self.schema_descr.clone() }

    /// Converts a Thrift `RowGroup` into `RowGroupMetaData`.
    ///
    /// Panics when the column count in `rg` does not match `schema_descr`
    /// (invariant expected to be checked by the caller); returns `Err` when
    /// any individual column chunk fails to convert.
    pub fn from_thrift(
        schema_descr: SchemaDescPtr,
        rg: RowGroup,
    ) -> Result<RowGroupMetaData>
    {
        assert_eq!(schema_descr.num_columns(), rg.columns.len());
        let total_byte_size = rg.total_byte_size;
        let num_rows = rg.num_rows;
        // Consume `rg.columns` by value instead of `drain(0..)` on a `mut`
        // binding; `collect` short-circuits on the first conversion error.
        let columns = rg
            .columns
            .into_iter()
            .zip(schema_descr.columns())
            .map(|(c, d)| ColumnChunkMetaData::from_thrift(d.clone(), c).map(Rc::new))
            .collect::<Result<Vec<_>>>()?;
        Ok(RowGroupMetaData {
            columns,
            num_rows,
            total_byte_size,
            schema_descr,
        })
    }

    /// Converts this metadata back into its Thrift representation.
    pub fn to_thrift(&self) -> RowGroup {
        RowGroup {
            // `iter()` instead of `into_iter()` on a slice reference — the
            // elements are borrowed either way; this states the intent.
            columns: self.columns.iter().map(|v| v.to_thrift()).collect(),
            total_byte_size: self.total_byte_size,
            num_rows: self.num_rows,
            sorting_columns: None,
        }
    }
}
/// Builder for [`RowGroupMetaData`].
pub struct RowGroupMetaDataBuilder {
    // Column chunk metadata; must match `schema_descr` column count at build time.
    columns: Vec<ColumnChunkMetaDataPtr>,
    schema_descr: SchemaDescPtr,
    num_rows: i64,
    total_byte_size: i64,
}
impl RowGroupMetaDataBuilder {
    /// Creates a builder with zeroed counters and room for one column chunk
    /// per leaf column of `schema_descr`.
    fn new(schema_descr: SchemaDescPtr) -> Self {
        Self {
            columns: Vec::with_capacity(schema_descr.num_columns()),
            schema_descr,
            num_rows: 0,
            total_byte_size: 0,
        }
    }

    /// Sets the number of rows in the row group.
    pub fn set_num_rows(mut self, value: i64) -> Self {
        self.num_rows = value;
        self
    }

    /// Sets the total byte size of the row group.
    pub fn set_total_byte_size(mut self, value: i64) -> Self {
        self.total_byte_size = value;
        self
    }

    /// Sets the column chunk metadata for the row group.
    pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaDataPtr>) -> Self {
        self.columns = value;
        self
    }

    /// Finalizes the builder. Fails when the number of column chunks does
    /// not match the number of leaf columns in the schema.
    pub fn build(self) -> Result<RowGroupMetaData> {
        let expected = self.schema_descr.num_columns();
        let actual = self.columns.len();
        if expected != actual {
            return Err(general_err!(
                "Column length mismatch: {} != {}",
                expected,
                actual
            ));
        }
        Ok(RowGroupMetaData {
            columns: self.columns,
            num_rows: self.num_rows,
            total_byte_size: self.total_byte_size,
            schema_descr: self.schema_descr,
        })
    }
}
/// Reference-counted pointer to [`ColumnChunkMetaData`].
pub type ColumnChunkMetaDataPtr = Rc<ColumnChunkMetaData>;

/// Metadata for a single column chunk within a row group.
pub struct ColumnChunkMetaData {
    // Physical type of the column.
    column_type: Type,
    // Dot-path of the column within the schema.
    column_path: ColumnPath,
    column_descr: ColumnDescPtr,
    // Encodings used in this chunk.
    encodings: Vec<Encoding>,
    // File containing the chunk data, if stored externally.
    //   NOTE(review): `None` presumably means "same file" — confirm against spec.
    file_path: Option<String>,
    file_offset: i64,
    num_values: i64,
    compression: Compression,
    total_compressed_size: i64,
    total_uncompressed_size: i64,
    data_page_offset: i64,
    // Page offsets are `None` when the corresponding page kind is absent.
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    // Min/max/null-count statistics, if recorded.
    statistics: Option<Statistics>,
}
impl ColumnChunkMetaData {
    /// Returns a builder for column chunk metadata against `column_descr`.
    pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::new(column_descr)
    }

    /// File where the column chunk is stored, if set.
    pub fn file_path(&self) -> Option<&String> { self.file_path.as_ref() }

    /// Byte offset in `file_path()`.
    pub fn file_offset(&self) -> i64 { self.file_offset }

    /// Physical type of this column.
    pub fn column_type(&self) -> Type { self.column_type }

    /// Path (and name) of this column in the schema.
    pub fn column_path(&self) -> &ColumnPath { &self.column_path }

    /// Descriptor for this column.
    pub fn column_descr(&self) -> &ColumnDescriptor { self.column_descr.as_ref() }

    /// Shared pointer to the column descriptor.
    pub fn column_descr_ptr(&self) -> ColumnDescPtr { self.column_descr.clone() }

    /// All encodings used for this column chunk.
    pub fn encodings(&self) -> &Vec<Encoding> { &self.encodings }

    /// Total number of values in this column chunk.
    pub fn num_values(&self) -> i64 { self.num_values }

    /// Compression codec for this column chunk.
    pub fn compression(&self) -> Compression { self.compression }

    /// Total compressed data size, in bytes.
    pub fn compressed_size(&self) -> i64 { self.total_compressed_size }

    /// Total uncompressed data size, in bytes.
    pub fn uncompressed_size(&self) -> i64 { self.total_uncompressed_size }

    /// Offset of the first data page.
    pub fn data_page_offset(&self) -> i64 { self.data_page_offset }

    /// `true` when this column chunk has an index page.
    pub fn has_index_page(&self) -> bool { self.index_page_offset.is_some() }

    /// Offset of the index page, if present.
    pub fn index_page_offset(&self) -> Option<i64> { self.index_page_offset }

    /// `true` when this column chunk has a dictionary page.
    pub fn has_dictionary_page(&self) -> bool { self.dictionary_page_offset.is_some() }

    /// Offset of the dictionary page, if present.
    pub fn dictionary_page_offset(&self) -> Option<i64> { self.dictionary_page_offset }

    /// Statistics for this column chunk, if recorded.
    pub fn statistics(&self) -> Option<&Statistics> { self.statistics.as_ref() }

    /// Converts a Thrift `ColumnChunk` into `ColumnChunkMetaData`.
    /// Returns `Err` when the chunk carries no embedded column metadata.
    pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
        // `match` instead of `is_none()` + `unwrap()` — one check, no panic path.
        let col_metadata: ColumnMetaData = match cc.meta_data {
            Some(m) => m,
            None => return Err(general_err!("Expected to have column metadata")),
        };
        let column_type = Type::from(col_metadata.type_);
        let column_path = ColumnPath::new(col_metadata.path_in_schema);
        // Consume the vector by value instead of `drain(0..)` on a `mut` binding.
        let encodings = col_metadata
            .encodings
            .into_iter()
            .map(Encoding::from)
            .collect();
        let compression = Compression::from(col_metadata.codec);
        let file_path = cc.file_path;
        let file_offset = cc.file_offset;
        let num_values = col_metadata.num_values;
        let total_compressed_size = col_metadata.total_compressed_size;
        let total_uncompressed_size = col_metadata.total_uncompressed_size;
        let data_page_offset = col_metadata.data_page_offset;
        let index_page_offset = col_metadata.index_page_offset;
        let dictionary_page_offset = col_metadata.dictionary_page_offset;
        let statistics = statistics::from_thrift(column_type, col_metadata.statistics);
        Ok(ColumnChunkMetaData {
            column_type,
            column_path,
            column_descr,
            encodings,
            file_path,
            file_offset,
            num_values,
            compression,
            total_compressed_size,
            total_uncompressed_size,
            data_page_offset,
            index_page_offset,
            dictionary_page_offset,
            statistics,
        })
    }

    /// Converts this metadata back into its Thrift representation.
    pub fn to_thrift(&self) -> ColumnChunk {
        let column_metadata = ColumnMetaData {
            type_: self.column_type.into(),
            // `iter()` over the field directly; `Encoding` is copied per element.
            encodings: self.encodings.iter().map(|&v| v.into()).collect(),
            path_in_schema: Vec::from(self.column_path.as_ref()),
            codec: self.compression.into(),
            num_values: self.num_values,
            total_uncompressed_size: self.total_uncompressed_size,
            total_compressed_size: self.total_compressed_size,
            key_value_metadata: None,
            data_page_offset: self.data_page_offset,
            index_page_offset: self.index_page_offset,
            dictionary_page_offset: self.dictionary_page_offset,
            statistics: statistics::to_thrift(self.statistics.as_ref()),
            encoding_stats: None,
        };
        ColumnChunk {
            // `Option::clone` replaces `.map(|v| v.clone())`.
            file_path: self.file_path.clone(),
            file_offset: self.file_offset,
            meta_data: Some(column_metadata),
            offset_index_offset: None,
            offset_index_length: None,
            column_index_offset: None,
            column_index_length: None,
        }
    }
}
/// Builder for [`ColumnChunkMetaData`].
pub struct ColumnChunkMetaDataBuilder {
    // Descriptor supplying the column's type and path at build time.
    column_descr: ColumnDescPtr,
    encodings: Vec<Encoding>,
    file_path: Option<String>,
    file_offset: i64,
    num_values: i64,
    compression: Compression,
    total_compressed_size: i64,
    total_uncompressed_size: i64,
    data_page_offset: i64,
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    statistics: Option<Statistics>,
}
impl ColumnChunkMetaDataBuilder {
    /// Creates a builder with empty/zeroed defaults and no compression.
    fn new(column_descr: ColumnDescPtr) -> Self {
        Self {
            column_descr,
            encodings: Vec::new(),
            file_path: None,
            file_offset: 0,
            num_values: 0,
            compression: Compression::UNCOMPRESSED,
            total_compressed_size: 0,
            total_uncompressed_size: 0,
            data_page_offset: 0,
            index_page_offset: None,
            dictionary_page_offset: None,
            statistics: None,
        }
    }

    /// Sets the list of encodings used in the chunk.
    pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
        self.encodings = encodings;
        self
    }

    /// Sets the file path where the chunk is stored.
    pub fn set_file_path(mut self, value: String) -> Self {
        self.file_path = Some(value);
        self
    }

    /// Sets the byte offset of the chunk in its file.
    pub fn set_file_offset(mut self, value: i64) -> Self {
        self.file_offset = value;
        self
    }

    /// Sets the number of values in the chunk.
    pub fn set_num_values(mut self, value: i64) -> Self {
        self.num_values = value;
        self
    }

    /// Sets the compression codec.
    pub fn set_compression(mut self, value: Compression) -> Self {
        self.compression = value;
        self
    }

    /// Sets the total compressed size, in bytes.
    pub fn set_total_compressed_size(mut self, value: i64) -> Self {
        self.total_compressed_size = value;
        self
    }

    /// Sets the total uncompressed size, in bytes.
    pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
        self.total_uncompressed_size = value;
        self
    }

    /// Sets the offset of the first data page.
    pub fn set_data_page_offset(mut self, value: i64) -> Self {
        self.data_page_offset = value;
        self
    }

    /// Sets the optional dictionary page offset.
    pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
        self.dictionary_page_offset = value;
        self
    }

    /// Sets the optional index page offset.
    pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
        self.index_page_offset = value;
        self
    }

    /// Sets the column chunk statistics.
    pub fn set_statistics(mut self, value: Statistics) -> Self {
        self.statistics = Some(value);
        self
    }

    /// Finalizes the builder, deriving type and path from the descriptor.
    pub fn build(self) -> Result<ColumnChunkMetaData> {
        let column_type = self.column_descr.physical_type();
        let column_path = self.column_descr.path().clone();
        Ok(ColumnChunkMetaData {
            column_type,
            column_path,
            column_descr: self.column_descr,
            encodings: self.encodings,
            file_path: self.file_path,
            file_offset: self.file_offset,
            num_values: self.num_values,
            compression: self.compression,
            total_compressed_size: self.total_compressed_size,
            total_uncompressed_size: self.total_uncompressed_size,
            data_page_offset: self.data_page_offset,
            index_page_offset: self.index_page_offset,
            dictionary_page_offset: self.dictionary_page_offset,
            statistics: self.statistics,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_row_group_metadata_thrift_conversion() {
        let schema_descr = get_test_schema_descr();

        // One (default) column chunk per leaf column in the schema.
        let columns: Vec<_> = schema_descr
            .columns()
            .iter()
            .map(|ptr| Rc::new(ColumnChunkMetaData::builder(ptr.clone()).build().unwrap()))
            .collect();

        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(columns)
            .build()
            .unwrap();

        // Round-trip through Thrift must be lossless.
        let row_group_exp = row_group_meta.to_thrift();
        let row_group_res =
            RowGroupMetaData::from_thrift(schema_descr.clone(), row_group_exp.clone())
                .unwrap()
                .to_thrift();
        assert_eq!(row_group_res, row_group_exp);
    }

    #[test]
    fn test_row_group_metadata_thrift_conversion_empty() {
        // Building without column metadata must fail the column-count check.
        let schema_descr = get_test_schema_descr();
        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()).build();
        assert!(row_group_meta.is_err());
        if let Err(e) = row_group_meta {
            assert_eq!(
                e.to_string(),
                "Parquet error: Column length mismatch: 2 != 0"
            );
        }
    }

    #[test]
    fn test_column_chunk_metadata_thrift_conversion() {
        let column_descr = get_test_schema_descr().column(0);

        let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
            .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
            .set_file_path("file_path".to_owned())
            .set_file_offset(100)
            .set_num_values(1000)
            .set_compression(Compression::SNAPPY)
            .set_total_compressed_size(2000)
            .set_total_uncompressed_size(3000)
            .set_data_page_offset(4000)
            .set_dictionary_page_offset(Some(5000))
            .build()
            .unwrap();

        // Round-trip through Thrift must be lossless.
        let col_chunk_exp = col_metadata.to_thrift();
        let col_chunk_res =
            ColumnChunkMetaData::from_thrift(column_descr.clone(), col_chunk_exp.clone())
                .unwrap()
                .to_thrift();
        assert_eq!(col_chunk_res, col_chunk_exp);
    }

    #[test]
    fn test_column_chunk_metadata_thrift_conversion_empty() {
        // Default-built chunk metadata must also round-trip unchanged.
        let column_descr = get_test_schema_descr().column(0);
        let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
            .build()
            .unwrap();
        let col_chunk_exp = col_metadata.to_thrift();
        let col_chunk_res =
            ColumnChunkMetaData::from_thrift(column_descr.clone(), col_chunk_exp.clone())
                .unwrap()
                .to_thrift();
        assert_eq!(col_chunk_res, col_chunk_exp);
    }

    /// Two-column (`a`, `b`: INT32) schema shared by all tests above.
    fn get_test_schema_descr() -> SchemaDescPtr {
        let mut fields = vec![
            Rc::new(
                SchemaType::primitive_type_builder("a", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Rc::new(
                SchemaType::primitive_type_builder("b", Type::INT32)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(&mut fields)
            .build()
            .unwrap();
        Rc::new(SchemaDescriptor::new(Rc::new(schema)))
    }
}