use std::{
cmp::{max, min},
collections::HashMap,
mem,
};
use super::page::{Page, PageReader};
use basic::*;
use data_type::*;
use encodings::{
decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
levels::LevelDecoder,
};
use errors::{ParquetError, Result};
use schema::types::ColumnDescPtr;
use util::memory::ByteBufferPtr;
/// Column reader for a Parquet column, with one variant per supported physical type.
///
/// Every variant wraps a `ColumnReaderImpl` parameterised with the matching data type.
/// `get_column_reader()` builds the right variant from a column descriptor and
/// `get_typed_column_reader()` unwraps a variant into its typed reader.
pub enum ColumnReader {
BoolColumnReader(ColumnReaderImpl<BoolType>),
Int32ColumnReader(ColumnReaderImpl<Int32Type>),
Int64ColumnReader(ColumnReaderImpl<Int64Type>),
Int96ColumnReader(ColumnReaderImpl<Int96Type>),
FloatColumnReader(ColumnReaderImpl<FloatType>),
DoubleColumnReader(ColumnReaderImpl<DoubleType>),
ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
pub fn get_column_reader(
col_descr: ColumnDescPtr,
col_page_reader: Box<PageReader>,
) -> ColumnReader
{
match col_descr.physical_type() {
Type::BOOLEAN => {
ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::INT32 => {
ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::INT64 => {
ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::INT96 => {
ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::FLOAT => {
ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::DOUBLE => {
ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
},
Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
col_descr,
col_page_reader,
)),
Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
ColumnReaderImpl::new(col_descr, col_page_reader),
),
}
}
/// Converts a `ColumnReader` into a `ColumnReaderImpl<T>` for the requested data type `T`.
///
/// NOTE(review): no arm checks the matched variant against `T`; each one reinterprets the
/// wrapped reader as `ColumnReaderImpl<T>` through `mem::transmute`. The caller therefore
/// appears to be responsible for requesting the `T` that corresponds to the variant it
/// holds — asking for a different `T` would reinterpret the cached decoders as decoders
/// of another data type. Confirm against callers before relying on this.
pub fn get_typed_column_reader<T: DataType>(
col_reader: ColumnReader,
) -> ColumnReaderImpl<T> {
match col_reader {
// SAFETY (assumed, not enforced here): each transmute is an identity cast only when `T`
// is the data type of the matched variant.
ColumnReader::BoolColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int32ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int64ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::Int96ColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::FloatColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::DoubleColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::ByteArrayColumnReader(r) => unsafe { mem::transmute(r) },
ColumnReader::FixedLenByteArrayColumnReader(r) => unsafe { mem::transmute(r) },
}
}
/// Typed value reader for a single column.
///
/// Pulls pages from a `PageReader`, sets up level and value decoders for each data page
/// and hands decoded levels / values out in batches through `read_batch()`.
pub struct ColumnReaderImpl<T: DataType> {
/// Descriptor of the column; consulted for max definition / repetition levels and the
/// type length.
descr: ColumnDescPtr,
/// Definition level decoder for the current data page; created per page when the
/// column's max definition level is > 0.
def_level_decoder: Option<LevelDecoder>,
/// Repetition level decoder for the current data page; created per page when the
/// column's max repetition level is > 0.
rep_level_decoder: Option<LevelDecoder>,
/// Source of dictionary and data pages.
page_reader: Box<PageReader>,
/// Encoding of the current data page (`PLAIN_DICTIONARY` is stored as `RLE_DICTIONARY`);
/// `None` until the first data page has been loaded.
current_encoding: Option<Encoding>,
/// Number of values reported by the current data page.
num_buffered_values: u32,
/// Number of entries of the current data page consumed so far; reset to 0 for each new
/// data page and advanced by `read_batch()`.
num_decoded_values: u32,
/// Value decoders keyed by encoding; created on first use and reused for later pages.
decoders: HashMap<Encoding, Box<Decoder<T>>>,
}
impl<T: DataType> ColumnReaderImpl<T> {
pub fn new(descr: ColumnDescPtr, page_reader: Box<PageReader>) -> Self {
Self {
descr,
def_level_decoder: None,
rep_level_decoder: None,
page_reader,
current_encoding: None,
num_buffered_values: 0,
num_decoded_values: 0,
decoders: HashMap::new(),
}
}
/// Reads a batch of values, and optionally definition / repetition levels, from the
/// column into the supplied slices.
///
/// `batch_size` is first clamped to `values.len()` and to the length of every level slice
/// that is supplied. Pages are loaded on demand through `has_next()`; the loop ends once
/// the larger of the two running counts reaches the clamped batch size, or when no more
/// data is available.
///
/// Definition levels are decoded only when `def_levels` is `Some` and the column's max
/// definition level is > 0; in that case the number of values to decode is the number of
/// decoded levels equal to the max definition level. Otherwise as many values as the
/// current iteration allows are requested. Repetition levels are decoded only when
/// `rep_levels` is `Some` and the column's max repetition level is > 0.
///
/// Returns `(values_read, levels_read)`. Errors from page loading and from the level /
/// value decoders are propagated.
///
/// # Panics
///
/// Panics if repetition levels were decoded, `def_levels` is `Some`, and the number of
/// decoded definition levels differs from the number of decoded repetition levels.
///
/// NOTE(review): each iteration is capped by `batch_size` and by the space left in the
/// output slices, not by the part of `batch_size` still outstanding, so a call that
/// crosses a page boundary with slices longer than `batch_size` looks able to return more
/// than `batch_size` entries — confirm whether callers rely on this.
#[inline]
pub fn read_batch(
&mut self,
batch_size: usize,
mut def_levels: Option<&mut [i16]>,
mut rep_levels: Option<&mut [i16]>,
values: &mut [T::T],
) -> Result<(usize, usize)>
{
let mut values_read = 0;
let mut levels_read = 0;
// Never ask for more than the smallest output slice can hold.
let mut batch_size = min(batch_size, values.len());
if let Some(ref levels) = def_levels {
batch_size = min(batch_size, levels.len());
}
if let Some(ref levels) = rep_levels {
batch_size = min(batch_size, levels.len());
}
while max(values_read, levels_read) < batch_size {
// Loads the next data page when the current one is exhausted.
if !self.has_next()? {
break;
}
// Size of this iteration: bounded by what is left in the current page and by the
// remaining room in every output slice.
let iter_batch_size = {
let mut adjusted_size = min(
batch_size,
(self.num_buffered_values - self.num_decoded_values) as usize,
);
adjusted_size = min(adjusted_size, values.len() - values_read);
if let Some(ref levels) = def_levels {
adjusted_size = min(adjusted_size, levels.len() - levels_read);
}
if let Some(ref levels) = rep_levels {
adjusted_size = min(adjusted_size, levels.len() - levels_read);
}
adjusted_size
};
let mut values_to_read = 0;
let mut num_def_levels = 0;
let mut num_rep_levels = 0;
if self.descr.max_def_level() > 0 && def_levels.as_ref().is_some() {
if let Some(ref mut levels) = def_levels {
num_def_levels = self
.read_def_levels(&mut levels[levels_read..levels_read + iter_batch_size])?;
// Only entries at the max definition level have a value to decode.
for i in levels_read..levels_read + num_def_levels {
if levels[i] == self.descr.max_def_level() {
values_to_read += 1;
}
}
}
} else {
// No definition levels decoded: request one value per entry of this iteration.
values_to_read = iter_batch_size;
}
if self.descr.max_rep_level() > 0 && rep_levels.is_some() {
if let Some(ref mut levels) = rep_levels {
num_rep_levels = self
.read_rep_levels(&mut levels[levels_read..levels_read + iter_batch_size])?;
if def_levels.is_some() {
assert_eq!(
num_def_levels, num_rep_levels,
"Number of decoded rep / def levels did not match"
);
}
}
}
let curr_values_read =
self.read_values(&mut values[values_read..values_read + values_to_read])?;
let curr_levels_read = max(num_def_levels, num_rep_levels);
// Page progress advances by whichever count is larger for this iteration.
self.num_decoded_values += max(curr_levels_read, curr_values_read) as u32;
levels_read += curr_levels_read;
values_read += curr_values_read;
}
Ok((values_read, levels_read))
}
/// Loads pages from the page reader until a data page has been set up for decoding.
///
/// Dictionary pages met on the way are registered through `configure_dictionary()` and
/// then skipped. For a data page the buffered / decoded counters are reset, a level
/// decoder is created for each kind of level the column uses (max level > 0) and the
/// value decoder is pointed at the bytes that follow the levels.
///
/// Returns `Ok(true)` once a data page is ready and `Ok(false)` when the page reader has
/// no more pages. Errors from the page reader, from the dictionary setup and from the
/// value decoder are propagated.
fn read_new_page(&mut self) -> Result<bool> {
    loop {
        let page = match self.page_reader.get_next_page()? {
            Some(page) => page,
            None => return Ok(false),
        };
        match page {
            // A dictionary page only prepares the dictionary decoder; keep looking for
            // the data page that follows it.
            dict_page @ Page::DictionaryPage { .. } => {
                self.configure_dictionary(dict_page)?;
            },
            Page::DataPage {
                buf,
                num_values,
                encoding,
                def_level_encoding,
                rep_level_encoding,
                ..
            } => {
                self.num_buffered_values = num_values;
                self.num_decoded_values = 0;

                // Levels are consumed from the front of the buffer, repetition levels
                // first; each decoder reports how many bytes it occupies.
                let mut buffer_ptr = buf;
                if self.descr.max_rep_level() > 0 {
                    let mut rep_decoder =
                        LevelDecoder::v1(rep_level_encoding, self.descr.max_rep_level());
                    let total_bytes = rep_decoder
                        .set_data(self.num_buffered_values as usize, buffer_ptr.all());
                    buffer_ptr = buffer_ptr.start_from(total_bytes);
                    self.rep_level_decoder = Some(rep_decoder);
                }
                if self.descr.max_def_level() > 0 {
                    let mut def_decoder =
                        LevelDecoder::v1(def_level_encoding, self.descr.max_def_level());
                    let total_bytes = def_decoder
                        .set_data(self.num_buffered_values as usize, buffer_ptr.all());
                    buffer_ptr = buffer_ptr.start_from(total_bytes);
                    self.def_level_decoder = Some(def_decoder);
                }

                self.set_current_page_encoding(encoding, &buffer_ptr, 0, num_values as usize)?;
                return Ok(true);
            },
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                def_levels_byte_len,
                rep_levels_byte_len,
                ..
            } => {
                self.num_buffered_values = num_values;
                self.num_decoded_values = 0;

                // The page header carries the byte length of each level section, so the
                // decoders are given explicit ranges and `offset` tracks where the
                // values start.
                let mut offset = 0;
                if self.descr.max_rep_level() > 0 {
                    let mut rep_decoder = LevelDecoder::v2(self.descr.max_rep_level());
                    let bytes_read = rep_decoder.set_data_range(
                        self.num_buffered_values as usize,
                        &buf,
                        offset,
                        rep_levels_byte_len as usize,
                    );
                    offset += bytes_read;
                    self.rep_level_decoder = Some(rep_decoder);
                }
                if self.descr.max_def_level() > 0 {
                    let mut def_decoder = LevelDecoder::v2(self.descr.max_def_level());
                    let bytes_read = def_decoder.set_data_range(
                        self.num_buffered_values as usize,
                        &buf,
                        offset,
                        def_levels_byte_len as usize,
                    );
                    offset += bytes_read;
                    self.def_level_decoder = Some(def_decoder);
                }

                self.set_current_page_encoding(encoding, &buf, offset, num_values as usize)?;
                return Ok(true);
            },
        }
    }
}
fn set_current_page_encoding(
&mut self,
mut encoding: Encoding,
buffer_ptr: &ByteBufferPtr,
offset: usize,
len: usize,
) -> Result<()>
{
if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}
let decoder = if encoding == Encoding::RLE_DICTIONARY {
self
.decoders
.get_mut(&encoding)
.expect("Decoder for dict should have been set")
} else {
if !self.decoders.contains_key(&encoding) {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
self.decoders.insert(encoding, data_decoder);
}
self.decoders.get_mut(&encoding).unwrap()
};
decoder.set_data(buffer_ptr.start_from(offset), len as usize)?;
self.current_encoding = Some(encoding);
Ok(())
}
/// Reports whether there is still data to decode, loading the next data page when the
/// current one is used up (or none has been loaded yet).
///
/// Returns `Ok(false)` when no further page is available or the newly loaded page holds
/// zero values.
#[inline]
fn has_next(&mut self) -> Result<bool> {
    let page_exhausted = self.num_buffered_values == 0
        || self.num_buffered_values == self.num_decoded_values;
    if !page_exhausted {
        return Ok(true);
    }

    let page_loaded = self.read_new_page()?;
    Ok(page_loaded && self.num_buffered_values != 0)
}
/// Decodes repetition levels of the current page into `buffer` and returns how many
/// were written.
///
/// Panics if no repetition level decoder has been set for the current page.
#[inline]
fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
    self.rep_level_decoder
        .as_mut()
        .expect("rep_level_decoder be set")
        .get(buffer)
}
/// Decodes definition levels of the current page into `buffer` and returns how many
/// were written.
///
/// Panics if no definition level decoder has been set for the current page.
#[inline]
fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
    self.def_level_decoder
        .as_mut()
        .expect("def_level_decoder be set")
        .get(buffer)
}
/// Decodes values of the current page into `buffer` using the decoder registered for the
/// current encoding, and returns how many values were written.
///
/// # Panics
///
/// Panics if no data page has been loaded yet (no current encoding) or if no decoder is
/// registered for the current encoding.
#[inline]
fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
    let encoding = self
        .current_encoding
        .expect("current_encoding should be set");
    // The panic message is built lazily: this runs for every batch, and formatting the
    // message up front would allocate a `String` on each successful call.
    let current_decoder = self
        .decoders
        .get_mut(&encoding)
        .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
    current_decoder.get(buffer)
}
/// Registers the dictionary decoder built from a dictionary `page`.
///
/// `PLAIN` and `PLAIN_DICTIONARY` page encodings are treated as `RLE_DICTIONARY`. The
/// dictionary entries are read with a plain decoder and handed to a `DictDecoder`, which
/// is stored in the decoder cache under `RLE_DICTIONARY`.
///
/// Returns `Ok(true)` on success. Fails when a decoder is already cached for the page's
/// (normalised) encoding, when that encoding is not a dictionary encoding, or when the
/// dictionary data cannot be set.
#[inline]
fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
    let encoding = match page.encoding() {
        Encoding::PLAIN | Encoding::PLAIN_DICTIONARY => Encoding::RLE_DICTIONARY,
        other => other,
    };

    if self.decoders.contains_key(&encoding) {
        return Err(general_err!("Column cannot have more than one dictionary"));
    }
    if encoding != Encoding::RLE_DICTIONARY {
        return Err(nyi_err!(
            "Invalid/Unsupported encoding type for dictionary: {}",
            encoding
        ));
    }

    let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
    let num_values = page.num_values();
    dictionary.set_data(page.buffer().clone(), num_values as usize)?;

    let mut dict_decoder = DictDecoder::new();
    dict_decoder.set_dict(Box::new(dictionary))?;
    self.decoders.insert(encoding, Box::new(dict_decoder));
    Ok(true)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::distributions::range::SampleRange;
