use std::collections::HashMap;
use std::rc::Rc;
use basic::{Compression, Encoding};
use schema::types::ColumnPath;
// Default values used by `WriterPropertiesBuilder::with_defaults` and as the
// last-resort fallback of the per-column getters on `WriterProperties`.

/// Default data page size limit (1024 * 1024; presumably bytes -- TODO confirm unit).
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
/// Default write batch size (presumably a number of values -- TODO confirm unit).
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Writer version used when none is set on the builder.
const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0;
/// Encoding returned when neither the column nor the global defaults set one.
const DEFAULT_ENCODING: Encoding = Encoding::PLAIN;
/// Compression returned when neither the column nor the global defaults set one.
const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED;
/// Dictionary flag returned when neither the column nor the global defaults set one.
const DEFAULT_DICTIONARY_ENABLED: bool = true;
/// Default dictionary page size limit; same value as `DEFAULT_PAGE_SIZE`.
const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
/// Statistics flag returned when neither the column nor the global defaults set one.
const DEFAULT_STATISTICS_ENABLED: bool = true;
/// Max statistics size returned when neither the column nor the global defaults set one.
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default max row group size (128 * 1024 * 1024; presumably bytes -- TODO confirm unit).
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 128 * 1024 * 1024;
/// Default "created by" string, read at compile time from the `PARQUET_CREATED_BY`
/// environment variable (`env!` fails the build if the variable is not set).
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
/// Version of the Parquet writer selected through the writer properties.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriterVersion {
    PARQUET_1_0,
    PARQUET_2_0,
}

impl WriterVersion {
    /// Returns the writer version as a plain number:
    /// `1` for `PARQUET_1_0` and `2` for `PARQUET_2_0`.
    pub fn as_num(&self) -> i32 {
        match *self {
            WriterVersion::PARQUET_2_0 => 2,
            WriterVersion::PARQUET_1_0 => 1,
        }
    }
}
/// Reference-counted (`Rc`) pointer to `WriterProperties`.
pub type WriterPropertiesPtr = Rc<WriterProperties>;
/// Immutable set of writer settings.
///
/// Instances are produced by `WriterPropertiesBuilder::build`; obtain a builder
/// with `WriterProperties::builder()`. File-wide settings are stored directly,
/// while encoding, compression, dictionary and statistics settings are resolved
/// per column with a fallback chain (column override, then global default set on
/// the builder, then the built-in `DEFAULT_*` constant).
#[derive(Debug, Clone)]
pub struct WriterProperties {
// Data page size limit.
data_pagesize_limit: usize,
// Dictionary page size limit.
dictionary_pagesize_limit: usize,
// Write batch size.
write_batch_size: usize,
// Max row group size.
max_row_group_size: usize,
// Selected writer version.
writer_version: WriterVersion,
// "Created by" string.
created_by: String,
// Settings applied to every column that has no explicit override.
default_column_properties: ColumnProperties,
// Per-column overrides, keyed by column path.
column_properties: HashMap<ColumnPath, ColumnProperties>
}
impl WriterProperties {
    /// Creates a builder that starts from the default settings.
    pub fn builder() -> WriterPropertiesBuilder {
        WriterPropertiesBuilder::with_defaults()
    }

    /// Returns the data page size limit.
    pub fn data_pagesize_limit(&self) -> usize {
        self.data_pagesize_limit
    }

    /// Returns the dictionary page size limit.
    pub fn dictionary_pagesize_limit(&self) -> usize {
        self.dictionary_pagesize_limit
    }

    /// Returns the write batch size.
    pub fn write_batch_size(&self) -> usize {
        self.write_batch_size
    }

    /// Returns the max row group size.
    pub fn max_row_group_size(&self) -> usize {
        self.max_row_group_size
    }

    /// Returns the configured writer version.
    pub fn writer_version(&self) -> WriterVersion {
        self.writer_version
    }

    /// Returns the "created by" string.
    pub fn created_by(&self) -> &str {
        self.created_by.as_str()
    }

    /// Returns the encoding for data pages when a dictionary is used.
    ///
    /// Always `Encoding::RLE_DICTIONARY`, independent of the writer version.
    #[inline]
    pub fn dictionary_data_page_encoding(&self) -> Encoding {
        Encoding::RLE_DICTIONARY
    }

