use std::{cmp, marker::PhantomData, mem, slice::from_raw_parts_mut};
use super::rle::RleDecoder;
use basic::*;
use byteorder::{ByteOrder, LittleEndian};
use data_type::*;
use errors::{ParquetError, Result};
use schema::types::ColumnDescPtr;
use util::{
bit_util::BitReader,
memory::{ByteBuffer, ByteBufferPtr},
};
/// Common interface implemented by every value decoder in this module.
///
/// Usage pattern: hand the decoder an encoded buffer with `set_data`, then pull values
/// out in batches with `get`; `values_left` is intended to report how many values have
/// not been returned yet.
pub trait Decoder<T: DataType> {
/// Sets the encoded `data` to decode. `num_values` is the number of values the caller
/// expects in `data`; some decoders (e.g. `DeltaBitPackDecoder`) ignore it and read the
/// count from the encoded data instead.
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()>;
/// Decodes values into `buffer` and returns how many were written. Most implementations
/// in this module write at most `min(buffer.len(), values_left())` values.
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
/// Returns the number of values that have not been decoded yet.
fn values_left(&self) -> usize;
/// Returns the encoding handled by this decoder.
fn encoding(&self) -> Encoding;
}
/// Builds a boxed decoder for `encoding` over values of type `T`.
///
/// `descr` is consulted only for its fixed type length, which the PLAIN decoder needs.
///
/// Returns a general error for `RLE_DICTIONARY` / `PLAIN_DICTIONARY`, which cannot be
/// created through this function (see `DictDecoder`). Returns a not-yet-implemented
/// error for any other encoding without a decoder here.
pub fn get_decoder<T: DataType>(
  descr: ColumnDescPtr,
  encoding: Encoding,
) -> Result<Box<Decoder<T>>>
{
  let boxed: Box<Decoder<T>> = match encoding {
    Encoding::PLAIN => {
      let type_length = descr.type_length();
      Box::new(PlainDecoder::new(type_length))
    },
    Encoding::RLE => Box::new(RleValueDecoder::new()),
    Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
    Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
    Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
    Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
      return Err(general_err!(
        "Cannot initialize this encoding through this function"
      ));
    },
    unsupported => return Err(nyi_err!("Encoding {} is not supported", unsupported)),
  };
  Ok(boxed)
}
/// Decoder for the PLAIN encoding.
///
/// Fixed-width types are copied byte-for-byte from `data`, while booleans go through
/// `bit_reader`. Byte arrays are either length-prefixed or, for fixed-length arrays,
/// `type_length` bytes each.
pub struct PlainDecoder<T: DataType> {
  // Number of values not yet returned by `get`.
  num_values: usize,
  // Byte offset of the next undecoded value inside `data`.
  start: usize,
  // Fixed value width used by `FixedLenByteArrayType`.
  type_length: i32,
  // Encoded page bytes (unused by the boolean decoder).
  data: Option<ByteBufferPtr>,
  // Bit-level reader, only populated for `BoolType`.
  bit_reader: Option<BitReader>,
  _phantom: PhantomData<T>,
}