use std::{collections::VecDeque, rc::Rc, vec::IntoIter};
use basic::Type as PhysicalType;
use column::page::Page;
use encodings::{
encoding::{get_encoder, DictEncoder, Encoder},
levels::{max_buffer_size, LevelEncoder},
};
use schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
use util::{
memory::{ByteBufferPtr, MemTracker, MemTrackerPtr},
test_common::random_numbers_range,
};
// Number of levels generated for each test page.
const NUM_LEVELS: usize = 128;
// Number of data pages generated for each test.
const NUM_PAGES: usize = 2;
// Max definition level used by the tests that exercise definition levels.
const MAX_DEF_LEVEL: i16 = 5;
// Max repetition level used by the tests that exercise repetition levels.
const MAX_REP_LEVEL: i16 = 5;
// Declares one `#[test]` function by forwarding to `test_internal!`.
//
// The second argument (`i32` or `i64`) selects the data type and the schema helper:
// `Int32Type` / `get_test_int32_type` or `Int64Type` / `get_test_int64_type`. The other
// arguments are passed through unchanged: test name, `ColumnReaderTester` method to call,
// max definition level, max repetition level, number of pages, levels per page, batch
// size, and the two bounds for generated values.
macro_rules! test {
($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
$num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
test_internal!(
$test_func,
Int32Type,
get_test_int32_type,
$func,
$def_level,
$rep_level,
$num_pages,
$num_levels,
$batch_size,
$min,
$max
);
};
($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
$num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
test_internal!(
$test_func,
Int64Type,
get_test_int64_type,
$func,
$def_level,
$rep_level,
$num_pages,
$num_levels,
$batch_size,
$min,
$max
);
};
}
// Expands to a `#[test]` function named `$test_func`.
//
// The generated test builds a `ColumnDescriptor` from the schema type returned by
// `$pty()` with the given max definition / repetition levels, creates a
// `ColumnReaderTester::<$ty>` and calls its `$func` method with the page count, levels
// per page, batch size and value bounds.
macro_rules! test_internal {
($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
$rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
$min:expr, $max:expr) => {
#[test]
fn $test_func() {
let desc = Rc::new(ColumnDescriptor::new(
Rc::new($pty()),
None,
$def_level,
$rep_level,
ColumnPath::new(Vec::new()),
));
let mut tester = ColumnReaderTester::<$ty>::new();
tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
}
};
}
// PLAIN-encoded INT32 columns, each case run against v1 and v2 data pages.

