//! Contains the file writer API, with methods to write row groups and columns of a
//! Parquet file through row group writers and column writers respectively.
//!
//! On disk a file is laid out as the "PAR1" magic, the serialized row groups (each a
//! sequence of column chunks made of pages), the Thrift-encoded file metadata, a
//! 4-byte little-endian metadata length, and the trailing "PAR1" magic.
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::rc::Rc;
use basic::PageType;
use byteorder::{LittleEndian, ByteOrder};
use column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use column::writer::{ColumnWriter, get_column_writer};
use errors::{ParquetError, Result};
use file::{FOOTER_SIZE, PARQUET_MAGIC};
use file::metadata::*;
use file::properties::WriterPropertiesPtr;
use file::statistics::to_thrift as statistics_to_thrift;
use parquet_format as parquet;
use schema::types::{self, SchemaDescriptor, SchemaDescPtr, TypePtr};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use util::io::{FileSink, Position};
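/// Parquet file writer API.
/// A file is written as a sequence of row groups: request a writer for each row
/// group with `next_row_group`, hand it back with `close_row_group` once all of its
/// columns are written, and finish with `close`, which writes the file footer.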
pub trait FileWriter {
/// Creates a writer for the next row group. Errors if the file is already closed
/// or if the previous row group writer has not been closed yet.
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>>;
/// Finalises the given row group writer and collects its metadata for the footer.
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()>;
/// Closes the file by writing the Thrift file metadata and the footer. No row
/// groups can be appended afterwards.
fn close(&mut self) -> Result<()>;
}
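/// Parquet row group writer API.
/// Columns are written one at a time, in the order they appear in the schema:
/// fetch a writer with `next_column`, return it with `close_column`, and call
/// `close` once every column in the row group has been written.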
pub trait RowGroupWriter {
/// Returns a writer for the next column, or `None` when all columns in the row
/// group have already been handed out.
fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
/// Finalises the given column writer, accumulating its byte and row counts.
fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
/// Closes the row group and returns its metadata; repeated calls return the same
/// cached metadata.
fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
}
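/// A serialized implementation of `FileWriter`, backed by a `std::fs::File`.
///
/// A minimal usage sketch, based on the tests at the bottom of this module; the
/// path, schema, and values are illustrative only:
///
/// ```ignore
/// let file = File::create("sample.parquet")?;
/// let schema = Rc::new(
///     types::Type::group_type_builder("schema")
///         .with_fields(&mut vec![
///             Rc::new(types::Type::primitive_type_builder("col1", Type::INT32)
///                 .with_repetition(Repetition::REQUIRED)
///                 .build()?)
///         ])
///         .build()?
/// );
/// let props = Rc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(file, schema, props)?;
/// let mut row_group_writer = writer.next_row_group()?;
/// while let Some(mut col_writer) = row_group_writer.next_column()? {
///     match col_writer {
///         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
///             typed.write_batch(&[1, 2, 3], None, None)?;
///         },
///         _ => unimplemented!()
///     }
///     row_group_writer.close_column(col_writer)?;
/// }
/// writer.close_row_group(row_group_writer)?;
/// writer.close()?;
/// ```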
pub struct SerializedFileWriter {
file: File,
schema: TypePtr,
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
total_num_rows: u64,
row_groups: Vec<RowGroupMetaDataPtr>,
previous_writer_closed: bool,
is_closed: bool
}
impl SerializedFileWriter {
/// Creates a new file writer from the given file handle, schema, and writer
/// properties, writing the starting "PAR1" magic immediately.
pub fn new(
mut file: File,
schema: TypePtr,
properties: WriterPropertiesPtr
) -> Result<Self> {
Self::start_file(&mut file)?;
Ok(Self {
file,
schema: schema.clone(),
descr: Rc::new(SchemaDescriptor::new(schema)),
props: properties,
total_num_rows: 0,
row_groups: Vec::new(),
previous_writer_closed: true,
is_closed: false
})
}
fn start_file(file: &mut File) -> Result<()> {
// Every Parquet file starts with the 4-byte magic "PAR1".
file.write_all(&PARQUET_MAGIC)?;
Ok(())
}
fn finalise_row_group_writer(
&mut self,
mut row_group_writer: Box<RowGroupWriter>
) -> Result<()> {
let row_group_metadata = row_group_writer.close()?;
self.row_groups.push(row_group_metadata);
Ok(())
}
/// Assembles the Thrift `FileMetaData` and writes it, followed by the 8-byte footer:
/// a 4-byte little-endian metadata length and the "PAR1" magic.
fn write_metadata(&mut self) -> Result<()> {
let file_metadata = parquet::FileMetaData {
version: self.props.writer_version().as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
num_rows: self.total_num_rows as i64,
row_groups: self.row_groups.iter().map(|v| v.to_thrift()).collect(),
key_value_metadata: None,
created_by: Some(self.props.created_by().to_owned()),
column_orders: None
};
// Serialize the metadata with the Thrift compact protocol and record its length.
let start_pos = self.file.seek(SeekFrom::Current(0))?;
{
let mut protocol = TCompactOutputProtocol::new(&mut self.file);
file_metadata.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
let end_pos = self.file.seek(SeekFrom::Current(0))?;
// Footer layout: [metadata length: 4 bytes LE][magic "PAR1": 4 bytes].
let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
let metadata_len = (end_pos - start_pos) as i32;
LittleEndian::write_i32(&mut footer_buffer, metadata_len);
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
self.file.write_all(&footer_buffer)?;
Ok(())
}
#[inline]
fn assert_not_closed(&self) -> Result<()> {
if self.is_closed {
Err(general_err!("File writer is closed"))
} else {
Ok(())
}
}
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
if !self.previous_writer_closed {
Err(general_err!("Previous row group writer was not closed"))
} else {
Ok(())
}
}
}
impl FileWriter for SerializedFileWriter {
#[inline]
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>> {
self.assert_not_closed()?;
self.assert_previous_writer_closed()?;
let row_group_writer = SerializedRowGroupWriter::new(
self.descr.clone(),
self.props.clone(),
&self.file
);
self.previous_writer_closed = false;
Ok(Box::new(row_group_writer))
}
#[inline]
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()> {
self.assert_not_closed()?;
let res = self.finalise_row_group_writer(row_group_writer);
self.previous_writer_closed = res.is_ok();
res
}
#[inline]
fn close(&mut self) -> Result<()> {
self.assert_not_closed()?;
self.assert_previous_writer_closed()?;
self.write_metadata()?;
self.is_closed = true;
Ok(())
}
}
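/// A serialized implementation of `RowGroupWriter` that writes column chunks
/// sequentially into the shared file handle. Closing the row group checks that all
/// columns were written with a consistent row count and builds the row group
/// metadata returned to the file writer.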
pub struct SerializedRowGroupWriter {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
file: File,
total_rows_written: Option<u64>,
total_bytes_written: u64,
column_index: usize,
previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaDataPtr>
}
impl SerializedRowGroupWriter {
pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
file: &File
) -> Self {
let num_columns = schema_descr.num_columns();
Self {
descr: schema_descr,
props: properties,
// Duplicate the underlying OS file handle; panics if the handle cannot be cloned.
file: file.try_clone().unwrap(),
total_rows_written: None,
total_bytes_written: 0,
column_index: 0,
previous_writer_closed: true,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns)
}
}
/// Closes the given column writer, adds its chunk metadata and byte count to the
/// row group, and checks that every column receives the same number of rows.
fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
let (bytes_written, rows_written, metadata) = match writer {
ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?
};
self.total_bytes_written += bytes_written;
self.column_chunks.push(Rc::new(metadata));
if let Some(rows) = self.total_rows_written {
if rows != rows_written {
return Err(general_err!(
"Incorrect number of rows, expected {} but got {}",
rows,
rows_written
));
}
} else {
self.total_rows_written = Some(rows_written);
}
Ok(())
}
#[inline]
fn assert_not_closed(&self) -> Result<()> {
if self.row_group_metadata.is_some() {
Err(general_err!("Row group writer is closed"))
} else {
Ok(())
}
}
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
if !self.previous_writer_closed {
Err(general_err!("Previous column writer was not closed"))
} else {
Ok(())
}
}
}
impl RowGroupWriter for SerializedRowGroupWriter {
#[inline]
fn next_column(&mut self) -> Result<Option<ColumnWriter>> {
self.assert_not_closed()?;
self.assert_previous_writer_closed()?;
if self.column_index >= self.descr.num_columns() {
return Ok(None);
}
let sink = FileSink::new(&self.file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let column_writer = get_column_writer(
self.descr.column(self.column_index),
self.props.clone(),
page_writer
);
self.column_index += 1;
self.previous_writer_closed = false;
Ok(Some(column_writer))
}
#[inline]
fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()> {
let res = self.finalise_column_writer(column_writer);
self.previous_writer_closed = res.is_ok();
res
}
#[inline]
fn close(&mut self) -> Result<RowGroupMetaDataPtr> {
if self.row_group_metadata.is_none() {
self.assert_previous_writer_closed()?;
let row_group_metadata =
RowGroupMetaData::builder(self.descr.clone())
.set_column_metadata(self.column_chunks.clone())
.set_total_byte_size(self.total_bytes_written as i64)
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
.build()?;
self.row_group_metadata = Some(Rc::new(row_group_metadata));
}
let metadata = self.row_group_metadata.as_ref().unwrap().clone();
Ok(metadata)
}
}
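/// A serialized implementation of `PageWriter` that writes into any sink
/// implementing `Write + Position`, such as a `FileSink` or an in-memory `Cursor`.
/// Each page is written as a Thrift-encoded page header immediately followed by the
/// (possibly compressed) page data.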
pub struct SerializedPageWriter<T: Write + Position> {
sink: T
}
impl<T: Write + Position> SerializedPageWriter<T> {
pub fn new(sink: T) -> Self {
Self { sink }
}
/// Serializes a page header into the sink using the Thrift compact protocol,
/// returning the number of bytes written.
#[inline]
fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
let start_pos = self.sink.pos();
{
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
header.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
Ok((self.sink.pos() - start_pos) as usize)
}
/// Serializes column chunk metadata into the sink using the Thrift compact protocol.
#[inline]
fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
chunk.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
Ok(())
}
}
impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
let num_values = page.num_values();
let encoding = page.encoding();
let page_type = page.page_type();
let mut page_header = parquet::PageHeader {
type_: page_type.into(),
uncompressed_page_size: uncompressed_size as i32,
compressed_page_size: compressed_size as i32,
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: None,
data_page_header_v2: None
};
match page.compressed_page() {
&Page::DataPage {
def_level_encoding,
rep_level_encoding,
ref statistics,
..
} => {
let data_page_header = parquet::DataPageHeader {
num_values: num_values as i32,
encoding: encoding.into(),
definition_level_encoding: def_level_encoding.into(),
repetition_level_encoding: rep_level_encoding.into(),
statistics: statistics_to_thrift(statistics.as_ref())
};
page_header.data_page_header = Some(data_page_header);
},
&Page::DataPageV2 {
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed,
ref statistics,
..
} => {
let data_page_header_v2 = parquet::DataPageHeaderV2 {
num_values: num_values as i32,
num_nulls: num_nulls as i32,
num_rows: num_rows as i32,
encoding: encoding.into(),
definition_levels_byte_length: def_levels_byte_len as i32,
repetition_levels_byte_length: rep_levels_byte_len as i32,
is_compressed: Some(is_compressed),
statistics: statistics_to_thrift(statistics.as_ref())
};
page_header.data_page_header_v2 = Some(data_page_header_v2);
},
&Page::DictionaryPage { is_sorted, .. } => {
let dictionary_page_header = parquet::DictionaryPageHeader {
num_values: num_values as i32,
encoding: encoding.into(),
is_sorted: Some(is_sorted)
};
page_header.dictionary_page_header = Some(dictionary_page_header);
}
}
// Write the header, then the page body, and account for both in the spec.
let start_pos = self.sink.pos();
let header_size = self.serialize_page_header(page_header)?;
self.sink.write_all(page.data())?;
let mut spec = PageWriteSpec::new();
spec.page_type = page_type;
// Reported sizes include the serialized page header on top of the page body.
spec.uncompressed_size = uncompressed_size + header_size;
spec.compressed_size = compressed_size + header_size;
spec.offset = start_pos;
spec.bytes_written = self.sink.pos() - start_pos;
// Only data pages contribute to the value count of the column chunk.
if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
spec.num_values = num_values;
}
Ok(spec)
Ok(spec)
}
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
}
fn close(&mut self) -> Result<()> {
self.sink.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::error::Error;
use std::io::Cursor;
use super::*;
use basic::{Compression, Encoding, Repetition, Type};
use column::page::PageReader;
use compression::{Codec, create_codec};
use file::properties::WriterProperties;
use file::reader::{FileReader, SerializedFileReader, SerializedPageReader};
use file::statistics::{Statistics, from_thrift, to_thrift};
use record::RowAccessor;
use util::memory::ByteBufferPtr;
use util::test_common::get_temp_file;
#[test]
fn test_file_writer_error_after_close() {
let file = get_temp_file("test_file_writer_error_after_close", &[]);
let schema = Rc::new(types::Type::group_type_builder("schema").build().unwrap());
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
writer.close().unwrap();
{
let res = writer.next_row_group();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(err.description(), "File writer is closed");
}
}
{
let res = writer.close();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(err.description(), "File writer is closed");
}
}
}
#[test]
fn test_row_group_writer_error_after_close() {
let file = get_temp_file("test_file_writer_row_group_error_after_close", &[]);
let schema = Rc::new(types::Type::group_type_builder("schema").build().unwrap());
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
row_group_writer.close().unwrap();
let res = row_group_writer.next_column();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(err.description(), "Row group writer is closed");
}
}
#[test]
fn test_row_group_writer_error_not_all_columns_written() {
let file = get_temp_file("test_row_group_writer_error_not_all_columns_written", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(types::Type::primitive_type_builder("col1", Type::INT32)
.build().unwrap())
])
.build()
.unwrap()
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let res = row_group_writer.close();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(err.description(), "Column length mismatch: 1 != 0");
}
}
#[test]
fn test_row_group_writer_num_records_mismatch() {
let file = get_temp_file("test_row_group_writer_num_records_mismatch", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(types::Type::primitive_type_builder("col1", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build().unwrap()),
Rc::new(types::Type::primitive_type_builder("col2", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build().unwrap())
])
.build()
.unwrap()
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
typed.write_batch(&[1, 2, 3], None, None).unwrap();
}
row_group_writer.close_column(col_writer).unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
typed.write_batch(&[1, 2], None, None).unwrap();
}
let res = row_group_writer.close_column(col_writer);
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(err.description(), "Incorrect number of rows, expected 3 != 2 rows");
}
}
#[test]
fn test_file_writer_empty_file() {
let file = get_temp_file("test_file_writer_write_empty_file", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(types::Type::primitive_type_builder("col1", Type::INT32)
.build().unwrap())
])
.build()
.unwrap()
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props)
.unwrap();
writer.close().unwrap();
let reader = SerializedFileReader::new(file).unwrap();
assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
}
#[test]
fn test_file_writer_empty_row_groups() {
let file = get_temp_file("test_file_writer_write_empty_row_groups", &[]);
test_file_roundtrip(file, vec![]);
}
#[test]
fn test_file_writer_single_row_group() {
let file = get_temp_file("test_file_writer_write_single_row_group", &[]);
test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
}
#[test]
fn test_file_writer_multiple_row_groups() {
let file = get_temp_file("test_file_writer_write_multiple_row_groups", &[]);
test_file_roundtrip(file, vec![
vec![1, 2, 3, 4, 5],
vec![1, 2, 3],
vec![1],
vec![1, 2, 3, 4, 5, 6]
]);
}
#[test]
fn test_file_writer_multiple_large_row_groups() {
let file = get_temp_file("test_file_writer_multiple_large_row_groups", &[]);
test_file_roundtrip(file, vec![
vec![123; 1024],
vec![124; 1000],
vec![125; 15],
vec![]
]);
}
#[test]
fn test_page_writer_data_pages() {
let pages = vec![
Page::DataPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true))
},
Page::DataPageV2 {
buf: ByteBufferPtr::new(vec![4; 128]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
num_nulls: 2,
num_rows: 12,
def_levels_byte_len: 24,
rep_levels_byte_len: 32,
is_compressed: false,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true))
}
];
test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
}
#[test]
fn test_page_writer_dict_pages() {
let pages = vec![
Page::DictionaryPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5]),
num_values: 5,
encoding: Encoding::RLE_DICTIONARY,
is_sorted: false
},
Page::DataPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true))
},
Page::DataPageV2 {
buf: ByteBufferPtr::new(vec![4; 128]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
num_nulls: 2,
num_rows: 12,
def_levels_byte_len: 24,
rep_levels_byte_len: 32,
is_compressed: false,
statistics: None
}
];
test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
}
fn test_page_roundtrip(
pages: &[Page],
codec: Compression,
physical_type: Type
) {
let mut compressed_pages = vec![];
let mut total_num_values = 0i64;
let mut compressor = create_codec(codec).unwrap();
for page in pages {
let uncompressed_len = page.buffer().len();
let compressed_page = match page {
&Page::DataPage {
ref buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
ref statistics
} => {
total_num_values += num_values as i64;
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DataPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
}
},
&Page::DataPageV2 {
ref buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
ref statistics,
..
} => {
total_num_values += num_values as i64;
let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
let cmp_buf = compress_helper(compressor.as_mut(), &buf.data()[offset..]);
let mut output_buf = Vec::from(&buf.data()[..offset]);
output_buf.extend_from_slice(&cmp_buf[..]);
Page::DataPageV2 {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: compressor.is_some(),
statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
}
},
&Page::DictionaryPage {
ref buf,
num_values,
encoding,
is_sorted
} => {
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DictionaryPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
is_sorted
}
}
};
let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
compressed_pages.push(compressed_page);
}
let mut buffer: Vec<u8> = vec![];
let mut result_pages: Vec<Page> = vec![];
{
let cursor = Cursor::new(&mut buffer);
let mut page_writer = SerializedPageWriter::new(cursor);
for page in compressed_pages {
page_writer.write_page(page).unwrap();
}
page_writer.close().unwrap();
}
{
let mut page_reader = SerializedPageReader::new(
Cursor::new(&buffer),
total_num_values,
codec,
physical_type
).unwrap();
while let Some(page) = page_reader.get_next_page().unwrap() {
result_pages.push(page);
}
}
assert_eq!(result_pages.len(), pages.len());
for i in 0..result_pages.len() {
assert_page(&result_pages[i], &pages[i]);
}
}
fn compress_helper(compressor: Option<&mut Box<Codec>>, data: &[u8]) -> Vec<u8> {
let mut output_buf = vec![];
if let Some(cmpr) = compressor {
cmpr.compress(data, &mut output_buf).unwrap();
} else {
output_buf.extend_from_slice(data);
}
output_buf
}
fn assert_page(left: &Page, right: &Page) {
assert_eq!(left.page_type(), right.page_type());
assert_eq!(left.buffer().data(), right.buffer().data());
assert_eq!(left.num_values(), right.num_values());
assert_eq!(left.encoding(), right.encoding());
assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
}
fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(types::Type::primitive_type_builder("col1", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap())
])
.build()
.unwrap()
);
let props = Rc::new(WriterProperties::builder().build());
let mut file_writer = SerializedFileWriter::new(
file.try_clone().unwrap(),
schema,
props
).unwrap();
for subset in &data {
let mut row_group_writer = file_writer.next_row_group().unwrap();
let col_writer = row_group_writer.next_column().unwrap();
if let Some(mut writer) = col_writer {
match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
typed.write_batch(&subset[..], None, None).unwrap();
},
_ => {
unimplemented!();
}
}
row_group_writer.close_column(writer).unwrap();
}
file_writer.close_row_group(row_group_writer).unwrap();
}
file_writer.close().unwrap();
let reader = SerializedFileReader::new(file).unwrap();
assert_eq!(reader.num_row_groups(), data.len());
for i in 0..reader.num_row_groups() {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter.map(|elem| elem.get_int(0).unwrap()).collect::<Vec<i32>>();
assert_eq!(res, data[i]);
}
}
}