use std::{cmp, mem};
use super::rle::{RleDecoder, RleEncoder};
use basic::Encoding;
use data_type::AsBytes;
use errors::{ParquetError, Result};
use util::{
bit_util::{ceil, log2, BitReader, BitWriter},
memory::ByteBufferPtr,
};
#[inline]
pub fn max_buffer_size(
encoding: Encoding,
max_level: i16,
num_buffered_values: usize,
) -> usize
{
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => {
RleEncoder::max_buffer_size(bit_width, num_buffered_values)
+ RleEncoder::min_buffer_size(bit_width)
},
Encoding::BIT_PACKED => {
ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
},
_ => panic!("Unsupported encoding type {}", encoding),
}
}
pub enum LevelEncoder {
RLE(RleEncoder),
RLE_V2(RleEncoder),
BIT_PACKED(u8, BitWriter),
}
impl LevelEncoder {
pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelEncoder::RLE(RleEncoder::new_from_buf(
bit_width,
byte_buffer,
mem::size_of::<i32>(),
)),
Encoding::BIT_PACKED => {
LevelEncoder::BIT_PACKED(bit_width, BitWriter::new_from_buf(byte_buffer, 0))
},
_ => panic!("Unsupported encoding type {}", encoding),
}
}
pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelEncoder::RLE_V2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
}
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
let mut num_encoded = 0;
match *self {
LevelEncoder::RLE(ref mut encoder) | LevelEncoder::RLE_V2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
}
num_encoded += 1;
}
encoder.flush()?;
},
LevelEncoder::BIT_PACKED(bit_width, ref mut encoder) => {
for value in buffer {
if !encoder.put_value(*value as u64, bit_width as usize) {
return Err(general_err!("Not enough bytes left"));
}
num_encoded += 1;
}
encoder.flush();
},
}
Ok(num_encoded)
}
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
match self {
LevelEncoder::RLE(encoder) => {
let mut encoded_data = encoder.consume()?;
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
let len = (encoded_len as i32).to_le();
let len_bytes = len.as_bytes();
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
},
LevelEncoder::RLE_V2(encoder) => encoder.consume(),
LevelEncoder::BIT_PACKED(_, encoder) => Ok(encoder.consume()),
}
}
}
pub enum LevelDecoder {
RLE(Option<usize>, RleDecoder),
RLE_V2(Option<usize>, RleDecoder),
BIT_PACKED(Option<usize>, u8, BitReader),
}
impl LevelDecoder {
pub fn v1(encoding: Encoding, max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelDecoder::RLE(None, RleDecoder::new(bit_width)),
Encoding::BIT_PACKED => {
LevelDecoder::BIT_PACKED(None, bit_width, BitReader::from(Vec::new()))
},
_ => panic!("Unsupported encoding type {}", encoding),
}
}
pub fn v2(max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelDecoder::RLE_V2(None, RleDecoder::new(bit_width))
}
#[inline]
pub fn set_data(&mut self, num_buffered_values: usize, data: ByteBufferPtr) -> usize {
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder) => {
*num_values = Some(num_buffered_values);
let i32_size = mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
decoder.set_data(data.range(i32_size, data_size));
i32_size + data_size
},
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
*num_values = Some(num_buffered_values);
let num_bytes = ceil((num_buffered_values * bit_width as usize) as i64, 8);
let data_size = cmp::min(num_bytes as usize, data.len());
decoder.reset(data.range(data.start(), data_size));
data_size
},
_ => panic!(),
}
}
#[inline]
pub fn set_data_range(
&mut self,
num_buffered_values: usize,
data: &ByteBufferPtr,
start: usize,
len: usize,
) -> usize
{
match *self {
LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
decoder.set_data(data.range(start, len));
*num_values = Some(num_buffered_values);
len
},
_ => panic!("set_data_range() method is only supported by RLE v2 encoding type"),
}
}
#[inline]
pub fn is_data_set(&self) -> bool {
match self {
LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
}
}
#[inline]
pub fn get(&mut self, buffer: &mut [i16]) -> Result<usize> {
assert!(self.is_data_set(), "No data set for decoding");
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder)
| LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read = decoder.get_batch::<i16>(&mut buffer[0..len])?;
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
},
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read =
decoder.get_batch::<i16>(&mut buffer[..len], bit_width as usize);
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use util::test_common::random_numbers_range;
fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
encoder.put(&levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(levels.len(), &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(levels.len(), byte_buf);
};
let mut buffer = vec![0; levels.len()];
let num_decoded = decoder.get(&mut buffer).expect("get() should be OK");
assert_eq!(num_decoded, levels.len());
assert_eq!(buffer, levels);
}
fn test_internal_roundtrip_incremental(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
)
{
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
encoder.put(&levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(levels.len(), &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(levels.len(), byte_buf);
}
let mut buffer = vec![0; levels.len() * 2];
let mut total_decoded = 0;
let mut safe_stop = levels.len() * 2;
while safe_stop > 0 {
safe_stop -= 1;
let num_decoded = decoder
.get(&mut buffer[total_decoded..total_decoded + 1])
.expect("get() should be OK");
if num_decoded == 0 {
break;
}
total_decoded += num_decoded;
}
assert!(
safe_stop > 0,
"Failed to read values incrementally, reached safe stop"
);
assert_eq!(total_decoded, levels.len());
assert_eq!(&buffer[0..levels.len()], levels);
}
fn test_internal_roundtrip_underflow(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
)
{
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
assert_eq!(num_encoded, 1);
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(1, &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(1, byte_buf);
}
let mut buffer = vec![0; levels.len()];
let num_decoded = decoder.get(&mut buffer).expect("get() should be OK");
assert_eq!(num_decoded, num_encoded);
assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
}
fn test_internal_roundtrip_overflow(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
)
{
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
let mut found_err = false;
for _ in 0..100 {
match encoder.put(&levels) {
Err(err) => {
assert!(format!("{}", err).contains("Not enough bytes left"));
found_err = true;
break;
},
Ok(_) => {},
}
}
if !found_err {
panic!("Failed test: no buffer overflow");
}
}
#[test]
fn test_roundtrip_one() {
let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
let max_level = 1;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip() {
let levels = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let max_level = 10;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_incremental() {
let levels = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let max_level = 10;
test_internal_roundtrip_incremental(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_incremental(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip_incremental(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_all_zeros() {
let levels = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let max_level = 1;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_random() {
let mut levels = Vec::new();
let max_level = 5;
random_numbers_range::<i16>(120, 0, max_level, &mut levels);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_underflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
let max_level = 3;
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_underflow(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_overflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
let max_level = 3;
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_rle_decoder_set_data_range() {
let buffer = ByteBufferPtr::new(vec![5, 198, 2, 5, 42, 168, 10, 0, 2, 3, 36, 73]);
let max_rep_level = 1;
let mut decoder = LevelDecoder::v2(max_rep_level);
assert_eq!(decoder.set_data_range(10, &buffer, 0, 3), 3);
let mut result = vec![0; 10];
let num_decoded = decoder.get(&mut result).expect("get() should be OK");
assert_eq!(num_decoded, 10);
assert_eq!(result, vec![0, 1, 1, 0, 0, 0, 1, 1, 0, 1]);
let max_def_level = 2;
let mut decoder = LevelDecoder::v2(max_def_level);
assert_eq!(decoder.set_data_range(10, &buffer, 3, 5), 5);
let mut result = vec![0; 10];
let num_decoded = decoder.get(&mut result).expect("get() should be OK");
assert_eq!(num_decoded, 10);
assert_eq!(result, vec![2, 2, 2, 0, 0, 2, 2, 2, 2, 2]);
}
#[test]
#[should_panic(
expected = "set_data_range() method is only supported by RLE v2 encoding type"
)]
fn test_bit_packed_decoder_set_data_range() {
let buffer = ByteBufferPtr::new(vec![1, 2, 3, 4, 5]);
let max_level = 1;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_level);
decoder.set_data_range(10, &buffer, 0, 3);
}
#[test]
fn test_bit_packed_decoder_set_data() {
let buffer = ByteBufferPtr::new(vec![1, 2, 3, 4, 5]);
let max_level = 1;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_level);
assert_eq!(decoder.set_data(1024, buffer.all()), buffer.len());
assert_eq!(decoder.set_data(3, buffer.all()), 1);
}
#[test]
#[should_panic(expected = "No data set for decoding")]
fn test_rle_level_decoder_get_no_set_data() {
let max_rep_level = 2;
let mut decoder = LevelDecoder::v1(Encoding::RLE, max_rep_level);
let mut buffer = vec![0; 16];
decoder.get(&mut buffer).unwrap();
}
#[test]
#[should_panic(expected = "No data set for decoding")]
fn test_bit_packed_level_decoder_get_no_set_data() {
let max_rep_level = 2;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_rep_level);
let mut buffer = vec![0; 16];
decoder.get(&mut buffer).unwrap();
}
}