// Batch size 16 divides the 128 levels of a page evenly.
test!(
test_read_plain_v1_int32,
i32,
plain_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i32::MIN,
::std::i32::MAX
);
test!(
test_read_plain_v2_int32,
i32,
plain_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i32::MIN,
::std::i32::MAX
);
// Batch size 17 does not divide the 128 levels of a page.
test!(
test_read_plain_v1_int32_uneven,
i32,
plain_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
17,
::std::i32::MIN,
::std::i32::MAX
);
test!(
test_read_plain_v2_int32_uneven,
i32,
plain_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
17,
::std::i32::MIN,
::std::i32::MAX
);
// Batch size 512 exceeds the 2 * 128 levels generated, so one call crosses pages.
test!(
test_read_plain_v1_int32_multi_page,
i32,
plain_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
512,
::std::i32::MIN,
::std::i32::MAX
);
test!(
test_read_plain_v2_int32_multi_page,
i32,
plain_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
512,
::std::i32::MIN,
::std::i32::MAX
);
// Max definition and repetition level 0: pages carry no levels.
test!(
test_read_plain_v1_int32_required_non_repeated,
i32,
plain_v1,
0,
0,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i32::MIN,
::std::i32::MAX
);
test!(
test_read_plain_v2_int32_required_non_repeated,
i32,
plain_v2,
0,
0,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i32::MIN,
::std::i32::MAX
);
// PLAIN-encoded INT64 columns with max definition / repetition level 1 (0 for the
// required, non-repeated cases), each case run against v1 and v2 data pages.

