Handling Large Amounts of Data with Parquet – Part 2

Parquet exposes various configurations that let applications control how the library handles writes. In this blog post, we will go over these configurations and understand how they affect the overall write throughput, read throughput and compression.

Parquet provides the following configurations, which applications can tweak to fit their use case:

  • Row Group Size
  • Compression Codecs
  • Dictionary for Column Values
  • Data Page Size and Dictionary Size

Row Group Size

The row group size threshold decides when the in-memory column data is flushed as a row group and appended to the parquet file.

if (memSize > nextRowGroupSize) {
  flushRowGroupToStore();
  initStore();
}

Note: Checking the size of the parquet writer's in-memory data structures is a somewhat costly operation, so it is not done for every record that is added. Instead, the check is carried out after every k records, and the value of k is re-estimated each time those k records have been written.

Compression Codecs

Parquet 1.10 supports the following six compression codecs:

  • Snappy
  • GZIP
  • LZO
  • Brotli
  • LZ4
  • ZSTD

Along with these six codecs, there is also the option of not applying any compression, i.e. UNCOMPRESSED, which stores the data page bytes as well as the dictionary page bytes as they are.

Dictionary for the Column Values

As discussed in the previous blog article, parquet provides the ability to encode the values of a column in a dictionary. The application can control whether dictionary encoding is enabled. Maintaining a dictionary has several advantages:

  • With dictionary encoding, the total size of the dictionary plus the encoded values is, in practice, usually smaller than the total size of the raw values. This is because values tend to repeat across rows, and the dictionary stores each distinct value only once.
  • With dictionary encoding, we can provide stats-based filtering over the column values. As discussed in the previous blog, the dictionary can be used for quick filtering to reject row groups which do not contain particular terms. E.g. if the dictionary for a column does not contain the term “Sheldon”, we do not need to read each column value to figure out which records contain that term; we can simply skip the row groups whose column dictionary does not contain it.

Data Page Size and Dictionary Size

Parquet allows us to specify the data page size and the dictionary page size. These configurations define the maximum in-memory size for each column's data values and dictionary values respectively. The dictionary page size is also used to decide whether a column needs to fall back to the plain value writer instead of the dictionary value writer.

@Override
public boolean shouldFallBack() {
  // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
  return dictionaryByteSize > maxDictionaryByteSize
      || getDictionarySize() > MAX_DICTIONARY_ENTRIES;
}
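
To make these configurations concrete, here is a minimal sketch using the parquet-avro AvroParquetWriter builder from parquet-mr. The output path and schema are placeholders, and the exact builder methods may differ slightly between versions; the sketch simply wires together the row group size, page sizes, dictionary and compression settings discussed above.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriterConfigExample {
  public static ParquetWriter<GenericRecord> open(Schema schema) throws java.io.IOException {
    // Illustrative values; tune them to your application's memory budget.
    return AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/data.parquet"))
        .withSchema(schema)
        .withRowGroupSize(8 * 1024 * 1024)               // row group size threshold
        .withPageSize(1024 * 1024)                       // max in-memory size of a column data page
        .withDictionaryPageSize(512 * 1024)              // beyond this, columns fall back to plain encoding
        .withDictionaryEncoding(true)                    // enable dictionary encoding
        .withCompressionCodec(CompressionCodecName.ZSTD) // or SNAPPY, GZIP, LZO, BROTLI, LZ4
        .build();
  }
}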

 

Encodings In Parquet

Apart from dictionary encoding and plain encoding, parquet uses multiple other encoding schemes to store the data optimally.

  • Run Length Bit Packing Hybrid Encoding
    • This encoding uses a combination of run length encoding and bit packing to store data more efficiently. In parquet, it is used for encoding boolean values (and also for repetition levels, definition levels and dictionary indices).
      Eg. 
      
      We have a list of boolean values say 
      0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1 (0 = false and 1 = true)
      
      This will get encoded to
      1000,0000,1000,0001
      
      where 1000 => 8, which is the number of occurrences of 0
            0000 => 0, which is the value being repeated
            1000 => 8, which is the number of occurrences of 1
            0001 => 1, which is the value being repeated

      Note: There are some other nuances as well which have not been mentioned in this article for simplicity

  • Delta Binary Packing Encoding
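    • This encoding stores integer values as deltas from the previous value and then bit packs those deltas in blocks. Parquet can use it for INT32 and INT64 columns, and it works especially well for sorted or slowly changing values such as timestamps or auto-incrementing ids.
      Eg. (a simplified illustration; the actual format works on blocks and miniblocks and stores the first value and the minimum delta in headers)

      We have a list of integer values say
      100, 101, 103, 106, 110

      The deltas from the previous values are
      1, 2, 3, 4

      The minimum delta (1) is stored once, and the adjusted deltas
      0, 1, 2, 3
      are bit packed using just 2 bits each instead of 32 bits per original value.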

Some Common Questions

Question:  What is the value one should keep for row group size? 
Answer: This depends entirely on your application's resource requirements. If your application demands a low memory footprint for the parquet writers, you have little choice but to keep the row group size at a minimal value, e.g. 100 KB to 1 MB. But if your application can sustain higher memory pressure, you can afford to raise this limit to a higher value, say 10 MB. With larger row groups, we get better compression and overall better write throughput for each column.

Question: Which compression codec should one choose for their application?
Answer: The compression codec to choose depends on the data your application handles. Compression and decompression speed also play a crucial role in picking the algorithm: some applications can take a hit on compression speed but want the best compression ratios. There is no “one size fits all” compression algorithm. But if I had to choose one, I would go with ZSTD because of the high compression ratios and decompression speed offered by this algorithm. See this for more details.


Handling Large Amounts of Data with Parquet – Part 1

In this era of technological advancement, we are producing data like never before. Take, for example, the Large Hadron Collider, which produces data at a rate of about 1 PB per second. Given that we are producing such amounts of data, we require efficient data storage formats which can provide:

  1. High compression ratios for data containing multiple fields
  2. High read throughput for analytics use cases.

Parquet is a widely accepted solution to provide these guarantees. Given its columnar storage format, parquet provides better compression ratios as well as better read throughput for analytical queries. Due to this columnar format, the values of a particular column are aligned and stored together, which provides:

  • Better compression
    • All of a column's values are stored side by side, and because these values tend to be very similar in nature, compression algorithms perform much better on them.
  • Better throughput
    • Analytics use cases mostly involve querying on particular dimensions, which means that with parquet we get the benefit of extracting only the fields which are absolutely necessary for the query to execute, e.g. select count(*) from parquet_files where student_name like “%martin%”. For this query, we only extract the column student_name and save the precious IO cycles that would otherwise be wasted fetching the rest of the unnecessary fields.
    • Parquet also stores some metadata for each of the row groups, which helps us avoid reading whole blocks and saves precious CPU cycles.

Understanding Parquet Layout

Before looking into the layout of the parquet file, let’s understand these terms.

  • Row Group
    • Column Chunk
      • Data Page
      • Dictionary Page
  • Metadata Information


Row Groups

A parquet file is divided into smaller row groups, each of which contains a subset of the rows. The number of rows in each row group is governed by the block size specified in the ParquetWriter. The ParquetWriter keeps adding rows to the current row group, which is kept in memory; when the in-memory size crosses the threshold, the row group is flushed to the file. So essentially, the collection of rows which are flushed together constitutes a row group.

Column Chunks

Within each row group, the subset of rows is stored as column chunks. Say the data we are writing to the parquet file contains 4 columns; then all the values of column1 for this subset of rows are stored contiguously, followed by all the values of column2, and so on.


Data Page and Dictionary Page

Each column chunk is divided into data pages and a dictionary page. The dictionary page is stored only if dictionary encoding is enabled.

If dictionary encoding is enabled, we store the distinct values in the dictionary and references to those values in the data columns. On flushing, the references in the data columns are flushed onto the data pages and the actual values in the dictionary are flushed onto the dictionary pages. If dictionary encoding is not enabled, we store the actual data in the data columns instead of references.


Parquet Metadata Information

Each parquet file stores some metadata at the end of the file, i.e. in the footer. This metadata contains information about the following:

  • Row Group References
    • Column Chunk References inside those Row Groups
      • Dictionary Page References inside those column chunks
      • Data Page References inside those column chunks
      • Column Chunk Statistics
      • Column Chunk Encodings
      • Column Chunk Sizes
    • Number of rows in the Row Group
    • Size of the data in the Row Group
  • Some Additional File Metadata

Writing to a Parquet File

As already explained in the previous sections, parquet stores data as row groups. Each row group contains a group of records whose values are stored as column chunks.

The ParquetWriter keeps the column values of the last k records in memory, so writing a record to a parquet file usually just appends values to these in-memory buffers. Once the memory limit for the buffered column values is exceeded, we flush them to the parquet file. All the records which were buffered together in memory constitute a row group. This is done to save disk IOs and to improve write throughput.
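
As a rough illustration of this buffering behaviour, below is a minimal sketch of writing records through the parquet-avro writer; the schema, field names and output path are made up for the example.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetWriteExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Student\",\"fields\":["
            + "{\"name\":\"student_name\",\"type\":\"string\"},"
            + "{\"name\":\"rating\",\"type\":\"int\"}]}");

    try (ParquetWriter<GenericRecord> writer =
             AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/students.parquet"))
                 .withSchema(schema)
                 .build()) {
      for (int i = 0; i < 1_000_000; i++) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("student_name", "student-" + i);
        record.put("rating", i % 100);
        // write() mostly just buffers column values in memory; a row group is
        // flushed to the file whenever the in-memory size crosses the threshold.
        writer.write(record);
      }
    }
  }
}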


Reading from a Parquet File

Before reading the records from the parquet file stream, we need to be aware of the layout of the file. By layout, we mean the following things

  • Row Groups Offsets
  • Column Chunks Offsets within those row groups
  • Data Page and Dictionary Page Offsets

To know this layout, we first read the file metadata, which is always stored at a known location: the footer at the end of the file. After reading this file metadata, we infer the offsets of the different row groups and of the different column chunks stored within those row groups. With these offsets, we can iterate over the row groups and column chunks to get the desired values.
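
As a rough sketch of what this looks like with the parquet-mr ParquetFileReader API (method names may vary slightly across versions), the snippet below reads the footer and prints the row groups and column chunks it describes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetLayoutExample {
  public static void main(String[] args) throws Exception {
    Path path = new Path("/tmp/students.parquet");
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      ParquetMetadata footer = reader.getFooter();
      for (BlockMetaData rowGroup : footer.getBlocks()) {          // one entry per row group
        System.out.println("rows=" + rowGroup.getRowCount()
            + " bytes=" + rowGroup.getTotalByteSize());
        for (ColumnChunkMetaData column : rowGroup.getColumns()) { // one entry per column chunk
          System.out.println("  " + column.getPath()
              + " offset=" + column.getStartingPos()
              + " encodings=" + column.getEncodings()
              + " stats=" + column.getStatistics());
        }
      }
    }
  }
}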

A parquet file also offers the capability of filtering these row groups. This filtering can be done via:

  • Filter Predicates
    • These filter predicates are applied at job submission to see if they can potentially be used to drop entire row groups. This can save I/Os, which can improve application performance tremendously. These filter predicates are also applied during column assembly to drop individual records, if any. (A small sketch of building such a predicate is shown after this list.)

Examples are described below.

Filter Predicate: Filter all records which have the term "Sheldon" 
                  for column 1

Row Group Dictionaries
  Row-Group 1, Column 1 -> "1: Leonard, 2: Howard, 3: Raj"
  Row-Group 2, Column 1 -> "1: Penny, 2: Amy, 3: Bernadette"
  Row-Group 3, Column 1 -> "1: Stuart, 2: Memo, 3: Sheldon"

Now with the help of dictionary pages, we can easily filter out
the row groups which do not contain any records with the
term "Sheldon" in column 1.

 

Filter Predicate: Filter all records which have the rating > 50 

Row Groups
   Row-Group 1, Column Rating -> (Min -> 20, Max -> 30)
   Row-Group 2, Column Rating -> (Min -> 45, Max -> 80)
   Row-Group 3, Column Rating -> (Min -> 40, Max -> 45)

Now with the help of the column chunk metadata (min/max statistics), we can
easily filter out the row groups which cannot contain records with rating > 50.

  • Metadata Filters
    • With the help of metadata filters, you can filter out row groups by looking at their row group metadata fields. E.g. these filters can select row groups by their start and end offsets: we can create a metadata filter which keeps only the row groups whose start and end offsets fall within a given offset range.
    • For more details see OffsetMetadataFilter, RangeMetadataFilter
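
As a rough example of the rating predicate above, the sketch below builds a filter predicate with the parquet-mr filter API and pushes it down through the parquet-avro reader; the file path and column name are illustrative.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetFilterExample {
  public static void main(String[] args) throws Exception {
    // Keep only records with rating > 50; row groups whose statistics
    // (min/max) cannot possibly match are skipped without being read.
    FilterPredicate ratingAbove50 = FilterApi.gt(FilterApi.intColumn("rating"), 50);

    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(new Path("/tmp/students.parquet"))
                 .withFilter(FilterCompat.get(ratingAbove50))
                 .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}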

See the next blog article for more details on the various configurations provided by parquet for encoding your data.


Exactly Once Semantics with Non-Idempotent Requests

Problem Statement

Currently, in the case of non-idempotent requests, e.g. incrementing a simple counter, we don't really have a good way to deal with failures/retries. This is because it is very difficult to tell whether a failed request completed on the server before the failure or not. This leads to some well-known problems:

  • Duplicate data in case of multiple retries
    • Before 1st Increment ( INCR ) Request: Counter X = 10
    • After 1st Increment Request: Counter X = 11. But the client did not receive the ack for the request due to some network issue.
    • Client Retries again, 2nd Increment Request: Counter X = 12. This time client did receive the ack and hence did not retry again.
  • Loss of data in case of no retries.
    • Before 1st Increment ( INCR ) Request: Counter X = 10
    • The increment request was not able to complete due to a networking issue. But the client did not retry again as the client was not sure if the request was successfully executed on the server


Today's distributed systems typically use the following mechanism to deal with this situation:

  • Generating a RequestID
  • Making the request to the server with this RequestID
  • The server makes sure that this RequestID has not been processed before by looking it up in an in-memory cache of RequestIDs it has already processed.
  • If not processed, it processes the request and adds the RequestID to its cache.
  • After t seconds, it cleans up the entries in this cache so that the cache does not end up consuming a lot of space/memory.

def putRequest(id: RequestId, key: K, value: V) = {
  if (isDuplicateRequest(id)) {
    return ERROR_CODE_DUPLICATE_REQUEST;
  } else {
    store.put(key, value)
    markProcessed(id) // add the RequestID to the cache (step 4 above) so that retries are rejected
  }
}

But this above-mentioned methodology is not robust; there are cases where the strategy might not work out. Let's see one case where it can fail:

  • A Client sends a request to the server. The server successfully serves the request and makes the update.
  • Request Times Out and now the client is network partitioned so the client cannot make the request to the server for the next t seconds.
  • Now, when the client can talk to the server again, it tries to issue the request once more. But unfortunately, the requestID has already expired from the store, because of which the request will be executed again and the update will be applied a second time 🙁

So how can we ensure exactly once writes or updates in a data store?

Solution

So for that, we need to break our non-idempotent operation into these two operations.

  • PREPARE Operation
  • COMMIT Operation

 

Figure: PREPARE and COMMIT Operations

Now let’s dig deep into implementation details for these two operations.

PREPARE Operation

  • The client issuing the request generates a random requestID.
    • In this step, we just add an entry on the server against the requestID, and along with the requestID we also store the operation OP we want to carry out, i.e. INCR or DECR.
    • This requestID represents an ongoing operation.
    • Adding this requestID to the datastore is idempotent, i.e. we can add this requestID to the datastore multiple times with the same outcome.
  • Even if this PREPARE operation fails, we can simply retry it, since PREPARE is idempotent and can be executed multiple times without any side effect.

PREPARE(id: requestID, key: K)

Insert Entry (X, OP) for requestID in Table Ongoing_Requests

COMMIT Operation

  • In this operation, we COMMIT the results for this requestID.
    • For this requestID in the datastore, we apply the operation OP on Key X.
    • After applying the operation, we remove the entry for requestID from the datastore.
  • Even if this COMMIT operation is repeated multiple times, we won't have any side effects or unwanted state.
    • If the COMMIT operation is repeated, we won't apply the same operation again, because the requestID entry was already deleted by the first COMMIT. Hence the COMMIT operation is idempotent.

COMMIT(id: requestID)

Update Value = OP(X) for Key X in Table Data
Delete Entry (X, OP) for requestID in Table Ongoing_Requests

  • In the Ongoing_Requests table, we store entries for the keys which are undergoing update operations.
  • In the Data table, we have the actual key-value pairs (Key, Value).
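
Below is a minimal sketch of these two operations in code, using in-memory maps to stand in for the Ongoing_Requests and Data tables. The class and method names are made up for illustration, and a real datastore would have to apply the update and delete the request entry atomically (e.g. inside a transaction).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExactlyOnceCounterStore {

  private static class PendingOp {
    final String key;
    final long delta; // e.g. +1 for INCR, -1 for DECR
    PendingOp(String key, long delta) { this.key = key; this.delta = delta; }
  }

  // Ongoing_Requests table: requestId -> operation prepared but not yet committed
  private final Map<String, PendingOp> ongoingRequests = new ConcurrentHashMap<>();
  // Data table: key -> current counter value
  private final Map<String, Long> data = new ConcurrentHashMap<>();

  // PREPARE is idempotent: registering the same requestId again has no further effect.
  public void prepare(String requestId, String key, long delta) {
    ongoingRequests.putIfAbsent(requestId, new PendingOp(key, delta));
  }

  // COMMIT is idempotent: once the requestId has been removed, a retried COMMIT
  // finds nothing to apply and returns without touching the counter again.
  public synchronized void commit(String requestId) {
    PendingOp op = ongoingRequests.remove(requestId);
    if (op == null) {
      return; // already committed (or never prepared)
    }
    data.merge(op.key, op.delta, Long::sum);
  }
}

Retrying prepare(...) or commit(...) any number of times still leaves the counter incremented exactly once.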

Conclusion

So breaking a non-idempotent request into PREPARE and COMMIT operations changes its nature to idempotent, which means we are fine with any number of failures or retries. Even in the case of failures and network partitions, retrying either operation (PREPARE or COMMIT) will always preserve the idempotent behaviour.


Building Replicated Distributed Systems with Kafka

Just a few days I was having this conversation with one of my colleagues. The conversation went as follows:

Me: Why are we going ahead with Design Y and not Design X, even though the latter design could improve performance by a whole lot?
Friend: That was because implementing Design X meant that we had to support replication for fault tolerance, and replication is one of the most challenging problems in computer science. So sadly, we went ahead with a somewhat sub-optimal design, as we did not have any other choice.
Me: Hmm yeah, building replicated systems is really challenging.

In today's world, we are building and designing systems at a scale never seen before. Every microservice that we build is either stateful or stateless. Let's talk a bit about these two kinds of microservices.

Stateless Service

These services are relatively easy to manage, and their scaling semantics are simpler compared to stateful services. We don't need to worry much about nodes going down or a service instance becoming unserviceable: because these services are stateless, one can directly start using another node (service instance).

Stateful Service

These services are more challenging to manage because they hold state, so we need to build them considering the ramifications of nodes going down and the service becoming unserviceable. Essentially, we need to make these services fault-tolerant.

There might be two kinds of stateful services:

  • Some services serve as a cache in front of another stateful service to guarantee better performance. These services do have a fallback, so fault tolerance is not that big of an issue for them, strictly from a state-loss perspective.
  • Some other services do not have any fallback, so state which is present on the service is not present anywhere else. So we need to make sure that we provide durability, so that even if some nodes are down (up to a certain limit), there is no data/state loss.


In the context of this blog, we are going to talk about how we can make sure that these stateful services without a fallback remain fault-tolerant.

Let's first see how we can make these services fault-tolerant:

  • Provide Replication Support in your Service
    • This means that whenever you make a write request to one of your primary servers, make sure you send this write request to all the other replicas as well.
  • Another option is to use some other open source solution or service which can help with your use case, but that might involve a lot of changes in the source code of that open source solution.
    • Repurposing an open source solution for one's use case is somewhat difficult because you need to understand the whole implementation and its intricate details. Also, managing a separate fork of an open source solution is often difficult.
  • There might be some cloud-based solutions provided by different cloud service providers e.g DynamoDB, S3 or RDS by Amazon
    • These solutions sometimes do not fit our use cases, as these cloud services cannot always be repurposed for every use case.

So essentially, we might need to build our own systems which have to support replication for availability.


Question: So what's the big deal? We just need to support replication, which shouldn't be that hard, should it?

Answer: You are totally mistaken here. Solving the replication problem, i.e. keeping two systems consistent, is a challenging task in itself. There is a whole body of literature focussed just on how to make sure that two systems are in the same state, aka replication. See this for more details.

A Brief Introduction to Replication

From Wikipedia,

Replication involves sharing information so as to ensure consistency between redundant resources, such as software or hardware components, to improve reliability, fault-tolerance, or accessibility.

There are two kinds of replication models:

  • Synchronous Replication: This guarantees exactly identical replicas at the expense of higher overhead for write calls to the primary server. The overhead arises because every replica has its own latency, and all of these latencies sit in the critical path of every WRITE REQUEST made via the primary server.
  • Asynchronous Replication: This guarantees better response times for writes, but at the expense of durability: there might be some data loss if the primary dies between two syncs, since data which was on the primary but not yet on its replicas would be lost.


Both of these replication techniques have their downsides. As already said, synchronous replication guarantees high availability and no data loss at the expense of higher WRITE latencies, whereas asynchronous replication provides much better WRITE latencies at the expense of possible data loss, since it relies on fewer network calls and batched syncs.

Note: Also, both of these techniques are not so straightforward to implement and need some deep understanding of the common pitfalls of the replication process in general. See this link for more details.

Kafka to the Rescue

Kafka is an open source stream processing system. We will be repurposing Kafka to solve our replication use case. As already mentioned, replication is a challenging problem to implement unless you have had experience with it at production scale.

Kafka provides the following guarantees, which we will leverage to implement replication for our service.

  • Exactly once Semantics while writing to the Kafka service
    • The Kafka service, together with the Kafka client, makes sure that all the writes made to Kafka are idempotent, i.e. there are no duplicate messages and no messages that are not committed.
  • Ordered Delivery of Messages
    • The Kafka service also makes sure that messages written by a producer to a particular partition in a particular order are read in that same order by the consumers.
  • Atomic Updates to the partitions
    • Kafka service also makes sure that you can write messages in an atomic fashion to multiple Kafka partitions. Read this for more details.

FYI: You need a bit of understanding of Kafka partitions to follow the rest of this post. Read this to learn more about Kafka. If you don't have time to go through Kafka partitions, consider them as logical entities to which producers write data and from which consumers read data in an ordered fashion; each partition is replicated across different nodes, so you need not worry about its fault tolerance. Every piece of data written to a Kafka partition is written at a particular offset, and while reading, the consumer specifies the offset from which it wants to consume the data.

Using Kafka to solve replication

Firstly, we need to make sure that we have one Kafka partition for every Replica for our primary server.

Eg. If you want to have 2 Replicas for every primary server, then you need to have 2 Kafka Partitions, one for each of these Replicas.


After we have this setup ready

  1. The primary server needs to make sure that for every WRITE Request made, apart from updating its internal state, it will also write these WRITE Requests to all the Kafka partitions belonging to the different replicas for this primary server itself.
  2. Every Kafka consumer running on those Replicas will make sure that it consumes all the WRITE Requests from the assigned Kafka partition and update its internal state corresponding to the WRITE Request.
  3. So eventually, all these replicas will have the same state as the primary server.

Some of the replicas might lag behind because of other systemic issues (which are not at all under our control), but eventually all of the replicas will have the same state.
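
A rough sketch of the primary-server side is shown below, assuming one topic with one partition per replica and using the Kafka transactional producer so that a WRITE request lands on all replica partitions atomically; the topic name, serialization and connection settings are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReplicationLogProducer {
  private final KafkaProducer<String, String> producer;
  private final String topic;
  private final int numReplicaPartitions;

  public ReplicationLogProducer(String topic, int numReplicaPartitions) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("enable.idempotence", "true");           // exactly-once writes into Kafka
    props.put("transactional.id", "primary-server-1"); // one transactional id per primary
    this.producer = new KafkaProducer<>(props);
    this.topic = topic;
    this.numReplicaPartitions = numReplicaPartitions;
    producer.initTransactions();
  }

  // Called after the primary has applied the write to its own state.
  public void replicate(String key, String serializedWriteRequest) {
    producer.beginTransaction();
    try {
      for (int partition = 0; partition < numReplicaPartitions; partition++) {
        producer.send(new ProducerRecord<>(topic, partition, key, serializedWriteRequest));
      }
      producer.commitTransaction(); // atomic across all replica partitions
    } catch (Exception e) {
      producer.abortTransaction();
      throw new RuntimeException("failed to replicate write request", e);
    }
  }
}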

Figure: This diagram shows the interaction between the client and the primary server along with the replicas.

Implementation

One of the pros of this design is that it should not take many man-hours to implement. In this design, we need not worry about which replication model we want to use or what guarantees we want to give in our system; the design should work out of the box for most systems.

A few things which we might need to implement in this design:

  • A mechanism to figure out which partitions are associated with the replicas and, if they are not already there, to create and assign partitions to those replicas.
  • After serving a write request on the primary, writing it atomically to all of the replica partitions. Kafka already exposes APIs for making atomic updates to multiple partitions.
  • Reading from a particular offset in Kafka on each replica. This should be a really minimal task, as there are many open source libraries (or code segments) which do this for you (see the consumer sketch below).
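
Here is the consumer sketch referred to above: each replica assigns itself its own partition, seeks to the offset it has already applied, and replays the WRITE requests into its local state. The applyWriteRequest and offset-tracking helpers are placeholders for the service's own logic.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplicaApplier {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "replica-2");          // illustrative
    props.put("enable.auto.commit", "false");    // we track applied offsets ourselves
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    TopicPartition myPartition = new TopicPartition("write-requests", 2); // this replica's partition
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(myPartition));
      consumer.seek(myPartition, lastAppliedOffset() + 1); // resume after what we already applied

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          applyWriteRequest(record.key(), record.value()); // update the replica's local state
          saveLastAppliedOffset(record.offset());
        }
      }
    }
  }

  private static long lastAppliedOffset() { return -1L; }            // placeholder
  private static void saveLastAppliedOffset(long offset) {}          // placeholder
  private static void applyWriteRequest(String key, String value) {} // placeholder
}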

Now let’s discuss the positive points of this design

  • We can easily guarantee eventual consistency with this design. If you want monotonic consistency, that can be achieved by making sure that reads are always served by the most recently updated replica (or by the most lagging replica in terms of updates), which can easily be figured out by checking the uncommitted offsets of all of these replicas' Kafka partitions.


  • High write throughput for most of the requests on the Kafka cluster; see this for more details. Also, in most of the benchmarks done, Kafka provides single-digit latencies for most requests. Let's look into the request latencies:
    Old_Request_Latency = PWT_1 + PWT_2 + ....... + PWT_n
    New_Request_Latency = PWT_1 + KWT_2 + .... + KWT_n
    
    where 
    PWT_1 = Time taken to process request on Node 1
    PWT_2 = Time taken to process request on Node 2
    and so on
    
    KWT_2 = Time taken to write the request to the Kafka partition for replica 2
    KWT_3 = Time taken to write the request to the Kafka partition for replica 3
    and so on
    
    Old_Request_Latency covers processing the request on all
    of the available replicas.
    New_Request_Latency covers processing the request on the
    primary server and making sure the write request is written
    to all the concerned Kafka partitions.

    Essentially, the latencies of these two setups cannot be compared directly, but having said that, since introducing Kafka adds an extra hop, there will be some additional overhead, which should be minimal considering Kafka's latencies.

  • If one of the replicas has high latencies at some moment (because of high GC pauses or a slow disk), then in a conventional replication setup that could increase the latencies of WRITE requests in general. In our case, using Kafka, we easily get over this issue: we just add the WRITE REQUEST to the replica's Kafka partition, and it is the responsibility of the replica to sync itself whenever it has some CPU cycles.

Apart from these pros, there are obviously some cons in this design as well.

  • Although Kafka gives high write throughput for the majority of requests, this design adds the overhead of another network hop 🙁. The impact of adding Kafka should be quite small, because Kafka is built to keep write latencies very low, but there will still be some impact nonetheless.

Note: Write latencies for smaller payloads increase by a higher percentage than latencies for bigger payloads. This is because, for smaller payloads, the majority of the time is spent on the network round trip, and if we increase the number of network round trips, this time is bound to increase. So if we can batch the write requests into a single write request and then write it to a Kafka partition, the overhead of this approach should be really minimal.

Also, there is one important optimization that we can do in this current design to improve the write latency.

In the current design, we have n Kafka partitions for the n replicas. What if we had only a single Kafka partition for all of these n replicas? Then we would get better write throughput, as we would not need to make a write request to every one of the n Kafka partitions. The only requirement is that all of these n Kafka consumers (running on the n different replicas) must belong to different consumer groups, because Kafka does not allow two consumers of a single consumer group to consume the same Kafka topic partition (KTP).

So having these n Kafka consumers, each belonging to one of n different consumer groups, should improve the write throughput and latency, as all n consumers read the WRITE requests from a single KTP.
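
With this optimization, the only change on the replica side is that every replica reads the same single partition under its own consumer group, along the lines of the sketch below (the replicaId and topic name are illustrative).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SinglePartitionReplicaConsumer {
  // Each replica passes its own replicaId, so it gets a distinct consumer group
  // (and therefore its own independently tracked offsets) while all replicas
  // read the very same partition of the WRITE-request topic.
  public static KafkaConsumer<String, String> forReplica(int replicaId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "replica-" + replicaId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(Collections.singletonList(new TopicPartition("write-requests", 0)));
    return consumer;
  }
}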

Figure: Diagram explaining the working of replication with a single partition used across all replicas
