use std::cmp::{max, min};
use std::collections::HashMap;
use std::mem;
use super::page::{Page, PageReader};
use basic::*;
use data_type::*;
use encodings::decoding::{get_decoder, Decoder, PlainDecoder, DictDecoder};
use encodings::levels::LevelDecoder;
use errors::{Result, ParquetError};
use schema::types::ColumnDescPtr;
use util::memory::ByteBufferPtr;
/// Column reader dispatched on the column's Parquet physical type.
///
/// Each variant wraps a `ColumnReaderImpl` parameterised by the matching
/// `DataType`; `get_column_reader` picks the variant from
/// `ColumnDescPtr::physical_type()`, and `get_typed_column_reader` unwraps it.
pub enum ColumnReader {
BoolColumnReader(ColumnReaderImpl<BoolType>),
Int32ColumnReader(ColumnReaderImpl<Int32Type>),
Int64ColumnReader(ColumnReaderImpl<Int64Type>),
Int96ColumnReader(ColumnReaderImpl<Int96Type>),
FloatColumnReader(ColumnReaderImpl<FloatType>),
DoubleColumnReader(ColumnReaderImpl<DoubleType>),
ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>)
}
/// Creates a `ColumnReader` for the column described by `col_descr`.
///
/// The enum variant (and therefore the `DataType` the inner
/// `ColumnReaderImpl` decodes) is chosen from the column's physical type.
/// Pages are pulled lazily from `col_page_reader` on the first read.
pub fn get_column_reader(
    col_descr: ColumnDescPtr,
    col_page_reader: Box<PageReader>
) -> ColumnReader {
    let (descr, pages) = (col_descr, col_page_reader);
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnReader::BoolColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::INT32 => {
            ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::INT64 => {
            ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::INT96 => {
            ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::FLOAT => {
            ColumnReader::FloatColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::DOUBLE => {
            ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::BYTE_ARRAY => {
            ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(descr, pages))
        },
        Type::FIXED_LEN_BYTE_ARRAY => {
            ColumnReader::FixedLenByteArrayColumnReader(ColumnReaderImpl::new(descr, pages))
        }
    }
}
/// Unwraps a `ColumnReader` into the concrete `ColumnReaderImpl<T>`.
///
/// Every arm reinterprets the wrapped reader with `mem::transmute`; no check
/// is made that `T` is the `DataType` the variant was constructed with.
///
/// NOTE(review): calling this with a `T` that does not match the column's
/// physical type presumably yields a reader whose decoders produce values of
/// the wrong type (undefined behaviour). Callers must choose `T` from the
/// column's physical type. Consider adding a runtime type check here.
pub fn get_typed_column_reader<T: DataType>(
col_reader: ColumnReader
) -> ColumnReaderImpl<T> {
// SAFETY (assumed, not enforced): `T` is the DataType the variant below was
// built with, so the transmute is an identity conversion at the type level.
match col_reader {
ColumnReader::BoolColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int32ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int64ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int96ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::FloatColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::DoubleColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::ByteArrayColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::FixedLenByteArrayColumnReader(r) => unsafe { mem::transmute(r) }
}
}
/// Typed reader for a single column chunk.
///
/// Reads pages from `page_reader` on demand and decodes repetition levels,
/// definition levels and values into caller-provided buffers (see
/// `read_batch`).
pub struct ColumnReaderImpl<T: DataType> {
/// Column descriptor; supplies the max definition / repetition levels.
descr: ColumnDescPtr,
/// Definition-level decoder for the current data page (only set when the
/// column's max definition level is > 0).
def_level_decoder: Option<LevelDecoder>,
/// Repetition-level decoder for the current data page (only set when the
/// column's max repetition level is > 0).
rep_level_decoder: Option<LevelDecoder>,
/// Source of dictionary and data pages for this column.
page_reader: Box<PageReader>,
/// Value encoding of the current data page (PLAIN_DICTIONARY is stored as
/// RLE_DICTIONARY); used as the key into `decoders`.
current_encoding: Option<Encoding>,
/// `num_values` of the current data page.
num_buffered_values: u32,
/// How many of `num_buffered_values` have been consumed so far.
num_decoded_values: u32,
/// Value decoders keyed by encoding, reused across pages. The dictionary
/// decoder is registered under RLE_DICTIONARY.
decoders: HashMap<Encoding, Box<Decoder<T>>>
}
impl<T: DataType> ColumnReaderImpl<T> {
    /// Creates a reader for column `descr` that pulls pages from `page_reader`.
    ///
    /// No page is fetched until the first call to `read_batch`.
    pub fn new(descr: ColumnDescPtr, page_reader: Box<PageReader>) -> Self {
        Self {
            descr: descr,
            def_level_decoder: None,
            rep_level_decoder: None,
            page_reader: page_reader,
            current_encoding: None,
            num_buffered_values: 0,
            num_decoded_values: 0,
            decoders: HashMap::new()
        }
    }

    /// Reads up to `batch_size` values and, optionally, levels.
    ///
    /// The batch is further capped by `values.len()` and by the length of each
    /// provided level slice, and one call never returns more than that cap,
    /// even when it spans several pages. Levels are decoded only when the
    /// column's max level is > 0 and the matching buffer is supplied. When
    /// definition levels are decoded, only slots whose level equals the max
    /// definition level consume a value, so `values` receives the non-null
    /// values packed contiguously from index 0.
    ///
    /// Returns `(values_read, levels_read)`; `(0, 0)` means no more data is
    /// available (or a zero-sized batch was requested).
    ///
    /// # Errors
    /// Propagates page-reader and decoder errors, and returns an error when the
    /// decoded repetition and definition level counts disagree.
    #[inline]
    pub fn read_batch(
        &mut self,
        batch_size: usize,
        mut def_levels: Option<&mut [i16]>,
        mut rep_levels: Option<&mut [i16]>,
        values: &mut [T::T]
    ) -> Result<(usize, usize)> {
        let mut values_read = 0;
        let mut levels_read = 0;

        // Cap the batch by every output buffer, so that any slice bounded by
        // `batch_size` below is guaranteed to be in range.
        let mut batch_size = min(batch_size, values.len());
        if let Some(ref levels) = def_levels {
            batch_size = min(batch_size, levels.len());
        }
        if let Some(ref levels) = rep_levels {
            batch_size = min(batch_size, levels.len());
        }

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        while max(values_read, levels_read) < batch_size {
            if !self.has_next()? {
                break;
            }

            // Bound this iteration by what is left in the current page and by
            // what is left of the batch. Bounding by the remaining batch (not
            // only the remaining buffer space) prevents overshooting
            // `batch_size` after switching to a fresh page. Both subtractions
            // are positive because of the loop condition.
            let remaining_in_page =
                (self.num_buffered_values - self.num_decoded_values) as usize;
            let iter_batch_size = min(
                remaining_in_page,
                min(batch_size - values_read, batch_size - levels_read)
            );

            let mut values_to_read = iter_batch_size;
            let mut num_def_levels = 0;
            let mut num_rep_levels = 0;

            if max_def_level > 0 {
                if let Some(ref mut levels) = def_levels {
                    let level_buf = &mut levels[levels_read..levels_read + iter_batch_size];
                    num_def_levels = self.read_def_levels(level_buf)?;
                    // Only slots at the max definition level carry a value.
                    values_to_read = level_buf[..num_def_levels]
                        .iter()
                        .filter(|&&level| level == max_def_level)
                        .count();
                }
            }

            if max_rep_level > 0 {
                if let Some(ref mut levels) = rep_levels {
                    num_rep_levels = self.read_rep_levels(
                        &mut levels[levels_read..levels_read + iter_batch_size])?;
                    // Both level streams describe the same slots, so when both
                    // were decoded their counts must agree.
                    if max_def_level > 0 && def_levels.is_some()
                        && num_def_levels != num_rep_levels {
                        return Err(general_err!(
                            "Number of decoded rep / def levels did not match"));
                    }
                }
            }

            let curr_values_read = self.read_values(
                &mut values[values_read..values_read + values_to_read])?;
            let curr_levels_read = max(num_def_levels, num_rep_levels);

            // Page slots are consumed per level when levels were decoded, and
            // per value otherwise.
            self.num_decoded_values += max(curr_levels_read, curr_values_read) as u32;
            levels_read += curr_levels_read;
            values_read += curr_values_read;
        }

        Ok((values_read, levels_read))
    }

    /// Advances to the next data page, registering any dictionary pages met
    /// on the way.
    ///
    /// On success, level decoders and the value decoder are positioned at the
    /// start of the new page. Returns `false` when the page reader is exhausted.
    fn read_new_page(&mut self) -> Result<bool> {
        loop {
            match self.page_reader.get_next_page()? {
                None => return Ok(false),
                // Dictionary pages hold no row data: configure the dictionary
                // decoder and keep looking for a data page.
                Some(p @ Page::DictionaryPage { .. }) => {
                    self.configure_dictionary(p)?;
                },
                Some(Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics: _
                }) => {
                    self.num_buffered_values = num_values;
                    self.num_decoded_values = 0;

                    // V1 pages store rep levels, then def levels, then values in
                    // one buffer; each level decoder reports the bytes it
                    // consumed so the buffer can be advanced past it.
                    let mut buffer_ptr = buf;

                    if self.descr.max_rep_level() > 0 {
                        let mut rep_decoder = LevelDecoder::v1(
                            rep_level_encoding, self.descr.max_rep_level());
                        let total_bytes = rep_decoder.set_data(
                            self.num_buffered_values as usize, buffer_ptr.all());
                        buffer_ptr = buffer_ptr.start_from(total_bytes);
                        self.rep_level_decoder = Some(rep_decoder);
                    }

                    if self.descr.max_def_level() > 0 {
                        let mut def_decoder = LevelDecoder::v1(
                            def_level_encoding, self.descr.max_def_level());
                        let total_bytes = def_decoder.set_data(
                            self.num_buffered_values as usize, buffer_ptr.all());
                        buffer_ptr = buffer_ptr.start_from(total_bytes);
                        self.def_level_decoder = Some(def_decoder);
                    }

                    self.set_current_page_encoding(
                        encoding, &buffer_ptr, 0, num_values as usize)?;
                    return Ok(true);
                },
                Some(Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls: _,
                    num_rows: _,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed: _,
                    statistics: _
                }) => {
                    self.num_buffered_values = num_values;
                    self.num_decoded_values = 0;

                    // V2 pages carry explicit byte lengths for each level
                    // section; values start right after them.
                    let mut offset = 0;

                    if self.descr.max_rep_level() > 0 {
                        let mut rep_decoder = LevelDecoder::v2(self.descr.max_rep_level());
                        let bytes_read = rep_decoder.set_data_range(
                            self.num_buffered_values as usize, &buf, offset,
                            rep_levels_byte_len as usize);
                        offset += bytes_read;
                        self.rep_level_decoder = Some(rep_decoder);
                    }

                    if self.descr.max_def_level() > 0 {
                        let mut def_decoder = LevelDecoder::v2(self.descr.max_def_level());
                        let bytes_read = def_decoder.set_data_range(
                            self.num_buffered_values as usize, &buf, offset,
                            def_levels_byte_len as usize);
                        offset += bytes_read;
                        self.def_level_decoder = Some(def_decoder);
                    }

                    self.set_current_page_encoding(
                        encoding, &buf, offset, num_values as usize)?;
                    return Ok(true);
                }
            }
        }
    }

    /// Points the value decoder for `encoding` at `buffer_ptr[offset..]`,
    /// holding `len` values, and makes it the current decoder.
    ///
    /// PLAIN_DICTIONARY is treated as RLE_DICTIONARY. Non-dictionary decoders
    /// are created on first use and cached in `self.decoders`.
    ///
    /// # Errors
    /// Fails if the page is dictionary-encoded but no dictionary page was
    /// seen, or if the decoder cannot be created or initialised.
    fn set_current_page_encoding(
        &mut self,
        mut encoding: Encoding,
        buffer_ptr: &ByteBufferPtr,
        offset: usize,
        len: usize
    ) -> Result<()> {
        if encoding == Encoding::PLAIN_DICTIONARY {
            encoding = Encoding::RLE_DICTIONARY;
        }

        let decoder = if encoding == Encoding::RLE_DICTIONARY {
            // The dictionary decoder is installed by `configure_dictionary`; a
            // missing one means malformed input, so report it instead of panicking.
            self.decoders
                .get_mut(&encoding)
                .ok_or_else(|| general_err!("Decoder for dict should have been set"))?
        } else {
            if !self.decoders.contains_key(&encoding) {
                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
                self.decoders.insert(encoding, data_decoder);
            }
            self.decoders.get_mut(&encoding).unwrap()
        };

        decoder.set_data(buffer_ptr.start_from(offset), len)?;
        self.current_encoding = Some(encoding);
        Ok(())
    }

    /// Returns `true` while undecoded values remain, loading data pages as
    /// needed. Empty data pages are skipped rather than ending the column.
    #[inline]
    fn has_next(&mut self) -> Result<bool> {
        while self.num_decoded_values >= self.num_buffered_values {
            if !self.read_new_page()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Decodes repetition levels into `buffer`, returning how many were read.
    ///
    /// Panics if no repetition-level decoder is set for the current page.
    #[inline]
    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
        let level_decoder = self.rep_level_decoder.as_mut()
            .expect("rep_level_decoder be set");
        level_decoder.get(buffer)
    }

    /// Decodes definition levels into `buffer`, returning how many were read.
    ///
    /// Panics if no definition-level decoder is set for the current page.
    #[inline]
    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
        let level_decoder = self.def_level_decoder.as_mut()
            .expect("def_level_decoder be set");
        level_decoder.get(buffer)
    }

    /// Decodes values of the current page into `buffer`, returning how many
    /// were read.
    ///
    /// Panics if no page has been loaded (no current encoding / decoder).
    #[inline]
    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
        let encoding = self.current_encoding.expect("current_encoding should be set");
        // `unwrap_or_else` only formats the message on failure; the previous
        // `expect(format!(..))` allocated a String on every call.
        let current_decoder = self.decoders
            .get_mut(&encoding)
            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
        current_decoder.get(buffer)
    }

    /// Builds a dictionary decoder from dictionary `page` and registers it
    /// under RLE_DICTIONARY.
    ///
    /// The dictionary entries are read with a `PlainDecoder`.
    ///
    /// # Errors
    /// Fails if a dictionary was already registered, if the page's encoding is
    /// not a supported dictionary encoding, or if the dictionary cannot be
    /// decoded.
    #[inline]
    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
        let mut encoding = page.encoding();
        // PLAIN and PLAIN_DICTIONARY dictionary pages share the RLE_DICTIONARY
        // decoder slot that dictionary-encoded data pages look up.
        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
            encoding = Encoding::RLE_DICTIONARY
        }

        if self.decoders.contains_key(&encoding) {
            return Err(general_err!("Column cannot have more than one dictionary"))
        }

        if encoding == Encoding::RLE_DICTIONARY {
            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
            let num_values = page.num_values();
            dictionary.set_data(page.buffer().clone(), num_values as usize)?;

            let mut decoder = DictDecoder::new();
            decoder.set_dict(Box::new(dictionary))?;
            self.decoders.insert(encoding, Box::new(decoder));
            Ok(true)
        } else {
            Err(nyi_err!("Invalid/Unsupported encoding type for dictionary: {}", encoding))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use rand::distributions::range::SampleRange;
    use std::collections::VecDeque;
    use std::rc::Rc;
    use std::vec::IntoIter;

    use basic::Type as PhysicalType;
    use column::page::Page;
    use encodings::encoding::{get_encoder, DictEncoder, Encoder};
    use encodings::levels::{max_buffer_size, LevelEncoder};
    use schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
    use util::memory::{ByteBufferPtr, MemTracker, MemTrackerPtr};
    use util::test_common::random_numbers_range;

    const NUM_LEVELS: usize = 128;
    const NUM_PAGES: usize = 2;
    const MAX_DEF_LEVEL: i16 = 5;
    const MAX_REP_LEVEL: i16 = 5;

    // Generates a #[test] fn that runs `ColumnReaderTester::$func` for the
    // given physical type (i32 / i64) and level / page / batch parameters.
    macro_rules! test {
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!($test_func, Int32Type, get_test_int32_type, $func, $def_level,
                $rep_level, $num_pages, $num_levels, $batch_size, $min, $max);
        };
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!($test_func, Int64Type, get_test_int64_type, $func, $def_level,
                $rep_level, $num_pages, $num_levels, $batch_size, $min, $max);
        };
    }

    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
         $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
         $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Rc::new(ColumnDescriptor::new(
                    Rc::new($pty()), None, $def_level, $rep_level,
                    ColumnPath::new(Vec::new())));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }

    test!(test_read_plain_v1_int32, i32, plain_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i32::MIN, ::std::i32::MAX);
    test!(test_read_plain_v2_int32, i32, plain_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i32::MIN, ::std::i32::MAX);

    test!(test_read_plain_v1_int32_uneven, i32, plain_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 17, ::std::i32::MIN, ::std::i32::MAX);
    test!(test_read_plain_v2_int32_uneven, i32, plain_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 17, ::std::i32::MIN, ::std::i32::MAX);

    test!(test_read_plain_v1_int32_multi_page, i32, plain_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 512, ::std::i32::MIN, ::std::i32::MAX);
    test!(test_read_plain_v2_int32_multi_page, i32, plain_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 512, ::std::i32::MIN, ::std::i32::MAX);

    test!(test_read_plain_v1_int32_required_non_repeated, i32, plain_v1, 0, 0,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i32::MIN, ::std::i32::MAX);
    test!(test_read_plain_v2_int32_required_non_repeated, i32, plain_v2, 0, 0,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i32::MIN, ::std::i32::MAX);

    test!(test_read_plain_v1_int64, i64, plain_v1, 1, 1,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i64::MIN, ::std::i64::MAX);
    test!(test_read_plain_v2_int64, i64, plain_v2, 1, 1,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i64::MIN, ::std::i64::MAX);

    test!(test_read_plain_v1_int64_uneven, i64, plain_v1, 1, 1,
        NUM_PAGES, NUM_LEVELS, 17, ::std::i64::MIN, ::std::i64::MAX);
    test!(test_read_plain_v2_int64_uneven, i64, plain_v2, 1, 1,
        NUM_PAGES, NUM_LEVELS, 17, ::std::i64::MIN, ::std::i64::MAX);

    test!(test_read_plain_v1_int64_multi_page, i64, plain_v1, 1, 1,
        NUM_PAGES, NUM_LEVELS, 512, ::std::i64::MIN, ::std::i64::MAX);
    test!(test_read_plain_v2_int64_multi_page, i64, plain_v2, 1, 1,
        NUM_PAGES, NUM_LEVELS, 512, ::std::i64::MIN, ::std::i64::MAX);

    test!(test_read_plain_v1_int64_required_non_repeated, i64, plain_v1, 0, 0,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i64::MIN, ::std::i64::MAX);
    test!(test_read_plain_v2_int64_required_non_repeated, i64, plain_v2, 0, 0,
        NUM_PAGES, NUM_LEVELS, 16, ::std::i64::MIN, ::std::i64::MAX);

    test!(test_read_dict_v1_int32_small, i32, dict_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        2, 2, 16, 0, 3);
    test!(test_read_dict_v2_int32_small, i32, dict_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        2, 2, 16, 0, 3);

    test!(test_read_dict_v1_int32, i32, dict_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, 0, 3);
    test!(test_read_dict_v2_int32, i32, dict_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, 0, 3);

    test!(test_read_dict_v1_int32_uneven, i32, dict_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 17, 0, 3);
    test!(test_read_dict_v2_int32_uneven, i32, dict_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 17, 0, 3);

    test!(test_read_dict_v1_int32_multi_page, i32, dict_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 512, 0, 3);
    test!(test_read_dict_v2_int32_multi_page, i32, dict_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 512, 0, 3);

    test!(test_read_dict_v1_int64, i64, dict_v1, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, 0, 3);
    test!(test_read_dict_v2_int64, i64, dict_v2, MAX_DEF_LEVEL, MAX_REP_LEVEL,
        NUM_PAGES, NUM_LEVELS, 16, 0, 3);

    #[test]
    fn test_read_batch_values_only() {
        test_read_batch_int32(16, &mut vec![0; 10], None, None);
        test_read_batch_int32(16, &mut vec![0; 16], None, None);
        test_read_batch_int32(16, &mut vec![0; 51], None, None);
    }

    #[test]
    fn test_read_batch_values_def_levels() {
        test_read_batch_int32(16, &mut vec![0; 10], Some(&mut vec![0; 10]), None);
        test_read_batch_int32(16, &mut vec![0; 16], Some(&mut vec![0; 16]), None);
        test_read_batch_int32(16, &mut vec![0; 51], Some(&mut vec![0; 51]), None);
    }

    #[test]
    fn test_read_batch_values_rep_levels() {
        test_read_batch_int32(16, &mut vec![0; 10], None, Some(&mut vec![0; 10]));
        test_read_batch_int32(16, &mut vec![0; 16], None, Some(&mut vec![0; 16]));
        test_read_batch_int32(16, &mut vec![0; 51], None, Some(&mut vec![0; 51]));
    }

    #[test]
    fn test_read_batch_different_buf_sizes() {
        test_read_batch_int32(
            16, &mut vec![0; 8], Some(&mut vec![0; 9]), Some(&mut vec![0; 7]));
        test_read_batch_int32(
            16, &mut vec![0; 1], Some(&mut vec![0; 9]), Some(&mut vec![0; 3]));
    }

    #[test]
    fn test_read_batch_values_def_rep_levels() {
        test_read_batch_int32(
            128, &mut vec![0; 128], Some(&mut vec![0; 128]), Some(&mut vec![0; 128]));
    }

    #[test]
    fn test_read_batch_adjust_after_buffering_page() {
        let primitive_type = get_test_int32_type();
        let desc = Rc::new(ColumnDescriptor::new(
            Rc::new(primitive_type), None, 1, 1,
            ColumnPath::new(Vec::new())));

        let num_pages = 2;
        let num_levels = 4;
        let batch_size = 5;
        let values = &mut vec![0; 7];
        let def_levels = &mut vec![0; 7];
        let rep_levels = &mut vec![0; 7];

        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            num_pages,
            num_levels,
            batch_size,
            ::std::i32::MIN,
            ::std::i32::MAX,
            values,
            Some(def_levels),
            Some(rep_levels),
            false
        );
    }

    fn get_test_int32_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(LogicalType::INT_32)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    fn get_test_int64_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(LogicalType::INT_64)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    // Reads an INT32 column through the given buffers. The column's max
    // definition / repetition level is non-zero only when the corresponding
    // level buffer is supplied.
    fn test_read_batch_int32(
        batch_size: usize,
        values: &mut[i32],
        def_levels: Option<&mut [i16]>,
        rep_levels: Option<&mut [i16]>
    ) {
        let primitive_type = get_test_int32_type();
        let max_def_level = if def_levels.is_some() { MAX_DEF_LEVEL } else { 0 };
        // Previously keyed off `def_levels`, so rep-level-only cases never
        // actually decoded repetition levels.
        let max_rep_level = if rep_levels.is_some() { MAX_REP_LEVEL } else { 0 };
        let desc = Rc::new(ColumnDescriptor::new(
            Rc::new(primitive_type), None, max_def_level, max_rep_level,
            ColumnPath::new(Vec::new())));
        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            NUM_PAGES,
            NUM_LEVELS,
            batch_size,
            ::std::i32::MIN,
            ::std::i32::MAX,
            values,
            def_levels,
            rep_levels,
            false
        );
    }

    // Holds the expected levels / values generated by `make_pages`, so that
    // what the reader returns can be compared against them.
    struct ColumnReaderTester<T: DataType>
        where T::T: PartialOrd + SampleRange + Copy {
        rep_levels: Vec<i16>,
        def_levels: Vec<i16>,
        values: Vec<T::T>
    }

    impl<T: DataType> ColumnReaderTester<T>
        where T::T: PartialOrd + SampleRange + Copy {
        pub fn new() -> Self {
            Self { rep_levels: Vec::new(), def_levels: Vec::new(), values: Vec::new() }
        }

        // PLAIN encoding, data page v1.
        fn plain_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false
            );
        }

        // PLAIN encoding, data page v2.
        fn plain_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true
            );
        }

        // Dictionary encoding, data page v1.
        fn dict_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false
            );
        }

        // Dictionary encoding, data page v2.
        fn dict_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true
            );
        }

        // Runs `test_read_batch` with buffers large enough for every level.
        fn test_read_batch_general(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            use_v2: bool
        ) {
            let mut def_levels = vec![0; num_levels * num_pages];
            let mut rep_levels = vec![0; num_levels * num_pages];
            let mut values = vec![T::T::default(); num_levels * num_pages];
            self.test_read_batch(
                desc,
                encoding,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                &mut values,
                Some(&mut def_levels),
                Some(&mut rep_levels),
                use_v2
            );
        }

        // Generates pages, reads them back via `read_batch` until it returns
        // (0, 0), and checks the output against the generated data.
        fn test_read_batch(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            values: &mut [T::T],
            mut def_levels: Option<&mut [i16]>,
            mut rep_levels: Option<&mut [i16]>,
            use_v2: bool
        ) {
            let mut pages = VecDeque::new();
            make_pages::<T>(
                desc.clone(), encoding, num_pages, num_levels, min, max,
                &mut self.def_levels, &mut self.rep_levels, &mut self.values, &mut pages, use_v2);
            let max_def_level = desc.max_def_level();
            let page_reader = TestPageReader::new(Vec::from(pages));
            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);

            let mut curr_values_read = 0;
            let mut curr_levels_read = 0;
            let mut done = false;
            while !done {
                let actual_def_levels = match def_levels {
                    Some(ref mut vec) => Some(&mut vec[curr_levels_read..]),
                    None => None
                };
                let actual_rep_levels = match rep_levels {
                    Some(ref mut vec) => Some(&mut vec[curr_levels_read..]),
                    None => None
                };

                let (values_read, levels_read) = typed_column_reader.read_batch(
                    batch_size,
                    actual_def_levels,
                    actual_rep_levels,
                    &mut values[curr_values_read..]
                ).expect("read_batch() should be OK");

                if values_read == 0 && levels_read == 0 {
                    done = true;
                }

                curr_values_read += values_read;
                curr_levels_read += levels_read;
            }

            assert!(values.len() >= curr_values_read, "values.len() >= values_read");
            assert_eq!(
                &values[0..curr_values_read],
                &self.values[0..curr_values_read],
                "values content doesn't match"
            );

            if let Some(ref levels) = def_levels {
                assert!(levels.len() >= curr_levels_read, "def_levels.len() >= levels_read");
                assert_eq!(
                    &levels[0..curr_levels_read],
                    &self.def_levels[0..curr_levels_read],
                    "definition levels content doesn't match"
                );
            }

            if let Some(ref levels) = rep_levels {
                assert!(levels.len() >= curr_levels_read, "rep_levels.len() >= levels_read");
                assert_eq!(
                    &levels[0..curr_levels_read],
                    &self.rep_levels[0..curr_levels_read],
                    "repetition levels content doesn't match"
                );
            }

            if def_levels.is_none() && rep_levels.is_none() {
                assert!(
                    curr_levels_read == 0,
                    "expected to read 0 levels, found {}",
                    curr_levels_read
                );
            } else if def_levels.is_some() && max_def_level > 0 {
                assert!(
                    curr_levels_read >= curr_values_read,
                    "expected levels read to be greater than values read"
                );
            }
        }
    }

    // In-memory `PageReader` that yields a fixed sequence of pages.
    struct TestPageReader {
        pages: IntoIter<Page>
    }

    impl TestPageReader {
        pub fn new(pages: Vec<Page>) -> Self {
            Self { pages: pages.into_iter() }
        }
    }

    impl PageReader for TestPageReader {
        fn get_next_page(&mut self) -> Result<Option<Page>> {
            Ok(self.pages.next())
        }
    }

    // Builder for test data pages: levels first, then values or indices.
    trait DataPageBuilder {
        fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
        fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]);
        fn add_values<T: DataType>(
            &mut self, encoding: Encoding, values: &[T::T]
        );
        fn add_indices(&mut self, indices: ByteBufferPtr);
        fn consume(self) -> Page;
    }

    struct DataPageBuilderImpl {
        desc: ColumnDescPtr,
        encoding: Option<Encoding>,
        mem_tracker: MemTrackerPtr,
        num_values: u32,
        buffer: Vec<u8>,
        rep_levels_byte_len: u32,
        def_levels_byte_len: u32,
        datapage_v2: bool
    }

    impl DataPageBuilderImpl {
        fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
            DataPageBuilderImpl {
                desc: desc,
                encoding: None,
                mem_tracker: Rc::new(MemTracker::new()),
                num_values: num_values,
                buffer: vec![],
                rep_levels_byte_len: 0,
                def_levels_byte_len: 0,
                datapage_v2: datapage_v2
            }
        }

        // RLE-encodes `levels` and appends them to the page buffer. V1 pages
        // keep the i32 length prefix written by the encoder; V2 pages drop it
        // and record the byte length in the page header instead. Returns the
        // encoded length without the prefix.
        fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
            let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
            let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
            level_encoder.put(levels).expect("put() should be OK");
            let encoded_levels = level_encoder.consume().expect("consume() should be OK");
            let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];
            if self.datapage_v2 {
                self.buffer.extend_from_slice(encoded_bytes);
            } else {
                self.buffer.extend_from_slice(encoded_levels.as_slice());
            }
            encoded_bytes.len() as u32
        }
    }

    impl DataPageBuilder for DataPageBuilderImpl {
        fn add_rep_levels(&mut self, max_levels: i16, rep_levels: &[i16]) {
            self.num_values = rep_levels.len() as u32;
            self.rep_levels_byte_len = self.add_levels(max_levels, rep_levels);
        }

        fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
            assert!(
                self.num_values == def_levels.len() as u32,
                "Must call `add_rep_levels() first!`");

            self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
        }

        fn add_values<T: DataType>(
            &mut self, encoding: Encoding, values: &[T::T]
        ){
            assert!(
                self.num_values >= values.len() as u32,
                "num_values: {}, values.len(): {}",
                self.num_values,
                values.len()
            );
            self.encoding = Some(encoding);
            let mut encoder: Box<Encoder<T>> = get_encoder::<T>(
                self.desc.clone(), encoding, self.mem_tracker.clone()
            ).expect("get_encoder() should be OK");
            encoder.put(values).expect("put() should be OK");
            let encoded_values = encoder.flush_buffer().expect("consume_buffer() should be OK");
            self.buffer.extend_from_slice(encoded_values.data());
        }

        fn add_indices(&mut self, indices: ByteBufferPtr) {
            self.encoding = Some(Encoding::RLE_DICTIONARY);
            self.buffer.extend_from_slice(indices.data());
        }

        fn consume(self) -> Page {
            if self.datapage_v2 {
                Page::DataPageV2 {
                    buf: ByteBufferPtr::new(self.buffer),
                    num_values: self.num_values,
                    encoding: self.encoding.unwrap(),
                    num_nulls: 0,
                    num_rows: self.num_values,
                    def_levels_byte_len: self.def_levels_byte_len,
                    rep_levels_byte_len: self.rep_levels_byte_len,
                    is_compressed: false,
                    statistics: None
                }
            } else {
                Page::DataPage {
                    buf: ByteBufferPtr::new(self.buffer),
                    num_values: self.num_values,
                    encoding: self.encoding.unwrap(),
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: None
                }
            }
        }
    }

    // Generates `num_pages` data pages of random levels/values, appending the
    // generated data to `def_levels` / `rep_levels` / `values`. For dictionary
    // encodings a dictionary page is pushed to the front of `pages`.
    fn make_pages<T: DataType>(
        desc: ColumnDescPtr,
        encoding: Encoding,
        num_pages: usize,
        levels_per_page: usize,
        min: T::T,
        max: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        pages: &mut VecDeque<Page>,
        use_v2: bool
    ) where T::T: PartialOrd + SampleRange + Copy {
        let mut num_values = 0;
        let max_def_level = desc.max_def_level();
        let max_rep_level = desc.max_rep_level();

        let mem_tracker = Rc::new(MemTracker::new());
        let mut dict_encoder = DictEncoder::<T>::new(desc.clone(), mem_tracker);

        for i in 0..num_pages {
            let mut num_values_cur_page = 0;
            let level_range = i * levels_per_page..(i+1) * levels_per_page;

            if max_def_level > 0 {
                random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels);
                for dl in &def_levels[level_range.clone()] {
                    if *dl == max_def_level {
                        num_values_cur_page += 1;
                    }
                }
            } else {
                num_values_cur_page = levels_per_page;
            }
            if max_rep_level > 0 {
                random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels);
            }
            random_numbers_range(num_values_cur_page, min, max, values);

            let mut pb = DataPageBuilderImpl::new(desc.clone(),
                num_values_cur_page as u32, use_v2);
            if max_rep_level > 0 {
                pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]);
            }
            if max_def_level > 0 {
                pb.add_def_levels(max_def_level, &def_levels[level_range]);
            }

            let value_range = num_values..num_values + num_values_cur_page;
            match encoding {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    // Surface encoder failures instead of silently ignoring them.
                    dict_encoder
                        .put(&values[value_range.clone()])
                        .expect("put() should be OK");
                    let indices = dict_encoder
                        .write_indices()
                        .expect("write_indices() should be OK");
                    pb.add_indices(indices);
                },
                Encoding::PLAIN => {
                    pb.add_values::<T>(encoding, &values[value_range]);
                },
                enc => panic!("Unexpected encoding {}", enc)
            }

            let data_page = pb.consume();
            pages.push_back(data_page);
            num_values += num_values_cur_page;
        }

        if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY {
            let dict = dict_encoder.write_dict().expect("write_dict() should be OK");
            let dict_page = Page::DictionaryPage {
                buf: dict,
                num_values: dict_encoder.num_entries() as u32,
                encoding: Encoding::RLE_DICTIONARY,
                is_sorted: false
            };
            pages.push_front(dict_page);
        }
    }
}