impl<T: DataType> PlainDecoder<T> {
  /// Creates an empty decoder; `set_data` must be called before decoding.
  ///
  /// `type_length` is only meaningful for fixed-length byte arrays.
  pub fn new(type_length: i32) -> Self {
    Self {
      num_values: 0,
      start: 0,
      type_length,
      data: None,
      bit_reader: None,
      _phantom: PhantomData,
    }
  }
}
impl<T: DataType> Decoder<T> for PlainDecoder<T> {
  /// Stores `data` and rewinds the read position to its first byte.
  #[inline]
  default fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
    self.num_values = num_values;
    self.start = 0;
    self.data = Some(data);
    Ok(())
  }

  #[inline]
  fn values_left(&self) -> usize { self.num_values }

  #[inline]
  fn encoding(&self) -> Encoding { Encoding::PLAIN }

  /// Decodes fixed-width values by copying their raw bytes directly into `buffer`.
  ///
  /// Decodes `min(buffer.len(), values_left())` values. Returns an EOF error if the
  /// remaining bytes are too few.
  ///
  /// # Panics
  /// Panics if `set_data` has not been called.
  #[inline]
  default fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
    assert!(self.data.is_some());
    // Only a shared borrow of the source bytes is needed.
    let data = self.data.as_ref().unwrap();
    let num_values = cmp::min(buffer.len(), self.num_values);
    let bytes_left = data.len() - self.start;
    let bytes_to_decode = mem::size_of::<T::T>() * num_values;
    if bytes_left < bytes_to_decode {
      return Err(eof_err!("Not enough bytes to decode"));
    }
    // SAFETY: `bytes_to_decode == num_values * size_of::<T::T>()` with
    // `num_values <= buffer.len()`, so the byte view lies entirely inside `buffer`.
    // The pointer is taken with `as_mut_ptr()` (the original used `as_ptr()`, whose
    // pointer may not be written through). This assumes every bit pattern is a valid
    // `T::T`, which holds for the numeric types that reach this default impl
    // (NOTE(review): confirm no non-POD type falls through to it).
    let raw_buffer: &mut [u8] =
      unsafe { from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, bytes_to_decode) };
    raw_buffer.copy_from_slice(data.range(self.start, bytes_to_decode).as_ref());
    self.start += bytes_to_decode;
    self.num_values -= num_values;
    Ok(num_values)
  }
}
impl Decoder<Int96Type> for PlainDecoder<Int96Type> {
  /// Decodes 12-byte INT96 values, each stored as three little-endian `u32` words.
  ///
  /// Returns an EOF error if fewer than `12 * count` bytes remain.
  fn get(&mut self, buffer: &mut [Int96]) -> Result<usize> {
    assert!(self.data.is_some());
    let source = self.data.as_ref().unwrap();
    let count = cmp::min(buffer.len(), self.num_values);
    let available = source.len() - self.start;
    let needed = count * 12;
    if needed > available {
      return Err(eof_err!("Not enough bytes to decode"));
    }
    let window = source.range(self.start, needed);
    self.start += needed;
    // `window` holds exactly `count` 12-byte records, so the zip stops after `count`.
    for (value, chunk) in buffer.iter_mut().zip(window.data().chunks(12)) {
      value.set_data(
        LittleEndian::read_u32(&chunk[0..4]),
        LittleEndian::read_u32(&chunk[4..8]),
        LittleEndian::read_u32(&chunk[8..12]),
      );
    }
    self.num_values -= count;
    Ok(count)
  }
}
impl Decoder<BoolType> for PlainDecoder<BoolType> {
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
self.num_values = num_values;
self.bit_reader = Some(BitReader::new(data));
Ok(())
}
fn get(&mut self, buffer: &mut [bool]) -> Result<usize> {
assert!(self.bit_reader.is_some());
let bit_reader = self.bit_reader.as_mut().unwrap();
let values_read = bit_reader.get_batch::<bool>(buffer, 1);
self.num_values -= values_read;
Ok(values_read)
}
}
impl Decoder<ByteArrayType> for PlainDecoder<ByteArrayType> {
  /// Decodes length-prefixed byte arrays. Each value is a 4-byte `u32` length (read with
  /// `read_num_bytes!`) followed by that many bytes.
  ///
  /// Returns an EOF error if either a length prefix or its payload is truncated.
  fn get(&mut self, buffer: &mut [ByteArray]) -> Result<usize> {
    assert!(self.data.is_some());
    let data = self.data.as_ref().unwrap();
    let num_values = cmp::min(buffer.len(), self.num_values);
    let len_size = mem::size_of::<u32>();
    for i in 0..num_values {
      // `read_num_bytes!` asserts on short input, and `start_from` panics past the end,
      // so a truncated page is checked explicitly. `self.start <= data.len()` always
      // holds because `start` only advances over bytes that were verified to exist.
      if data.len() - self.start < len_size {
        return Err(eof_err!("Not enough bytes to decode"));
      }
      let len: usize =
        read_num_bytes!(u32, 4, data.start_from(self.start).as_ref()) as usize;
      self.start += len_size;
      // Compare against the remaining byte count so a huge length cannot overflow.
      if data.len() - self.start < len {
        return Err(eof_err!("Not enough bytes to decode"));
      }
      buffer[i].set_data(data.range(self.start, len));
      self.start += len;
    }
    self.num_values -= num_values;
    Ok(num_values)
  }
}
impl Decoder<FixedLenByteArrayType> for PlainDecoder<FixedLenByteArrayType> {
  /// Decodes fixed-length byte arrays, each exactly `type_length` bytes wide.
  ///
  /// Returns an EOF error as soon as a value would extend past the end of the data.
  ///
  /// # Panics
  /// Panics if `set_data` has not been called or `type_length` is not positive.
  fn get(&mut self, buffer: &mut [ByteArray]) -> Result<usize> {
    assert!(self.data.is_some());
    assert!(self.type_length > 0);
    let source = self.data.as_ref().unwrap();
    let width = self.type_length as usize;
    let count = cmp::min(buffer.len(), self.num_values);
    for value in buffer[..count].iter_mut() {
      if self.start + width > source.len() {
        return Err(eof_err!("Not enough bytes to decode"));
      }
      value.set_data(source.range(self.start, width));
      self.start += width;
    }
    self.num_values -= count;
    Ok(count)
  }
}
/// Decoder for dictionary-encoded pages (RLE_DICTIONARY / PLAIN_DICTIONARY).
///
/// The dictionary itself is loaded with `set_dict`. Data pages contain RLE/bit-packed
/// indices into that dictionary.
pub struct DictDecoder<T: DataType> {
  // Dictionary values, indexed by the decoded RLE indices.
  dictionary: Vec<T::T>,
  // Whether `set_dict` has been called.
  has_dictionary: bool,
  // Decoder for the index stream of the current data page.
  rle_decoder: Option<RleDecoder>,
  // Number of values in the current data page.
  num_values: usize,
}