// Batch size 16 divides the 128 levels of a page evenly.
test!(
test_read_plain_v1_int64,
i64,
plain_v1,
1,
1,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i64::MIN,
::std::i64::MAX
);
test!(
test_read_plain_v2_int64,
i64,
plain_v2,
1,
1,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i64::MIN,
::std::i64::MAX
);
// Batch size 17 does not divide the 128 levels of a page.
test!(
test_read_plain_v1_int64_uneven,
i64,
plain_v1,
1,
1,
NUM_PAGES,
NUM_LEVELS,
17,
::std::i64::MIN,
::std::i64::MAX
);
test!(
test_read_plain_v2_int64_uneven,
i64,
plain_v2,
1,
1,
NUM_PAGES,
NUM_LEVELS,
17,
::std::i64::MIN,
::std::i64::MAX
);
// Batch size 512 exceeds the 2 * 128 levels generated, so one call crosses pages.
test!(
test_read_plain_v1_int64_multi_page,
i64,
plain_v1,
1,
1,
NUM_PAGES,
NUM_LEVELS,
512,
::std::i64::MIN,
::std::i64::MAX
);
test!(
test_read_plain_v2_int64_multi_page,
i64,
plain_v2,
1,
1,
NUM_PAGES,
NUM_LEVELS,
512,
::std::i64::MIN,
::std::i64::MAX
);
// Max definition and repetition level 0: pages carry no levels.
test!(
test_read_plain_v1_int64_required_non_repeated,
i64,
plain_v1,
0,
0,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i64::MIN,
::std::i64::MAX
);
test!(
test_read_plain_v2_int64_required_non_repeated,
i64,
plain_v2,
0,
0,
NUM_PAGES,
NUM_LEVELS,
16,
::std::i64::MIN,
::std::i64::MAX
);
// Dictionary-encoded INT32 columns, each case run against v1 and v2 data pages. Values
// are generated from the bounds 0 and 3.