    /// Returns the encoding for the dictionary page itself.
    ///
    /// Always `Encoding::PLAIN`, independent of the writer version.
    #[inline]
    pub fn dictionary_page_encoding(&self) -> Encoding {
        Encoding::PLAIN
    }

    /// Returns the encoding for column `col`.
    ///
    /// Resolution order: the column's own setting, then the global default set
    /// on the builder, then `DEFAULT_ENCODING`.
    pub fn encoding(&self, col: &ColumnPath) -> Encoding {
        let column_value = self.column_properties.get(col).and_then(|props| props.encoding());
        match column_value {
            Some(encoding) => encoding,
            None => self.default_column_properties.encoding().unwrap_or(DEFAULT_ENCODING),
        }
    }

    /// Returns the compression codec for column `col`.
    ///
    /// Resolution order: the column's own setting, then the global default set
    /// on the builder, then `DEFAULT_COMPRESSION`.
    pub fn compression(&self, col: &ColumnPath) -> Compression {
        let column_value = self.column_properties.get(col).and_then(|props| props.compression());
        match column_value {
            Some(codec) => codec,
            None => self.default_column_properties.compression().unwrap_or(DEFAULT_COMPRESSION),
        }
    }

    /// Returns whether dictionary encoding is enabled for column `col`.
    ///
    /// Resolution order: the column's own setting, then the global default set
    /// on the builder, then `DEFAULT_DICTIONARY_ENABLED`.
    pub fn dictionary_enabled(&self, col: &ColumnPath) -> bool {
        let column_value = self
            .column_properties
            .get(col)
            .and_then(|props| props.dictionary_enabled());
        match column_value {
            Some(enabled) => enabled,
            None => self
                .default_column_properties
                .dictionary_enabled()
                .unwrap_or(DEFAULT_DICTIONARY_ENABLED),
        }
    }

    /// Returns whether statistics are enabled for column `col`.
    ///
    /// Resolution order: the column's own setting, then the global default set
    /// on the builder, then `DEFAULT_STATISTICS_ENABLED`.
    pub fn statistics_enabled(&self, col: &ColumnPath) -> bool {
        let column_value = self
            .column_properties
            .get(col)
            .and_then(|props| props.statistics_enabled());
        match column_value {
            Some(enabled) => enabled,
            None => self
                .default_column_properties
                .statistics_enabled()
                .unwrap_or(DEFAULT_STATISTICS_ENABLED),
        }
    }

    /// Returns the max statistics size for column `col`.
    ///
    /// Resolution order: the column's own setting, then the global default set
    /// on the builder, then `DEFAULT_MAX_STATISTICS_SIZE`.
    pub fn max_statistics_size(&self, col: &ColumnPath) -> usize {
        let column_value = self
            .column_properties
            .get(col)
            .and_then(|props| props.max_statistics_size());
        match column_value {
            Some(size) => size,
            None => self
                .default_column_properties
                .max_statistics_size()
                .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
        }
    }
}
/// Builder for `WriterProperties`.
///
/// Holds the same fields as `WriterProperties`; each `set_*` method consumes the
/// builder and returns it, and `build` moves the collected values into a
/// `WriterProperties`.
pub struct WriterPropertiesBuilder {
// Data page size limit.
data_pagesize_limit: usize,
// Dictionary page size limit.
dictionary_pagesize_limit: usize,
// Write batch size.
write_batch_size: usize,
// Max row group size.
max_row_group_size: usize,
// Selected writer version.
writer_version: WriterVersion,
// "Created by" string.
created_by: String,
// Settings applied to every column that has no explicit override.
default_column_properties: ColumnProperties,
// Per-column overrides, keyed by column path.
column_properties: HashMap<ColumnPath, ColumnProperties>
}
impl WriterPropertiesBuilder {
    /// Creates a builder populated with the `DEFAULT_*` values and no
    /// column-specific overrides.
    fn with_defaults() -> Self {
        Self {
            data_pagesize_limit: DEFAULT_PAGE_SIZE,
            dictionary_pagesize_limit: DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT,
            write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
            max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
            writer_version: DEFAULT_WRITER_VERSION,
            created_by: DEFAULT_CREATED_BY.to_string(),
            default_column_properties: ColumnProperties::new(),
            column_properties: HashMap::new(),
        }
    }

