Handling Large Amounts of Data with Parquet – Part 2

Parquet provides various configurations that let applications control how the library handles writes. In this blog post, we will go over these configurations and understand how they affect the overall write/read throughput and compression.

Parquet provides the following configurations, which applications can tweak to fit their use case:

  • Row Group Size
  • Compression Codecs
  • Dictionary for Column Values
  • Data Page Size and Dictionary Size

Row Group Size

The row group size threshold decides when we flush the writer's in-memory data structures into a row group and append it to the Parquet file:

if (memSize > nextRowGroupSize) {
  // buffered column data has crossed the row group size threshold:
  // flush the in-memory row group to the file and start a new one
  flushRowGroupToStore();
  initStore();
}

Note: Checking the size of the Parquet writer's in-memory data structures is a relatively costly operation, so it is not performed for every record that is added. Instead, the check runs after every k records, and k is recalculated each time a check is performed. A simplified sketch of this idea follows.
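
The sketch below is only an illustration of this adaptive check, not the actual parquet-mr implementation; the class, field names, and constants are made up, but the idea matches the text above: skip the expensive size check for most records and schedule the next check based on the estimated remaining room in the row group.

// Simplified, hypothetical illustration of checking the buffered size only every k records.
// This is NOT the exact parquet-mr logic; names and constants are illustrative.
public class AdaptiveSizeCheck {
  private static final int MIN_RECORDS_FOR_CHECK = 100;
  private static final int MAX_RECORDS_FOR_CHECK = 10_000;

  private long recordCount = 0;
  private long recordCountForNextCheck = MIN_RECORDS_FOR_CHECK;
  private final long rowGroupSizeThreshold;

  public AdaptiveSizeCheck(long rowGroupSizeThreshold) {
    this.rowGroupSizeThreshold = rowGroupSizeThreshold;
  }

  /** Called once per written record with the current buffered size in bytes. */
  public boolean shouldFlush(long bufferedSize) {
    recordCount++;
    if (recordCount < recordCountForNextCheck) {
      return false;                      // skip the (costly) size check
    }
    if (bufferedSize >= rowGroupSizeThreshold) {
      recordCount = 0;                   // a new row group starts after the flush
      recordCountForNextCheck = MIN_RECORDS_FOR_CHECK;
      return true;
    }
    // Estimate how many more records fit before the threshold is reached
    // and schedule the next check roughly halfway there.
    long avgRecordSize = Math.max(1, bufferedSize / recordCount);
    long remainingRecords = (rowGroupSizeThreshold - bufferedSize) / avgRecordSize;
    recordCountForNextCheck = recordCount
        + Math.min(MAX_RECORDS_FOR_CHECK, Math.max(MIN_RECORDS_FOR_CHECK, remainingRecords / 2));
    return false;
  }
}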

Compression Codecs

Parquet 1.10 supports these six compression codecs:

  • Snappy
  • GZIP
  • LZO
  • Brotli
  • LZ4
  • ZSTD

Along with these six codecs, we also have the option of not applying any compression at all, i.e. UNCOMPRESSED, which stores the data page bytes and dictionary page bytes as they are.
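
For illustration, here is how a codec can be selected per writer using parquet-mr's example Group writer; this is a minimal sketch, and the path and schema below are made up:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class CodecExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required binary student_name (UTF8); required int32 rating; }");

    // Pick the codec per writer: ZSTD, SNAPPY, GZIP, UNCOMPRESSED, etc.
    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/students-zstd.parquet"))
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .build()) {
      // ... write Group records here ...
    }
  }
}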

Dictionary for the Column Values

As discussed in the previous blog article, Parquet provides the ability to encode the values of a column in a dictionary, and the application can control whether dictionary encoding is enabled. Maintaining a dictionary has several advantages:

  • In practice, the total size of the dictionary plus the encoded values is usually smaller than the total size of the raw values, because column values are often heavily duplicated and the dictionary stores each distinct value only once (see the short sketch after this list).
  • Dictionary encoding also enables filtering over the column values. As discussed in the previous blog, the dictionary can be used to quickly reject row groups that do not contain a particular term. For example, if the dictionary for a column does not contain the term “Sheldon”, we do not need to read each column value to figure out which records contain that term; we can simply skip the row groups whose column dictionary does not contain it.
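
To make the size argument concrete, here is a small standalone sketch (plain Java, not Parquet code) that dictionary-encodes a repetitive string column and compares the raw size against the dictionary plus simple 4-byte references; real Parquet additionally bit-packs the references, so the savings are even larger:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionarySizeSketch {
  public static void main(String[] args) {
    // A column with heavy duplication, as is common in practice.
    List<String> column = new ArrayList<>();
    for (int i = 0; i < 1_000; i++) {
      column.add(i % 2 == 0 ? "Leonard" : "Sheldon");
    }

    // Build the dictionary and replace each value with a reference (index).
    Map<String, Integer> dictionary = new LinkedHashMap<>();
    int[] references = new int[column.size()];
    for (int i = 0; i < column.size(); i++) {
      String value = column.get(i);
      Integer ref = dictionary.get(value);
      if (ref == null) {
        ref = dictionary.size();
        dictionary.put(value, ref);
      }
      references[i] = ref;
    }

    long rawBytes = column.stream()
        .mapToLong(v -> v.getBytes(StandardCharsets.UTF_8).length).sum();
    long dictBytes = dictionary.keySet().stream()
        .mapToLong(v -> v.getBytes(StandardCharsets.UTF_8).length).sum();
    // Simplification: assume 4 bytes per reference (Parquet bit-packs these).
    long encodedBytes = dictBytes + 4L * references.length;

    System.out.println("raw = " + rawBytes + " bytes, dictionary + references = "
        + encodedBytes + " bytes");
  }
}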

Data Page Size and Dictionary Size

Parquet also allows us to specify the data page size and the dictionary page size. These configurations define the maximum in-memory size of the buffered data values and dictionary values, respectively, for each column chunk. The dictionary page size is also used to decide whether a column should fall back from the DictionaryValuesWriter to the PlainValuesWriter:

@Override
public boolean shouldFallBack() {
  // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
  return dictionaryByteSize > maxDictionaryByteSize
      || getDictionarySize() > MAX_DICTIONARY_ENTRIES;
}

 

Encodings In Parquet

Apart from dictionary encoding and plain encoding, parquet uses multiple other encoding schemes to store the data optimally.

  • Run Length Bit Packing Hybrid Encoding
    • This encoding uses a combination of run-length encoding and bit packing to store data more efficiently. In Parquet, it is used for encoding boolean values (as well as repetition levels, definition levels, and dictionary indices).
      Eg. 
      
      We have a list of boolean values say 
      0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1 (0 = false and 1 = true)
      
      This will get encoded to
      1000,0000,1000,0001
      
      where 1000 => 8, which is the number of occurrences of 0
            0000 => 0, which is the value being repeated
            1000 => 8, which is the number of occurrences of 1
            0001 => 1, which is the value being repeated

      Note: There are some other nuances as well, which have been left out of this article for simplicity. A simplified run-length sketch follows this list.

  • Delta Binary Packing Encoding
    • This encoding stores the difference (delta) between consecutive values and bit-packs those deltas in blocks. It works well for integer columns whose consecutive values are close to each other, e.g. timestamps or auto-incrementing IDs.
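
Below is a minimal, standalone sketch of the run-length half of the hybrid idea for a boolean column. It is only an illustration, not Parquet's actual hybrid format, which switches between run-length and bit-packed modes and uses a different header layout:

import java.util.ArrayList;
import java.util.List;

public class RunLengthSketch {
  /** Encodes a boolean column as (runLength, value) pairs. */
  static List<int[]> encode(boolean[] values) {
    List<int[]> runs = new ArrayList<>();
    int i = 0;
    while (i < values.length) {
      int start = i;
      while (i < values.length && values[i] == values[start]) {
        i++;
      }
      runs.add(new int[] {i - start, values[start] ? 1 : 0});
    }
    return runs;
  }

  public static void main(String[] args) {
    boolean[] column = {false, false, false, false, false, false, false, false,
                        true, true, true, true, true, true, true, true};
    for (int[] run : encode(column)) {
      // prints "8 x 0" then "8 x 1", matching the example above
      System.out.println(run[0] + " x " + run[1]);
    }
  }
}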

Some Common Questions

Question: What value should one keep for the row group size?
Answer: This depends entirely on your application's resource requirements. If your application demands a low memory footprint for its Parquet writers, you have no option but to keep the row group size at a small value, e.g. 100 KB to 1 MB. But if your application can sustain higher memory pressure, you can afford to raise this limit to a higher value, say 10 MB. With larger row groups, we get better compression and better overall write throughput for each column.

Question: Which compression codec should I choose for my application?
Answer: The right compression codec depends on the data your application handles. Compression and decompression speed also play a crucial role in the decision: some applications can take a hit on compression speed as long as they get the best compression ratios. There is no “one size fits all” compression algorithm. But if I had to choose one, I would go with ZSTD because of the high compression ratios and decompression speed it offers. See this for more details.

References

Handling Large Amounts of Data with Parquet – Part 1

In this era of technological advancement, we are producing data like never before. Take, for example, the Large Hadron Collider, which produces data at a rate of roughly 1 PB per second. Given that we are producing such amounts of data, we require efficient data storage formats which can provide:

  1. High compression ratios for data containing multiple fields
  2. High read throughput for analytics use cases.

Parquet is a widely accepted solution that provides these guarantees. Thanks to its columnar storage format, Parquet offers better compression ratios as well as better read throughput for analytical queries. Because of the columnar format, the values of a particular column are aligned and stored together, which provides:

  • Better compression
    • All the values of a column are stored side by side, which helps compression algorithms considerably, since they perform much better on runs of similar values.
  • Better throughput
    • Analytics use cases mostly involve querying on particular dimensions, which means that with Parquet we only extract the fields that are absolutely necessary for the query to execute, e.g. SELECT count(*) FROM parquet_files WHERE student_name LIKE '%martin%'. For this query we only read the student_name column and save the precious IO cycles that would otherwise be wasted fetching the remaining, unnecessary fields.
    • Parquet also stores metadata for each of the row groups, which helps us avoid reading whole blocks and saves precious CPU and IO cycles.

Understanding Parquet Layout

Before looking into the layout of the parquet file, let’s understand these terms.

  • Row Group
    • Column Chunk
      • Data Page
      • Dictionary Page
  • Metadata Information

Row Groups

A Parquet file is divided into smaller row groups, each containing a subset of the rows. The number of rows in each row group is governed by the block (row group) size we specify in the ParquetWriter. The ParquetWriter keeps adding rows to the current in-memory row group, and when its size crosses the configured threshold, the in-memory row group is flushed to the file. Essentially, the collection of rows that is flushed together constitutes a row group. A sketch of configuring this size is shown below.
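
For illustration, the row group (block) size can be set on the parquet-mr writer builder; a minimal sketch with a made-up path, schema, and an arbitrary 10 MB threshold:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RowGroupSizeExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required binary student_name (UTF8); required int32 rating; }");

    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/students-10mb-rowgroups.parquet"))
        .withType(schema)
        .withRowGroupSize(10 * 1024 * 1024)  // flush a row group roughly every 10 MB of buffered data
        .build()) {
      // ... write Group records here ...
    }
  }
}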

Column Chunks

Within a row group, the subset of rows is stored as column chunks. Say the data we are writing to the Parquet file contains 4 columns; then all the values of column 1 for this subset of rows are stored contiguously, followed by all the values of column 2 for the same subset of rows, and so on.


Data Page and Dictionary Page

Each column chunk is divided into data pages and, optionally, a dictionary page. The dictionary page is stored only if dictionary encoding is enabled.

When dictionary encoding is enabled, the distinct keys are stored in the dictionary and references to those keys are stored in the data columns. On flushing, the references in the data columns are written to the data pages and the actual keys in the dictionary are written to the dictionary pages. When dictionary encoding is not enabled, the actual data is stored in the data columns instead of references.


Parquet Metadata Information

Each Parquet file stores some metadata at the end of the file, i.e. in the footer. This metadata contains information about the following (a sketch that reads this footer is shown after the list):

  • Row Groups References
    • Column Chunk References inside of those Row Group References
      • Dictionary Page References inside those column chunks
      • Data Page References inside those column chunks
      • Column Chunk Statistics
      • Column Chunk Encodings
      • Column Chunk Sizes
    • Number of rows in the Row Group
    • Size of the data in the Row Group
  • Some Additional File Metadata
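
As an illustration, this footer can be inspected with parquet-mr's ParquetFileReader; a minimal sketch in which the file path is made up and error handling is omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterExample {
  public static void main(String[] args) throws Exception {
    Path path = new Path("/tmp/students.parquet");
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      // Each BlockMetaData entry describes one row group.
      for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
        System.out.println("rows = " + rowGroup.getRowCount()
            + ", total bytes = " + rowGroup.getTotalByteSize());
        // Each ColumnChunkMetaData entry describes one column chunk in the row group.
        for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
          System.out.println("  column = " + chunk.getPath()
              + ", codec = " + chunk.getCodec()
              + ", encodings = " + chunk.getEncodings()
              + ", first data page offset = " + chunk.getFirstDataPageOffset()
              + ", stats = " + chunk.getStatistics());
        }
      }
    }
  }
}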

Writing to a Parquet File

As explained in the previous sections, Parquet stores data as row groups, and each row group contains a batch of records stored as column chunks.

The ParquetWriter keeps the column values of the last k records in memory, so writing a record to a Parquet file usually only writes those values into memory. Once the memory limit for these buffered column values is exceeded, they are flushed to the Parquet file, and all the records that were buffered together constitute a row group. This is done to save disk IOs and improve write throughput. A sketch of the write path is shown below.
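
For illustration, here is a minimal write path using parquet-mr's example Group API; the schema, path, and values are made up:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriteExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message student { required binary student_name (UTF8); required int32 rating; }");
    SimpleGroupFactory groups = new SimpleGroupFactory(schema);

    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/students.parquet"))
        .withType(schema)
        .build()) {
      // Each record is buffered in memory and flushed as part of a row group.
      writer.write(groups.newGroup().append("student_name", "Sheldon").append("rating", 99));
      writer.write(groups.newGroup().append("student_name", "Penny").append("rating", 42));
    }
  }
}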

Reading from a Parquet File

Before reading records from a Parquet file stream, we need to be aware of the layout of the file. By layout, we mean the following:

  • Row Groups Offsets
  • Column Chunks Offsets within those row groups
  • Data Page and Dictionary Page Offsets

To discover this layout, we first read the file metadata, which is stored in the footer at the end of the file. From this metadata we infer the offsets of the different row groups and of the column chunks stored within those row groups. With these offsets in hand, we can iterate over the row groups and column chunks to get the desired values. A read sketch using the high-level reader follows.
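
For illustration, here is a minimal read path using parquet-mr's example GroupReadSupport, which handles the footer and offsets for us; the path matches the hypothetical write sketch above:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ReadExample {
  public static void main(String[] args) throws Exception {
    try (ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path("/tmp/students.parquet"))
        .build()) {
      // read() returns null once all records have been consumed.
      for (Group record = reader.read(); record != null; record = reader.read()) {
        System.out.println(record.getString("student_name", 0)
            + " -> " + record.getInteger("rating", 0));
      }
    }
  }
}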

Parquet also offers the capability of filtering these row groups. This filtering can be done via:

  • Filter Predicates
    • These filter predicates are applied at job submission time to check whether entire row groups can be dropped, which saves I/O and can improve application performance tremendously. The same predicates are also applied during record assembly to drop individual records. A short FilterApi sketch is shown after this list.

Examples described below

Filter Predicate: Filter all records which have the term "Sheldon" 
                  for column 1

Row Group Dictionaries
  Row-Group 1, Column 1 -> "1: Leonard, 2: Howard, 3: Raj"
  Row-Group 2, Column 1 -> "1: Penny, 2: Amy, 3: Bernadette"
  Row-Group 3, Column 1 -> "1: Stuart, 2: Memo, 3: Sheldon"

Now, with the help of the dictionary pages, we can easily skip the
row groups whose column 1 dictionary does not contain the term
"Sheldon".

 

Filter Predicate: Filter all records which have the rating > 50 

Row Groups
   Row-Group 1, Column Rating -> (Min -> 20, Max -> 30)
   Row-Group 2, Column Rating -> (Min -> 45, Max -> 80)
   Row-Group 3, Column Rating -> (Min -> 40, Max -> 45)

Now, with the help of the column chunk statistics in the metadata, we can
easily skip the row groups which cannot contain any record with rating > 50
(here, Row-Group 1 and Row-Group 3).

  • Metadata Filters
    • With the help of metadata filters, you can filter row groups by looking at their row group metadata fields, e.g. by their start and end offsets. We can create a metadata filter that only keeps the row groups whose start and end offsets fall within a given offset range.
    • For more details see OffsetMetadataFilter, RangeMetadataFilter
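
For illustration, here is a minimal sketch of a filter predicate using parquet-mr's FilterApi, matching the rating > 50 example above; the column name and path are made up:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import static org.apache.parquet.filter2.predicate.FilterApi.gt;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

public class FilterExample {
  public static void main(String[] args) throws Exception {
    // Predicate: rating > 50. Row groups whose statistics prove that no value
    // can match (e.g. max rating <= 50) are skipped without being read.
    FilterPredicate ratingAbove50 = gt(intColumn("rating"), 50);

    try (ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path("/tmp/students.parquet"))
        .withFilter(FilterCompat.get(ratingAbove50))
        .build()) {
      for (Group record = reader.read(); record != null; record = reader.read()) {
        System.out.println(record);
      }
    }
  }
}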

See the next blog article for more details on the various configurations Parquet provides for encoding your data.