// Tiny input: 2 pages with 2 levels each.
test!(
test_read_dict_v1_int32_small,
i32,
dict_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
2,
2,
16,
0,
3
);
test!(
test_read_dict_v2_int32_small,
i32,
dict_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
2,
2,
16,
0,
3
);
// Batch size 16 divides the 128 levels of a page evenly.
test!(
test_read_dict_v1_int32,
i32,
dict_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
0,
3
);
test!(
test_read_dict_v2_int32,
i32,
dict_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
0,
3
);
// Batch size 17 does not divide the 128 levels of a page.
test!(
test_read_dict_v1_int32_uneven,
i32,
dict_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
17,
0,
3
);
test!(
test_read_dict_v2_int32_uneven,
i32,
dict_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
17,
0,
3
);
// Batch size 512 exceeds the 2 * 128 levels generated, so one call crosses pages.
test!(
test_read_dict_v1_int32_multi_page,
i32,
dict_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
512,
0,
3
);
test!(
test_read_dict_v2_int32_multi_page,
i32,
dict_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
512,
0,
3
);
// Dictionary-encoded INT64 columns against v1 and v2 data pages, batch size 16, values
// generated from the bounds 0 and 3.
test!(
test_read_dict_v1_int64,
i64,
dict_v1,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
0,
3
);
test!(
test_read_dict_v2_int64,
i64,
dict_v2,
MAX_DEF_LEVEL,
MAX_REP_LEVEL,
NUM_PAGES,
NUM_LEVELS,
16,
0,
3
);
// Reads values only (no level buffers) into buffers smaller than, equal to and larger
// than the batch size of 16.
#[test]
fn test_read_batch_values_only() {
    for &len in &[10usize, 16, 51] {
        test_read_batch_int32(16, &mut vec![0; len], None, None);
    }
}
// Reads values together with definition levels into equally sized buffers that are
// smaller than, equal to and larger than the batch size of 16.
#[test]
fn test_read_batch_values_def_levels() {
    for &len in &[10usize, 16, 51] {
        test_read_batch_int32(16, &mut vec![0; len], Some(&mut vec![0; len]), None);
    }
}
// Reads values with a repetition level buffer supplied (and no definition level buffer),
// using buffers smaller than, equal to and larger than the batch size of 16.
#[test]
fn test_read_batch_values_rep_levels() {
    for &len in &[10usize, 16, 51] {
        test_read_batch_int32(16, &mut vec![0; len], None, Some(&mut vec![0; len]));
    }
}
// Supplies value, definition level and repetition level buffers of three different
// lengths, all shorter than the batch size of 16.
#[test]
fn test_read_batch_different_buf_sizes() {
    // (values, definition levels, repetition levels) buffer lengths.
    let cases: [(usize, usize, usize); 2] = [(8, 9, 7), (1, 9, 3)];
    for &(values_len, def_len, rep_len) in &cases {
        test_read_batch_int32(
            16,
            &mut vec![0; values_len],
            Some(&mut vec![0; def_len]),
            Some(&mut vec![0; rep_len]),
        );
    }
}
// Reads values, definition levels and repetition levels with a batch size and buffers of
// 128 entries each.
#[test]
fn test_read_batch_values_def_rep_levels() {
    let mut values = vec![0i32; 128];
    let mut def_levels = vec![0i16; 128];
    let mut rep_levels = vec![0i16; 128];
    test_read_batch_int32(
        128,
        &mut values,
        Some(&mut def_levels),
        Some(&mut rep_levels),
    );
}
// Uses a batch size (5) larger than a page (4 levels) with 7-entry output buffers, so a
// single `read_batch()` call has to load the second page and adjust how much it reads.
#[test]
fn test_read_batch_adjust_after_buffering_page() {
    let desc = Rc::new(ColumnDescriptor::new(
        Rc::new(get_test_int32_type()),
        None,
        1,
        1,
        ColumnPath::new(Vec::new()),
    ));

    let (num_pages, num_levels, batch_size) = (2, 4, 5);
    let mut values = vec![0i32; 7];
    let mut def_levels = vec![0i16; 7];
    let mut rep_levels = vec![0i16; 7];

    let mut tester = ColumnReaderTester::<Int32Type>::new();
    tester.test_read_batch(
        desc,
        Encoding::RLE_DICTIONARY,
        num_pages,
        num_levels,
        batch_size,
        ::std::i32::MIN,
        ::std::i32::MAX,
        &mut values,
        Some(&mut def_levels),
        Some(&mut rep_levels),
        false,
    );
}
// Builds the schema type used by the INT32 tests: a REQUIRED primitive field "a" with
// physical type INT32, logical type INT_32 and length -1.
fn get_test_int32_type() -> SchemaType {
    let built = SchemaType::primitive_type_builder("a", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(LogicalType::INT_32)
        .with_length(-1)
        .build();
    built.expect("build() should be OK")
}
// Builds the schema type used by the INT64 tests: a REQUIRED primitive field "a" with
// physical type INT64, logical type INT_64 and length -1.
fn get_test_int64_type() -> SchemaType {
    let built = SchemaType::primitive_type_builder("a", PhysicalType::INT64)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(LogicalType::INT_64)
        .with_length(-1)
        .build();
    built.expect("build() should be OK")
}
// Runs a dictionary-encoded, v1-page INT32 read test (NUM_PAGES pages of NUM_LEVELS
// levels) into the caller-provided buffers.
//
// Both max levels are switched on together, and only when a definition level buffer is
// supplied; otherwise the column is described with max levels of 0.
//
// NOTE(review): the max repetition level is keyed on `def_levels`, not on `rep_levels`,
// which looks like a copy-paste slip. Keying it on `rep_levels` would produce pages with
// definition levels but no repetition levels, which `DataPageBuilderImpl::add_def_levels`
// currently rejects, so it is left as is here.
fn test_read_batch_int32(
    batch_size: usize,
    values: &mut [i32],
    def_levels: Option<&mut [i16]>,
    rep_levels: Option<&mut [i16]>,
) {
    let (max_def_level, max_rep_level) = if def_levels.is_some() {
        (MAX_DEF_LEVEL, MAX_REP_LEVEL)
    } else {
        (0, 0)
    };

    let schema_type = get_test_int32_type();
    let desc = Rc::new(ColumnDescriptor::new(
        Rc::new(schema_type),
        None,
        max_def_level,
        max_rep_level,
        ColumnPath::new(Vec::new()),
    ));

    let mut tester = ColumnReaderTester::<Int32Type>::new();
    tester.test_read_batch(
        desc,
        Encoding::RLE_DICTIONARY,
        NUM_PAGES,
        NUM_LEVELS,
        batch_size,
        ::std::i32::MIN,
        ::std::i32::MAX,
        values,
        def_levels,
        rep_levels,
        false,
    );
}
// Test driver for `ColumnReaderImpl<T>`.
//
// The fields hold the expected data: `make_pages()` fills them while generating the test
// pages, and `test_read_batch()` compares what the reader returns against them.
struct ColumnReaderTester<T: DataType>
where T::T: PartialOrd + SampleRange + Copy {
// Expected repetition levels, across all generated pages.
rep_levels: Vec<i16>,
// Expected definition levels, across all generated pages.
def_levels: Vec<i16>,
// Expected values, across all generated pages.
values: Vec<T::T>,
}
impl<T: DataType> ColumnReaderTester<T>
where T::T: PartialOrd + SampleRange + Copy
{
pub fn new() -> Self {
Self {
rep_levels: Vec::new(),
def_levels: Vec::new(),
values: Vec::new(),
}
}
// Runs the general read test with PLAIN-encoded values in v1 data pages.
fn plain_v1(
    &mut self,
    desc: ColumnDescPtr,
    num_pages: usize,
    num_levels: usize,
    batch_size: usize,
    min: T::T,
    max: T::T,
) {
    let encoding = Encoding::PLAIN;
    let use_v2 = false;
    self.test_read_batch_general(
        desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
    );
}
// Runs the general read test with PLAIN-encoded values in v2 data pages.
fn plain_v2(
    &mut self,
    desc: ColumnDescPtr,
    num_pages: usize,
    num_levels: usize,
    batch_size: usize,
    min: T::T,
    max: T::T,
) {
    let encoding = Encoding::PLAIN;
    let use_v2 = true;
    self.test_read_batch_general(
        desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
    );
}
// Runs the general read test with dictionary-encoded values in v1 data pages.
fn dict_v1(
    &mut self,
    desc: ColumnDescPtr,
    num_pages: usize,
    num_levels: usize,
    batch_size: usize,
    min: T::T,
    max: T::T,
) {
    let encoding = Encoding::RLE_DICTIONARY;
    let use_v2 = false;
    self.test_read_batch_general(
        desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
    );
}
// Runs the general read test with dictionary-encoded values in v2 data pages.
fn dict_v2(
    &mut self,
    desc: ColumnDescPtr,
    num_pages: usize,
    num_levels: usize,
    batch_size: usize,
    min: T::T,
    max: T::T,
) {
    let encoding = Encoding::RLE_DICTIONARY;
    let use_v2 = true;
    self.test_read_batch_general(
        desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
    );
}
// Runs `test_read_batch()` with freshly allocated output buffers that are large enough
// for every level of every page (`num_levels * num_pages` entries each), supplying both
// definition and repetition level buffers.
fn test_read_batch_general(
    &mut self,
    desc: ColumnDescPtr,
    encoding: Encoding,
    num_pages: usize,
    num_levels: usize,
    batch_size: usize,
    min: T::T,
    max: T::T,
    use_v2: bool,
) {
    let capacity = num_levels * num_pages;
    let mut def_levels = vec![0; capacity];
    let mut rep_levels = vec![0; capacity];
    let mut values = vec![T::T::default(); capacity];

    self.test_read_batch(
        desc,
        encoding,
        num_pages,
        num_levels,
        batch_size,
        min,
        max,
        &mut values,
        Some(&mut def_levels),
        Some(&mut rep_levels),
        use_v2,
    );
}
// Core read test: generates pages, reads them back through a typed column reader and
// checks the result against the generated data.
//
// `make_pages()` produces `num_pages` pages of `num_levels` levels each with the given
// encoding / page version, recording the expected levels and values in `self`. The pages
// are then read with repeated `read_batch()` calls of `batch_size` into `values` and the
// optional `def_levels` / `rep_levels` buffers until a call reads nothing. Finally the
// filled prefix of every buffer is compared with the expected data.
fn test_read_batch(
&mut self,
desc: ColumnDescPtr,
encoding: Encoding,
num_pages: usize,
num_levels: usize,
batch_size: usize,
min: T::T,
max: T::T,
values: &mut [T::T],
mut def_levels: Option<&mut [i16]>,
mut rep_levels: Option<&mut [i16]>,
use_v2: bool,
)
{
let mut pages = VecDeque::new();
make_pages::<T>(
desc.clone(),
encoding,
num_pages,
num_levels,
min,
max,
&mut self.def_levels,
&mut self.rep_levels,
&mut self.values,
&mut pages,
use_v2,
);
let max_def_level = desc.max_def_level();
let page_reader = TestPageReader::new(Vec::from(pages));
let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
let mut curr_values_read = 0;
let mut curr_levels_read = 0;
let mut done = false;
while !done {
// Each call writes into the not-yet-filled tail of every buffer.
let actual_def_levels = match &mut def_levels {
Some(ref mut vec) => Some(&mut vec[curr_levels_read..]),
None => None,
};
let actual_rep_levels = match rep_levels {
Some(ref mut vec) => Some(&mut vec[curr_levels_read..]),
None => None,
};
let (values_read, levels_read) = typed_column_reader
.read_batch(
batch_size,
actual_def_levels,
actual_rep_levels,
&mut values[curr_values_read..],
)
.expect("read_batch() should be OK");
// A call that reads nothing ends the loop: either the pages or the buffers are
// exhausted.
if values_read == 0 && levels_read == 0 {
done = true;
}
curr_values_read += values_read;
curr_levels_read += levels_read;
}
assert!(
values.len() >= curr_values_read,
"values.len() >= values_read"
);
assert_eq!(
&values[0..curr_values_read],
&self.values[0..curr_values_read],
"values content doesn't match"
);
if let Some(ref levels) = def_levels {
assert!(
levels.len() >= curr_levels_read,
"def_levels.len() >= levels_read"
);
assert_eq!(
&levels[0..curr_levels_read],
&self.def_levels[0..curr_levels_read],
"definition levels content doesn't match"
);
}
if let Some(ref levels) = rep_levels {
assert!(
levels.len() >= curr_levels_read,
"rep_levels.len() >= levels_read"
);
assert_eq!(
&levels[0..curr_levels_read],
&self.rep_levels[0..curr_levels_read],
"repetition levels content doesn't match"
);
}
// Without any level buffer no levels may be reported; with definition levels in play
// there can never be more values than levels.
if def_levels.is_none() && rep_levels.is_none() {
assert!(
curr_levels_read == 0,
"expected to read 0 levels, found {}",
curr_levels_read
);
} else if def_levels.is_some() && max_def_level > 0 {
assert!(
curr_levels_read >= curr_values_read,
"expected levels read to be greater than values read"
);
}
}
}
// In-memory `PageReader` that hands out a fixed list of pages in order.
struct TestPageReader {
    // Pages that have not been returned yet.
    pages: IntoIter<Page>,
}

impl TestPageReader {
    // Creates a reader that yields `pages` front to back.
    pub fn new(pages: Vec<Page>) -> Self {
        let pages = pages.into_iter();
        TestPageReader { pages }
    }
}

impl PageReader for TestPageReader {
    // Returns the next stored page, or `None` once all pages have been handed out.
    // Never fails.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        let next_page = self.pages.next();
        Ok(next_page)
    }
}
// Incremental builder for a test data page: levels and values are appended piece by
// piece and `consume()` turns the accumulated content into a `Page`.
trait DataPageBuilder {
// Appends repetition levels encoded for the given max level.
fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
// Appends definition levels encoded for the given max level.
fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]);
// Appends `values` encoded with `encoding`.
fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]);
// Appends already encoded dictionary indices.
fn add_indices(&mut self, indices: ByteBufferPtr);
// Consumes the builder and returns the finished page.
fn consume(self) -> Page;
}
// `DataPageBuilder` implementation that can produce either v1 or v2 data pages.
struct DataPageBuilderImpl {
// Column descriptor, passed on to the value encoder.
desc: ColumnDescPtr,
// Encoding of the page values; set by `add_values()` / `add_indices()`.
encoding: Option<Encoding>,
// Memory tracker handed to the value encoder.
mem_tracker: MemTrackerPtr,
// Number of values reported in the page; overwritten with the number of levels when
// repetition levels are added.
num_values: u32,
// Page bytes accumulated so far (levels, then values).
buffer: Vec<u8>,
// Byte length of the encoded repetition levels, not counting the i32-sized prefix.
rep_levels_byte_len: u32,
// Byte length of the encoded definition levels, not counting the i32-sized prefix.
def_levels_byte_len: u32,
// Whether `consume()` produces a `DataPageV2` (true) or a `DataPage` (false).
datapage_v2: bool,
}
impl DataPageBuilderImpl {
    // Creates an empty builder for a page that reports `num_values` values.
    //
    // `num_values` is the number of non-null values unless repetition levels are added
    // later, in which case `add_rep_levels()` replaces it with the number of levels.
    fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
        let mem_tracker = Rc::new(MemTracker::new());
        DataPageBuilderImpl {
            desc,
            encoding: None,
            mem_tracker,
            num_values,
            buffer: Vec::new(),
            rep_levels_byte_len: 0,
            def_levels_byte_len: 0,
            datapage_v2,
        }
    }

    // RLE-encodes `levels` and appends them to the page buffer.
    //
    // The encoder output starts with an i32-sized prefix. It is kept for v1 pages and
    // dropped for v2 pages. The returned length never includes the prefix.
    fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
        let capacity = max_buffer_size(Encoding::RLE, max_level, levels.len());
        let mut encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; capacity]);
        encoder.put(levels).expect("put() should be OK");
        let encoded = encoder.consume().expect("consume() should be OK");

        let prefix_len = mem::size_of::<i32>();
        let payload = &encoded[prefix_len..];
        let to_append = if self.datapage_v2 {
            payload
        } else {
            encoded.as_slice()
        };
        self.buffer.extend_from_slice(to_append);
        payload.len() as u32
    }
}
impl DataPageBuilder for DataPageBuilderImpl {
    // Appends the repetition levels and makes their count the page's value count.
    fn add_rep_levels(&mut self, max_levels: i16, rep_levels: &[i16]) {
        self.num_values = rep_levels.len() as u32;
        let byte_len = self.add_levels(max_levels, rep_levels);
        self.rep_levels_byte_len = byte_len;
    }