    /// Consumes the builder and returns the finished `WriterProperties`.
    pub fn build(self) -> WriterProperties {
        WriterProperties {
            data_pagesize_limit: self.data_pagesize_limit,
            dictionary_pagesize_limit: self.dictionary_pagesize_limit,
            write_batch_size: self.write_batch_size,
            max_row_group_size: self.max_row_group_size,
            writer_version: self.writer_version,
            created_by: self.created_by,
            default_column_properties: self.default_column_properties,
            column_properties: self.column_properties,
        }
    }

    // ----------------------------------------------------------------------
    // Writer-level settings

    /// Sets the writer version.
    pub fn set_writer_version(mut self, value: WriterVersion) -> Self {
        self.writer_version = value;
        self
    }

    /// Sets the data page size limit.
    pub fn set_data_pagesize_limit(mut self, value: usize) -> Self {
        self.data_pagesize_limit = value;
        self
    }

    /// Sets the dictionary page size limit.
    pub fn set_dictionary_pagesize_limit(mut self, value: usize) -> Self {
        self.dictionary_pagesize_limit = value;
        self
    }

    /// Sets the write batch size.
    pub fn set_write_batch_size(mut self, value: usize) -> Self {
        self.write_batch_size = value;
        self
    }

    /// Sets the max row group size.
    pub fn set_max_row_group_size(mut self, value: usize) -> Self {
        self.max_row_group_size = value;
        self
    }

    /// Sets the "created by" string.
    pub fn set_created_by(mut self, value: String) -> Self {
        self.created_by = value;
        self
    }

    // ----------------------------------------------------------------------
    // Defaults applied to every column without an explicit override

    /// Sets the encoding used for any column without a column-specific encoding.
    ///
    /// # Panics
    ///
    /// Panics if `value` is `Encoding::PLAIN_DICTIONARY` or
    /// `Encoding::RLE_DICTIONARY`; dictionary encodings are rejected here.
    pub fn set_encoding(mut self, value: Encoding) -> Self {
        self.default_column_properties.set_encoding(value);
        self
    }

    /// Sets the compression codec used for any column without a
    /// column-specific codec.
    pub fn set_compression(mut self, value: Compression) -> Self {
        self.default_column_properties.set_compression(value);
        self
    }

    /// Sets whether dictionary encoding is enabled for any column without a
    /// column-specific flag.
    pub fn set_dictionary_enabled(mut self, value: bool) -> Self {
        self.default_column_properties.set_dictionary_enabled(value);
        self
    }

    /// Sets whether statistics are enabled for any column without a
    /// column-specific flag.
    pub fn set_statistics_enabled(mut self, value: bool) -> Self {
        self.default_column_properties.set_statistics_enabled(value);
        self
    }

    /// Sets the max statistics size for any column without a column-specific
    /// value.
    pub fn set_max_statistics_size(mut self, value: usize) -> Self {
        self.default_column_properties.set_max_statistics_size(value);
        self
    }

    // ----------------------------------------------------------------------
    // Column-specific overrides