impl<T: DataType> DictDecoder<T> {
  /// Creates a decoder with no dictionary loaded.
  pub fn new() -> Self {
    DictDecoder {
      rle_decoder: None,
      num_values: 0,
      dictionary: Vec::new(),
      has_dictionary: false,
    }
  }

  /// Loads the dictionary by draining every remaining value from `decoder`.
  pub fn set_dict(&mut self, mut decoder: Box<Decoder<T>>) -> Result<()> {
    let dict_size = decoder.values_left();
    self.dictionary.resize(dict_size, T::T::default());
    decoder.get(&mut self.dictionary)?;
    self.has_dictionary = true;
    Ok(())
  }
}
impl<T: DataType> Decoder<T> for DictDecoder<T> {
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
let bit_width = data.as_ref()[0];
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(data.start_from(1));
self.num_values = num_values;
self.rle_decoder = Some(rle_decoder);
Ok(())
}
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
assert!(self.rle_decoder.is_some());
assert!(self.has_dictionary, "Must call set_dict() first!");
let rle = self.rle_decoder.as_mut().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
}
fn values_left(&self) -> usize { self.num_values }
fn encoding(&self) -> Encoding { Encoding::RLE_DICTIONARY }
}
pub struct RleValueDecoder<T: DataType> {
values_left: usize,
decoder: Option<RleDecoder>,
_phantom: PhantomData<T>,
}
impl<T: DataType> RleValueDecoder<T> {
pub fn new() -> Self {
Self {
values_left: 0,
decoder: None,
_phantom: PhantomData,
}
}
#[inline]
fn set_data_internal(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
let i32_size = mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
let rle_decoder = self
.decoder
.as_mut()
.expect("RLE decoder is not initialized");
rle_decoder.set_data(data.range(i32_size, data_size));
self.values_left = num_values;
Ok(())
}
}
impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
#[inline]
default fn set_data(&mut self, _data: ByteBufferPtr, _num_values: usize) -> Result<()> {
panic!("RleValueDecoder only supports BoolType");
}
#[inline]
fn values_left(&self) -> usize { self.values_left }
#[inline]
fn encoding(&self) -> Encoding { Encoding::RLE }
#[inline]
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
let rle_decoder = self
.decoder
.as_mut()
.expect("RLE decoder is not initialized");
let values_read = rle_decoder.get_batch(buffer)?;
self.values_left -= values_read;
Ok(values_read)
}
}
impl Decoder<BoolType> for RleValueDecoder<BoolType> {
  /// Booleans are RLE-encoded with a one-bit width. This installs a fresh decoder, then
  /// loads the length-prefixed payload.
  #[inline]
  fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
    let bool_decoder = RleDecoder::new(1);
    self.decoder = Some(bool_decoder);
    self.set_data_internal(data, num_values)
  }
}
/// Decoder for the DELTA_BINARY_PACKED encoding.
///
/// The first value comes from the page header. Each later value is the previous one plus
/// `min_delta + delta`, using wrapping `i64` arithmetic, where the deltas are bit-packed
/// per mini block. Conversion between `i64` and `T::T` goes through
/// `DeltaBitPackDecoderConversion`.
pub struct DeltaBitPackDecoder<T: DataType> {
// Reader over the encoded page bytes; replaced on every `set_data`.
bit_reader: BitReader,
// Set by `set_data`; `get` and `get_offset` assert on it.
initialized: bool,
// Values not yet returned by `get`; taken from the page header, not from the caller.
num_values: usize,
// Number of mini blocks per block, from the page header.
num_mini_blocks: i64,
// `block_size / num_mini_blocks`, computed in `set_data`.
values_per_mini_block: usize,
// Deltas still to be consumed from the current mini block.
values_current_mini_block: usize,
// First value of the page, from the page header; emitted before any delta.
first_value: i64,
// Whether `first_value` has already been written to an output buffer.
first_value_read: bool,
// Minimum delta of the current block; added to every delta in that block.
min_delta: i64,
// Index of the current mini block within `delta_bit_widths`.
mini_block_idx: usize,
// Bit width of the deltas in the current mini block.
delta_bit_width: u8,
// Bit widths of every mini block in the current block.
delta_bit_widths: ByteBuffer,
// Unpacked deltas of the current mini block.
deltas_in_mini_block: Vec<T::T>,
// True when `T::T` is 4 bytes wide. Deltas are then unpacked with a single `get_batch`
// call instead of per-value `get_value` (presumably because batch unpacking is only
// available/efficient for 32-bit types — TODO confirm).
use_batch: bool,
// Most recently decoded value; the base that the next delta is added to.
current_value: i64,
_phantom: PhantomData<T>,
}
impl<T: DataType> DeltaBitPackDecoder<T> {
pub fn new() -> Self {
Self {
bit_reader: BitReader::from(vec![]),
initialized: false,
num_values: 0,
num_mini_blocks: 0,
values_per_mini_block: 0,
values_current_mini_block: 0,
first_value: 0,
first_value_read: false,
min_delta: 0,
mini_block_idx: 0,
delta_bit_width: 0,
delta_bit_widths: ByteBuffer::new(),
deltas_in_mini_block: vec![],
use_batch: mem::size_of::<T::T>() == 4,
current_value: 0,
_phantom: PhantomData,
}
}
pub fn get_offset(&self) -> usize {
assert!(self.initialized, "Bit reader is not initialized");
self.bit_reader.get_byte_offset()
}
#[inline]
fn init_block(&mut self) -> Result<()> {
self.min_delta = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or(eof_err!("Not enough data to decode 'min_delta'"))?;
let mut widths = vec![];
for _ in 0..self.num_mini_blocks {
let w = self
.bit_reader
.get_aligned::<u8>(1)
.ok_or(eof_err!("Not enough data to decode 'width'"))?;
widths.push(w);
}
self.delta_bit_widths.set_data(widths);
self.mini_block_idx = 0;
self.delta_bit_width = self.delta_bit_widths.data()[0];
self.values_current_mini_block = self.values_per_mini_block;
Ok(())
}
#[inline]
fn load_deltas_in_mini_block(&mut self) -> Result<()> {
self.deltas_in_mini_block.clear();
if self.use_batch {
self
.deltas_in_mini_block
.resize(self.values_current_mini_block, T::T::default());
let loaded = self.bit_reader.get_batch::<T::T>(
&mut self.deltas_in_mini_block[..],
self.delta_bit_width as usize,
);
assert!(loaded == self.values_current_mini_block);
} else {
for _ in 0..self.values_current_mini_block {
let delta = self
.bit_reader
.get_value::<T::T>(self.delta_bit_width as usize)
.ok_or(eof_err!("Not enough data to decode 'delta'"))?;
self.deltas_in_mini_block.push(delta);
}
}
Ok(())
}
}
impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
#[inline]
default fn set_data(&mut self, data: ByteBufferPtr, _: usize) -> Result<()> {
self.bit_reader = BitReader::new(data);
self.initialized = true;
let block_size = self
.bit_reader
.get_vlq_int()
.ok_or(eof_err!("Not enough data to decode 'block_size'"))?;
self.num_mini_blocks = self
.bit_reader
.get_vlq_int()
.ok_or(eof_err!("Not enough data to decode 'num_mini_blocks'"))?;
self.num_values = self
.bit_reader
.get_vlq_int()
.ok_or(eof_err!("Not enough data to decode 'num_values'"))?
as usize;
self.first_value = self
.bit_reader
.get_zigzag_vlq_int()
.ok_or(eof_err!("Not enough data to decode 'first_value'"))?;
self.first_value_read = false;
self.mini_block_idx = 0;
self.delta_bit_widths.clear();
self.values_current_mini_block = 0;
self.values_per_mini_block = (block_size / self.num_mini_blocks) as usize;
assert!(self.values_per_mini_block % 8 == 0);
Ok(())
}
default fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
assert!(self.initialized, "Bit reader is not initialized");
let num_values = cmp::min(buffer.len(), self.num_values);
for i in 0..num_values {
if !self.first_value_read {
self.set_decoded_value(buffer, i, self.first_value);
self.current_value = self.first_value;
self.first_value_read = true;
continue;
}
if self.values_current_mini_block == 0 {
self.mini_block_idx += 1;
if self.mini_block_idx < self.delta_bit_widths.size() {
self.delta_bit_width = self.delta_bit_widths.data()[self.mini_block_idx];
self.values_current_mini_block = self.values_per_mini_block;
} else {
self.init_block()?;
}
self.load_deltas_in_mini_block()?;
}
let delta =
self.get_delta(self.deltas_in_mini_block.len() - self.values_current_mini_block);
self.current_value = self.current_value.wrapping_add(self.min_delta);
self.current_value = self.current_value.wrapping_add(delta as i64);
self.set_decoded_value(buffer, i, self.current_value);
self.values_current_mini_block -= 1;
}
self.num_values -= num_values;
Ok(num_values)
}
fn values_left(&self) -> usize { self.num_values }
fn encoding(&self) -> Encoding { Encoding::DELTA_BINARY_PACKED }
}
trait DeltaBitPackDecoderConversion<T: DataType> {
#[inline]
fn get_delta(&self, index: usize) -> i64;
#[inline]
fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64);
}
impl<T: DataType> DeltaBitPackDecoderConversion<T> for DeltaBitPackDecoder<T> {
  /// Fallback for physical types other than INT32/INT64: always panics.
  #[inline]
  default fn set_decoded_value(&self, _buffer: &mut [T::T], _index: usize, _value: i64) {
    panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type")
  }