    // Appends the definition levels. Their count must equal the current value count,
    // which is the case once repetition levels of the same length have been added.
    fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
        let level_count = def_levels.len() as u32;
        assert!(
            self.num_values == level_count,
            "Must call `add_rep_levels() first!`"
        );
        let byte_len = self.add_levels(max_levels, def_levels);
        self.def_levels_byte_len = byte_len;
    }

    // Encodes `values` with `encoding`, appends the bytes and records the encoding.
    // There may not be more values than the page's value count.
    fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]) {
        let value_count = values.len() as u32;
        assert!(
            self.num_values >= value_count,
            "num_values: {}, values.len(): {}",
            self.num_values,
            values.len()
        );
        self.encoding = Some(encoding);

        let desc = self.desc.clone();
        let mem_tracker = self.mem_tracker.clone();
        let mut encoder: Box<Encoder<T>> =
            get_encoder::<T>(desc, encoding, mem_tracker).expect("get_encoder() should be OK");
        encoder.put(values).expect("put() should be OK");
        let encoded_values = encoder
            .flush_buffer()
            .expect("consume_buffer() should be OK");
        self.buffer.extend_from_slice(encoded_values.data());
    }

    // Appends already encoded dictionary indices and marks the page as RLE_DICTIONARY.
    fn add_indices(&mut self, indices: ByteBufferPtr) {
        self.encoding = Some(Encoding::RLE_DICTIONARY);
        self.buffer.extend_from_slice(indices.data());
    }

    // Turns the accumulated bytes into a `DataPageV2` or `DataPage`, depending on how the
    // builder was created. Panics if neither values nor indices were added.
    fn consume(self) -> Page {
        let buf = ByteBufferPtr::new(self.buffer);
        let num_values = self.num_values;
        let encoding = self.encoding.unwrap();

        if !self.datapage_v2 {
            return Page::DataPage {
                buf,
                num_values,
                encoding,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: None,
            };
        }

        Page::DataPageV2 {
            buf,
            num_values,
            encoding,
            num_nulls: 0,
            num_rows: num_values,
            def_levels_byte_len: self.def_levels_byte_len,
            rep_levels_byte_len: self.rep_levels_byte_len,
            is_compressed: false,
            statistics: None,
        }
    }
}
// Generates `num_pages` random data pages for the column described by `desc`.
//
// For every page, `levels_per_page` definition levels (when the max definition level is
// > 0) and repetition levels (when the max repetition level is > 0) are generated and
// appended to `def_levels` / `rep_levels`. The number of values in the page is the number
// of definition levels equal to the max definition level, or `levels_per_page` when
// there are no definition levels; that many values are generated from the bounds `min`
// and `max` and appended to `values`. The finished pages are pushed onto `pages` as v1 or
// v2 data pages depending on `use_v2`.
//
// With a dictionary encoding, all pages share one dictionary encoder and the resulting
// dictionary page is placed in front of the data pages.
//
// Panics on any encoding other than PLAIN, PLAIN_DICTIONARY or RLE_DICTIONARY, and when
// one of the encoders reports an error.
fn make_pages<T: DataType>(
    desc: ColumnDescPtr,
    encoding: Encoding,
    num_pages: usize,
    levels_per_page: usize,
    min: T::T,
    max: T::T,
    def_levels: &mut Vec<i16>,
    rep_levels: &mut Vec<i16>,
    values: &mut Vec<T::T>,
    pages: &mut VecDeque<Page>,
    use_v2: bool,
) where
    T::T: PartialOrd + SampleRange + Copy,
{
    let mut num_values = 0;
    let max_def_level = desc.max_def_level();
    let max_rep_level = desc.max_rep_level();
    let mem_tracker = Rc::new(MemTracker::new());
    let mut dict_encoder = DictEncoder::<T>::new(desc.clone(), mem_tracker);

    for i in 0..num_pages {
        let level_range = i * levels_per_page..(i + 1) * levels_per_page;

        // Only entries at the max definition level carry a value.
        let num_values_cur_page = if max_def_level > 0 {
            random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels);
            def_levels[level_range.clone()]
                .iter()
                .filter(|&&dl| dl == max_def_level)
                .count()
        } else {
            levels_per_page
        };
        if max_rep_level > 0 {
            random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels);
        }
        random_numbers_range(num_values_cur_page, min, max, values);

        let mut pb =
            DataPageBuilderImpl::new(desc.clone(), num_values_cur_page as u32, use_v2);
        // Repetition levels go first: they precede definition levels in the page and the
        // builder requires them before definition levels are added.
        if max_rep_level > 0 {
            pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]);
        }
        if max_def_level > 0 {
            pb.add_def_levels(max_def_level, &def_levels[level_range]);
        }

        let value_range = num_values..num_values + num_values_cur_page;
        match encoding {
            Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                // A failing encoder would otherwise only surface later as a confusing
                // content mismatch, so fail here like every other encoder call does.
                dict_encoder
                    .put(&values[value_range])
                    .expect("put() should be OK");
                let indices = dict_encoder
                    .write_indices()
                    .expect("write_indices() should be OK");
                pb.add_indices(indices);
            },
            Encoding::PLAIN => {
                pb.add_values::<T>(encoding, &values[value_range]);
            },
            enc => panic!("Unexpected encoding {}", enc),
        }

        pages.push_back(pb.consume());
        num_values += num_values_cur_page;
    }

    if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY {
        let dict = dict_encoder
            .write_dict()
            .expect("write_dict() should be OK");
        let dict_page = Page::DictionaryPage {
            buf: dict,
            num_values: dict_encoder.num_entries() as u32,
            encoding: Encoding::RLE_DICTIONARY,
            is_sorted: false,
        };
        // The dictionary has to be seen by the reader before any data page that uses it.
        pages.push_front(dict_page);
    }
}
}