    /// Returns the mutable properties for `col`, inserting an empty entry if
    /// the column has no overrides yet.
    #[inline]
    fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
        // `or_insert_with` only constructs the value when the entry is vacant.
        self.column_properties
            .entry(col)
            .or_insert_with(ColumnProperties::new)
    }

    /// Sets the encoding for column `col`, overriding the global default.
    ///
    /// # Panics
    ///
    /// Panics if `value` is `Encoding::PLAIN_DICTIONARY` or
    /// `Encoding::RLE_DICTIONARY`; dictionary encodings are rejected here.
    pub fn set_column_encoding(mut self, col: ColumnPath, value: Encoding) -> Self {
        self.get_mut_props(col).set_encoding(value);
        self
    }

    /// Sets the compression codec for column `col`, overriding the global
    /// default.
    pub fn set_column_compression(mut self, col: ColumnPath, value: Compression) -> Self {
        self.get_mut_props(col).set_compression(value);
        self
    }

    /// Sets whether dictionary encoding is enabled for column `col`,
    /// overriding the global default.
    pub fn set_column_dictionary_enabled(mut self, col: ColumnPath, value: bool) -> Self {
        self.get_mut_props(col).set_dictionary_enabled(value);
        self
    }

    /// Sets whether statistics are enabled for column `col`, overriding the
    /// global default.
    pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
        self.get_mut_props(col).set_statistics_enabled(value);
        self
    }

    /// Sets the max statistics size for column `col`, overriding the global
    /// default.
    pub fn set_column_max_statistics_size(mut self, col: ColumnPath, value: usize) -> Self {
        self.get_mut_props(col).set_max_statistics_size(value);
        self
    }
}
/// Optional per-column settings.
///
/// Every field starts as `None` (see `ColumnProperties::new`), meaning "not set";
/// the getters on `WriterProperties` then fall back to the next level.
#[derive(Debug, Clone, PartialEq)]
struct ColumnProperties {
// Encoding; never a dictionary encoding, since `set_encoding` rejects those.
encoding: Option<Encoding>,
// Compression codec.
codec: Option<Compression>,
// Whether dictionary encoding is enabled.
dictionary_enabled: Option<bool>,
// Whether statistics are enabled.
statistics_enabled: Option<bool>,
// Max statistics size.
max_statistics_size: Option<usize>
}
impl ColumnProperties {
fn new() -> Self {
Self {
encoding: None,
codec: None,
dictionary_enabled: None,
statistics_enabled: None,
max_statistics_size: None
}
}
fn set_encoding(&mut self, value: Encoding) {
if value == Encoding::PLAIN_DICTIONARY || value == Encoding::RLE_DICTIONARY {
panic!("Dictionary encoding can not be used as fallback encoding");
}
self.encoding = Some(value);
}
fn set_compression(&mut self, value: Compression) {
self.codec = Some(value);
}
fn set_dictionary_enabled(&mut self, enabled: bool) {
self.dictionary_enabled = Some(enabled);
}
fn set_statistics_enabled(&mut self, enabled: bool) {
self.statistics_enabled = Some(enabled);
}
fn set_max_statistics_size(&mut self, value: usize) {
self.max_statistics_size = Some(value);
}
fn encoding(&self) -> Option<Encoding> {
self.encoding
}
fn compression(&self) -> Option<Compression> {
self.codec
}
fn dictionary_enabled(&self) -> Option<bool> {
self.dictionary_enabled
}
fn statistics_enabled(&self) -> Option<bool> {
self.statistics_enabled
}
fn max_statistics_size(&self) -> Option<usize> {
self.max_statistics_size
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Each writer version maps to its plain numeric form.
    #[test]
    fn test_writer_version() {
        let expected = [(WriterVersion::PARQUET_1_0, 1), (WriterVersion::PARQUET_2_0, 2)];
        for &(version, number) in expected.iter() {
            assert_eq!(version.as_num(), number);
        }
    }

    // A builder with no setters called yields the `DEFAULT_*` values everywhere.
    #[test]
    fn test_writer_properties_default_settings() {
        let props = WriterProperties::builder().build();
        let col = ColumnPath::from("col");

        assert_eq!(props.data_pagesize_limit(), DEFAULT_PAGE_SIZE);
        assert_eq!(props.dictionary_pagesize_limit(), DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT);
        assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
        assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
        assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
        assert_eq!(props.created_by(), DEFAULT_CREATED_BY);

        assert_eq!(props.encoding(&col), DEFAULT_ENCODING);
        assert_eq!(props.compression(&col), DEFAULT_COMPRESSION);
        assert_eq!(props.dictionary_enabled(&col), DEFAULT_DICTIONARY_ENABLED);
        assert_eq!(props.statistics_enabled(&col), DEFAULT_STATISTICS_ENABLED);
        assert_eq!(props.max_statistics_size(&col), DEFAULT_MAX_STATISTICS_SIZE);
    }

    // Dictionary page encodings are the same for both writer versions.
    #[test]
    fn test_writer_properties_dictionary_encoding() {
        let versions = [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0];
        for &version in versions.iter() {
            let props = WriterProperties::builder().set_writer_version(version).build();
            assert_eq!(props.dictionary_page_encoding(), Encoding::PLAIN);
            assert_eq!(props.dictionary_data_page_encoding(), Encoding::RLE_DICTIONARY);
        }
    }

    #[test]
    #[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
    fn test_writer_properties_panic_when_plain_dictionary_is_fallback() {
        let builder = WriterProperties::builder();
        builder.set_encoding(Encoding::PLAIN_DICTIONARY).build();
    }

    #[test]
    #[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
    fn test_writer_properties_panic_when_rle_dictionary_is_fallback() {
        let builder = WriterProperties::builder();
        builder.set_encoding(Encoding::RLE_DICTIONARY).build();
    }

    #[test]
    #[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
    fn test_writer_properties_panic_when_dictionary_is_enabled() {
        let col = ColumnPath::from("col");
        let builder = WriterProperties::builder().set_dictionary_enabled(true);
        builder.set_column_encoding(col, Encoding::RLE_DICTIONARY).build();
    }

    #[test]
    #[should_panic(expected = "Dictionary encoding can not be used as fallback encoding")]
    fn test_writer_properties_panic_when_dictionary_is_disabled() {
        let col = ColumnPath::from("col");
        let builder = WriterProperties::builder().set_dictionary_enabled(false);
        builder.set_column_encoding(col, Encoding::RLE_DICTIONARY).build();
    }

    // Every setter is reflected by its getter; column overrides win over the
    // global defaults only for the overridden column.
    #[test]
    fn test_writer_properties_builder() {
        let col = || ColumnPath::from("col");
        let other = ColumnPath::from("a");

        // Writer-level settings.
        let builder = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_data_pagesize_limit(10)
            .set_dictionary_pagesize_limit(20)
            .set_write_batch_size(30)
            .set_max_row_group_size(40)
            .set_created_by("default".to_owned());
        // Defaults for all columns.
        let builder = builder
            .set_encoding(Encoding::DELTA_BINARY_PACKED)
            .set_compression(Compression::GZIP)
            .set_dictionary_enabled(false)
            .set_statistics_enabled(false)
            .set_max_statistics_size(50);
        // Overrides for "col".
        let builder = builder
            .set_column_encoding(col(), Encoding::RLE)
            .set_column_compression(col(), Compression::SNAPPY)
            .set_column_dictionary_enabled(col(), true)
            .set_column_statistics_enabled(col(), true)
            .set_column_max_statistics_size(col(), 123);
        let props = builder.build();

        assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
        assert_eq!(props.data_pagesize_limit(), 10);
        assert_eq!(props.dictionary_pagesize_limit(), 20);
        assert_eq!(props.write_batch_size(), 30);
        assert_eq!(props.max_row_group_size(), 40);
        assert_eq!(props.created_by(), "default");

        assert_eq!(props.encoding(&other), Encoding::DELTA_BINARY_PACKED);
        assert_eq!(props.compression(&other), Compression::GZIP);
        assert_eq!(props.dictionary_enabled(&other), false);
        assert_eq!(props.statistics_enabled(&other), false);
        assert_eq!(props.max_statistics_size(&other), 50);

        let overridden = col();
        assert_eq!(props.encoding(&overridden), Encoding::RLE);
        assert_eq!(props.compression(&overridden), Compression::SNAPPY);
        assert_eq!(props.dictionary_enabled(&overridden), true);
        assert_eq!(props.statistics_enabled(&overridden), true);
        assert_eq!(props.max_statistics_size(&overridden), 123);
    }

    // A column with a partial override falls back to the global default, then
    // to the built-in default, for the settings it does not override.
    #[test]
    fn test_writer_properties_builder_partial_defaults() {
        let builder = WriterProperties::builder()
            .set_encoding(Encoding::DELTA_BINARY_PACKED)
            .set_compression(Compression::GZIP);
        let props = builder
            .set_column_encoding(ColumnPath::from("col"), Encoding::RLE)
            .build();

        let col = ColumnPath::from("col");
        assert_eq!(props.encoding(&col), Encoding::RLE);
        assert_eq!(props.compression(&col), Compression::GZIP);
        assert_eq!(props.dictionary_enabled(&col), DEFAULT_DICTIONARY_ENABLED);
    }
}