  /// Fallback for physical types other than INT32/INT64: always panics.
  #[inline]
  default fn get_delta(&self, _index: usize) -> i64 {
    panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type")
  }
}
impl DeltaBitPackDecoderConversion<Int32Type> for DeltaBitPackDecoder<Int32Type> {
  /// Sign-extends the stored 32-bit delta to `i64`.
  #[inline]
  fn get_delta(&self, index: usize) -> i64 {
    let delta: i32 = self.deltas_in_mini_block[index];
    i64::from(delta)
  }

  /// Truncates the running `i64` value to `i32`. The running sum uses wrapping
  /// arithmetic, so keeping only the low 32 bits yields the correct INT32 value.
  #[inline]
  fn set_decoded_value(&self, buffer: &mut [i32], index: usize, value: i64) {
    let narrowed = value as i32;
    buffer[index] = narrowed;
  }
}
impl DeltaBitPackDecoderConversion<Int64Type> for DeltaBitPackDecoder<Int64Type> {
  /// Deltas are already `i64`; returned unchanged.
  #[inline]
  fn get_delta(&self, index: usize) -> i64 {
    let delta = self.deltas_in_mini_block[index];
    delta
  }

  /// Stores the running value as-is.
  #[inline]
  fn set_decoded_value(&self, buffer: &mut [i64], index: usize, value: i64) {
    let slot = &mut buffer[index];
    *slot = value;
  }
}
/// Decoder for the DELTA_LENGTH_BYTE_ARRAY encoding.
///
/// All value lengths are DELTA_BINARY_PACKED up front, followed by the concatenated
/// value bytes. Only `ByteArrayType` is supported.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
  // Decoded length of every value in the page.
  lengths: Vec<i32>,
  // Index of the next entry in `lengths`.
  current_idx: usize,
  // Concatenated value bytes (everything after the encoded lengths).
  data: Option<ByteBufferPtr>,
  // Byte offset of the next value inside `data`.
  offset: usize,
  // Number of values not yet returned by `get`.
  num_values: usize,
  _phantom: PhantomData<T>,
}

impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
  /// Creates an empty decoder; `set_data` must be called before decoding.
  pub fn new() -> Self {
    DeltaLengthByteArrayDecoder {
      data: None,
      offset: 0,
      num_values: 0,
      lengths: Vec::new(),
      current_idx: 0,
      _phantom: PhantomData,
    }
  }
}
impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
  /// Fallback for unsupported types: always returns an error.
  default fn get(&mut self, _buffer: &mut [T::T]) -> Result<usize> {
    let unsupported = general_err!("DeltaLengthByteArrayDecoder only support ByteArrayType");
    Err(unsupported)
  }

  /// Fallback for unsupported types: always returns an error.
  default fn set_data(&mut self, _data: ByteBufferPtr, _num_values: usize) -> Result<()> {
    let unsupported = general_err!("DeltaLengthByteArrayDecoder only support ByteArrayType");
    Err(unsupported)
  }

  fn encoding(&self) -> Encoding { Encoding::DELTA_LENGTH_BYTE_ARRAY }

  fn values_left(&self) -> usize { self.num_values }
}
impl Decoder<ByteArrayType> for DeltaLengthByteArrayDecoder<ByteArrayType> {
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
len_decoder.set_data(data.all(), num_values)?;
let num_lengths = len_decoder.values_left();
self.lengths.resize(num_lengths, 0);
len_decoder.get(&mut self.lengths[..])?;
self.data = Some(data.start_from(len_decoder.get_offset()));
self.offset = 0;
self.current_idx = 0;
self.num_values = num_lengths;
Ok(())
}
fn get(&mut self, buffer: &mut [ByteArray]) -> Result<usize> {
assert!(self.data.is_some());
let data = self.data.as_ref().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
for i in 0..num_values {
let len = self.lengths[self.current_idx] as usize;
buffer[i].set_data(data.range(self.offset, len));
self.offset += len;
self.current_idx += 1;
}
self.num_values -= num_values;
Ok(num_values)
}
}
pub struct DeltaByteArrayDecoder<T: DataType> {
prefix_lengths: Vec<i32>,
current_idx: usize,
suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
previous_value: Vec<u8>,
num_values: usize,
_phantom: PhantomData<T>,
}
impl<T: DataType> DeltaByteArrayDecoder<T> {
pub fn new() -> Self {
Self {
prefix_lengths: vec![],
current_idx: 0,
suffix_decoder: None,
previous_value: vec![],
num_values: 0,
_phantom: PhantomData,
}
}
}
impl<'m, T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
default fn set_data(&mut self, _: ByteBufferPtr, _: usize) -> Result<()> {
Err(general_err!(
"DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
))
}
default fn get(&mut self, _: &mut [T::T]) -> Result<usize> {
Err(general_err!(
"DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
))
}
fn values_left(&self) -> usize { self.num_values }
fn encoding(&self) -> Encoding { Encoding::DELTA_BYTE_ARRAY }
}
impl Decoder<ByteArrayType> for DeltaByteArrayDecoder<ByteArrayType> {
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
prefix_len_decoder.set_data(data.all(), num_values)?;
let num_prefixes = prefix_len_decoder.values_left();
self.prefix_lengths.resize(num_prefixes, 0);
prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
suffix_decoder
.set_data(data.start_from(prefix_len_decoder.get_offset()), num_values)?;
self.suffix_decoder = Some(suffix_decoder);
self.num_values = num_prefixes;
self.current_idx = 0;
self.previous_value.clear();
Ok(())
}
fn get(&mut self, buffer: &mut [ByteArray]) -> Result<usize> {
assert!(self.suffix_decoder.is_some());
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for i in 0..num_values {
let suffix_decoder = self.suffix_decoder.as_mut().unwrap();
suffix_decoder.get(&mut v[..])?;
let suffix = v[0].data();
let prefix_len = self.prefix_lengths[self.current_idx] as usize;
let mut result = Vec::new();
result.extend_from_slice(&self.previous_value[0..prefix_len]);
result.extend_from_slice(suffix);
let data = ByteBufferPtr::new(result.clone());
buffer[i].set_data(data);
self.previous_value = result;
self.current_idx += 1;
}
self.num_values -= num_values;
Ok(num_values)
}
}
impl Decoder<FixedLenByteArrayType> for DeltaByteArrayDecoder<FixedLenByteArrayType> {
fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
let s: &mut DeltaByteArrayDecoder<ByteArrayType> = unsafe { mem::transmute(self) };
s.set_data(data, num_values)
}
fn get(&mut self, buffer: &mut [ByteArray]) -> Result<usize> {
let s: &mut DeltaByteArrayDecoder<ByteArrayType> = unsafe { mem::transmute(self) };
s.get(buffer)
}
}
// Unit tests for the decoders above. Most tests encode data with the matching encoder
// from `super::encoding` and check that decoding returns the original values.
#[cfg(test)]
mod tests {
use super::{super::encoding::*, *};
use schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
use std::{mem, rc::Rc};
use util::{bit_util::set_array_bit, memory::MemTracker, test_common::RandGen};
// `get_decoder` must build supported decoders and reject dictionary/unsupported encodings.
#[test]
fn test_get_decoders() {
create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, None);
create_and_check_decoder::<BoolType>(Encoding::RLE, None);
create_and_check_decoder::<Int32Type>(
Encoding::RLE_DICTIONARY,
Some(general_err!(
"Cannot initialize this encoding through this function"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::PLAIN_DICTIONARY,
Some(general_err!(
"Cannot initialize this encoding through this function"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::BIT_PACKED,
Some(nyi_err!("Encoding BIT_PACKED is not supported")),
);
}
#[test]
fn test_plain_decode_int32() {
let data = vec![42, 18, 52];
let data_bytes = Int32Type::to_byte_array(&data[..]);
let mut buffer = vec![0; 3];
test_plain_decode::<Int32Type>(
ByteBufferPtr::new(data_bytes),
3,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_int64() {
let data = vec![42, 18, 52];
let data_bytes = Int64Type::to_byte_array(&data[..]);
let mut buffer = vec![0; 3];
test_plain_decode::<Int64Type>(
ByteBufferPtr::new(data_bytes),
3,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_float() {
let data = vec![3.14, 2.414, 12.51];
let data_bytes = FloatType::to_byte_array(&data[..]);
let mut buffer = vec![0.0; 3];
test_plain_decode::<FloatType>(
ByteBufferPtr::new(data_bytes),
3,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_double() {
let data = vec![3.14f64, 2.414f64, 12.51f64];
let data_bytes = DoubleType::to_byte_array(&data[..]);
let mut buffer = vec![0.0f64; 3];
test_plain_decode::<DoubleType>(
ByteBufferPtr::new(data_bytes),
3,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_int96() {
let mut data = vec![Int96::new(); 4];
data[0].set_data(11, 22, 33);
data[1].set_data(44, 55, 66);
data[2].set_data(10, 20, 30);
data[3].set_data(40, 50, 60);
let data_bytes = Int96Type::to_byte_array(&data[..]);
let mut buffer = vec![Int96::new(); 4];
test_plain_decode::<Int96Type>(
ByteBufferPtr::new(data_bytes),
4,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_bool() {
let data = vec![
false, true, false, false, true, false, true, true, false, true,
];
let data_bytes = BoolType::to_byte_array(&data[..]);
let mut buffer = vec![false; 10];
test_plain_decode::<BoolType>(
ByteBufferPtr::new(data_bytes),
10,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_byte_array() {
let mut data = vec![ByteArray::new(); 2];
data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes()));
data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes()));
let data_bytes = ByteArrayType::to_byte_array(&data[..]);
let mut buffer = vec![ByteArray::new(); 2];
test_plain_decode::<ByteArrayType>(
ByteBufferPtr::new(data_bytes),
2,
-1,
&mut buffer[..],
&data[..],
);
}
#[test]
fn test_plain_decode_fixed_len_byte_array() {
let mut data = vec![ByteArray::default(); 3];
data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes()));
data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes()));
data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes()));
let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
let mut buffer = vec![ByteArray::default(); 3];
test_plain_decode::<FixedLenByteArrayType>(
ByteBufferPtr::new(data_bytes),
3,
4,
&mut buffer[..],
&data[..],
);
}
#[test]
#[should_panic(expected = "RleValueEncoder only supports BoolType")]
fn test_rle_value_encode_int32_not_supported() {
let mut encoder = RleValueEncoder::<Int32Type>::new();
encoder.put(&vec![1, 2, 3, 4]).unwrap();
}
#[test]
#[should_panic(expected = "RleValueDecoder only supports BoolType")]
fn test_rle_value_decode_int32_not_supported() {
let mut decoder = RleValueDecoder::<Int32Type>::new();
decoder
.set_data(ByteBufferPtr::new(vec![5, 0, 0, 0]), 1)
.unwrap();
}
#[test]
fn test_rle_value_decode_bool_decode() {
let data = vec![
BoolType::gen_vec(-1, 256),
BoolType::gen_vec(-1, 257),
BoolType::gen_vec(-1, 126),
];
test_rle_value_decode::<BoolType>(data);
}
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_offset() {
let decoder = DeltaBitPackDecoder::<Int32Type>::new();
decoder.get_offset();
}
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_get() {
let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
let mut buffer = vec![];
decoder.get(&mut buffer).unwrap();
}
#[test]
fn test_delta_bit_packed_int32_empty() {
let data = vec![vec![0; 0]];
test_delta_bit_packed_decode::<Int32Type>(data);
}
#[test]
fn test_delta_bit_packed_int32_repeat() {
let block_data = vec![
1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4,
5, 6, 7, 8,
];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
#[test]
fn test_delta_bit_packed_int32_uneven() {
let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
#[test]
fn test_delta_bit_packed_int32_same_values() {
let block_data = vec![
127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
let block_data = vec![
-127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
-127, -127,
];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
// Alternating extremes force maximal deltas and exercise the wrapping arithmetic.
#[test]
fn test_delta_bit_packed_int32_min_max() {
let block_data = vec![
i32::min_value(),
i32::max_value(),
i32::min_value(),
i32::max_value(),
i32::min_value(),
i32::max_value(),
i32::min_value(),
i32::max_value(),
];
test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
let data = vec![
Int32Type::gen_vec(-1, 64),
Int32Type::gen_vec(-1, 128),
Int32Type::gen_vec(-1, 64),
];
test_delta_bit_packed_decode::<Int32Type>(data);
}
#[test]
fn test_delta_bit_packed_int32_data_across_blocks() {
let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
test_delta_bit_packed_decode::<Int32Type>(data);
}
#[test]
fn test_delta_bit_packed_int32_with_empty_blocks() {
let data = vec![
Int32Type::gen_vec(-1, 128),
vec![0; 0],
Int32Type::gen_vec(-1, 64),
];
test_delta_bit_packed_decode::<Int32Type>(data);
}
#[test]
fn test_delta_bit_packed_int64_empty() {
let data = vec![vec![0; 0]];
test_delta_bit_packed_decode::<Int64Type>(data);
}
#[test]
fn test_delta_bit_packed_int64_min_max() {
let block_data = vec![
i64::min_value(),
i64::max_value(),
i64::min_value(),
i64::max_value(),
i64::min_value(),
i64::max_value(),
i64::min_value(),
i64::max_value(),
];
test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}
#[test]
fn test_delta_bit_packed_int64_multiple_blocks() {
let data = vec![
Int64Type::gen_vec(-1, 64),
Int64Type::gen_vec(-1, 128),
Int64Type::gen_vec(-1, 64),
];
test_delta_bit_packed_decode::<Int64Type>(data);
}
// Hand-written page: the header is 5 bytes (offset after `set_data`). Decoding the
// 3 values consumes the first block, ending at byte 34.
#[test]
fn test_delta_bit_packed_decoder_sample() {
let data_bytes = vec![
128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
];
let buffer = ByteBufferPtr::new(data_bytes);
let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
decoder.set_data(buffer, 3).unwrap();
assert_eq!(decoder.get_offset(), 5);
let mut result = vec![0, 0, 0];
decoder.get(&mut result).unwrap();
assert_eq!(decoder.get_offset(), 34);
assert_eq!(result, vec![29, 43, 89]);
}
#[test]
fn test_delta_byte_array_same_arrays() {
let data = vec![
vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
vec![
ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
],
vec![
ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
],
];
test_delta_byte_array_decode(data);
}
#[test]
fn test_delta_byte_array_unique_arrays() {
let data = vec![
vec![ByteArray::from(vec![1])],
vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
vec![
ByteArray::from(vec![7, 8]),
ByteArray::from(vec![9, 0, 1, 2]),
],
];
test_delta_byte_array_decode(data);
}
#[test]
fn test_delta_byte_array_single_array() {
let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
test_delta_byte_array_decode(data);
}
// Decodes `data` with a fresh PlainDecoder in a single `get` call and checks that all
// values were consumed and match `expected`.
fn test_plain_decode<T: DataType>(
data: ByteBufferPtr,
num_values: usize,
type_length: i32,
buffer: &mut [T::T],
expected: &[T::T],
)
{
let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
let result = decoder.set_data(data, num_values);
assert!(result.is_ok());
let result = decoder.get(&mut buffer[..]);
assert!(result.is_ok());
assert_eq!(decoder.values_left(), 0);
assert_eq!(buffer, expected);
}
fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
test_encode_decode::<T>(data, Encoding::RLE);
}
fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED);
}
fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY);
}
// Round trip: encode each inner vector with one `put` call, flush, then decode by
// calling `get` until `values_left()` reaches 0. The decoded values must equal the
// flattened input.
fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding) {
let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
let mut encoder =
get_encoder::<T>(col_descr.clone(), encoding, Rc::new(MemTracker::new()))
.expect("get encoder");
for v in &data[..] {
encoder.put(&v[..]).expect("ok to encode");
}
let bytes = encoder.flush_buffer().expect("ok to flush buffer");
let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
let mut decoder = get_decoder::<T>(col_descr.clone(), encoding).expect("get decoder");
let mut result = vec![T::T::default(); expected.len()];
decoder
.set_data(bytes, expected.len())
.expect("ok to set data");
let mut result_num_values = 0;
while decoder.values_left() > 0 {
result_num_values += decoder
.get(&mut result[result_num_values..])
.expect("ok to decode");
}
assert_eq!(result_num_values, expected.len());
assert_eq!(result, expected);
}
// Asserts that `get_decoder` either fails with exactly `err` or, when `err` is None,
// succeeds and reports the requested encoding.
fn create_and_check_decoder<T: DataType>(
encoding: Encoding,
err: Option<ParquetError>,
)
{
let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
let decoder = get_decoder::<T>(descr, encoding);
match err {
Some(parquet_error) => {
assert!(decoder.is_err());
assert_eq!(decoder.err().unwrap(), parquet_error);
},
None => {
assert!(decoder.is_ok());
assert_eq!(decoder.unwrap().encoding(), encoding);
},
}
}
// Builds a column descriptor for a primitive column "t" of physical type `t` with the
// given fixed length.
fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
let ty = SchemaType::primitive_type_builder("t", t)
.with_length(type_len)
.build()
.unwrap();
Rc::new(ColumnDescriptor::new(
Rc::new(ty),
None,
0,
0,
ColumnPath::new(vec![]),
))
}
// NOTE(review): `transmute` produces the host's native byte order. Parquet PLAIN
// lengths are little-endian, so this helper assumes a little-endian host — confirm.
fn usize_to_bytes(v: usize) -> [u8; 4] {
unsafe { mem::transmute::<u32, [u8; 4]>(v as u32) }
}
// Test-only serialization of values into the PLAIN byte layout the decoders expect.
trait ToByteArray<T: DataType> {
fn to_byte_array(data: &[T::T]) -> Vec<u8>;
}
// Default: copy the in-memory bytes of each value (for fixed-width numeric types).
impl<T> ToByteArray<T> for T
where T: DataType
{
default fn to_byte_array(data: &[T::T]) -> Vec<u8> {
let mut v = vec![];
let type_len = ::std::mem::size_of::<T::T>();
v.extend_from_slice(unsafe {
::std::slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * type_len)
});
v
}
}
// Booleans: one bit per value, a new byte started every 8 values.
impl ToByteArray<BoolType> for BoolType {
fn to_byte_array(data: &[bool]) -> Vec<u8> {
let mut v = vec![];
for i in 0..data.len() {
if i % 8 == 0 {
v.push(0);
}
if data[i] {
set_array_bit(&mut v[..], i);
}
}
v
}
}
// INT96: 12 raw bytes per value, taken from `Int96::data()`.
impl ToByteArray<Int96Type> for Int96Type {
fn to_byte_array(data: &[Int96]) -> Vec<u8> {
let mut v = vec![];
for d in data {
unsafe {
let copy = ::std::slice::from_raw_parts(d.data().as_ptr() as *const u8, 12);
v.extend_from_slice(copy);
};
}
v
}
}
// Byte arrays: 4-byte length prefix followed by the bytes.
impl ToByteArray<ByteArrayType> for ByteArrayType {
fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
let mut v = vec![];
for d in data {
let buf = d.data();
let len = &usize_to_bytes(buf.len());
v.extend_from_slice(len);
v.extend(buf);
}
v
}
}
// Fixed-length byte arrays: the bytes concatenated with no prefix.
impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
let mut v = vec![];
for d in data {
let buf = d.data();
v.extend(buf);
}
v
}
}
}