menü

Unriddling Big Data file formats

The conundrum of choice rears its confusing head during the early days of a big data project. The splintered nature of the data ecosystem inevitably leaves end-users spoilt for choice - right from picking out the platform (Cloudera, Hortonworks, Databricks) to choosing components like the compute engine (Tez, Impala) or an SQL framework (Hive).

One of the important decisions in a project is choosing the right file format when persisting data in the Hadoop Distributed File System, HDFS. And, we have put together a non-exhaustive checklist that should help you, when evaluating multiple file formats:
  • What is the most frequent way in which your data will be accessed?
  • How is your data being stored at source (structured or unstructured)?
  • What kind of partitioning needs do you have to take into account?
Here are a few advantages that choosing the right file format can have on the overall success and health of a big data project:
  • The powerful interplay between the file format and query execution engines that lead to great performance numbers (for example, include Parquet + Impala, ORC + Tez)
  • File formats that resonate well with the overall project architecture (for example, ORC coupled with streaming ingestion tools such as Flink & Storm)
And, here are a few consequences of getting the file format decision wrong:
  • Migrating data between file formats, although possible, is often a painstaking manual task that entails risks like data loss
  • The lack of synergy between the file format and compute engine can lead to bottlenecks in the overall architecture
To help explain what we mean a little better, we have used a real-life project to share with you, the cascading effect that an informed (or uninformed) file format choice can have on the entire data pipeline.

Our tech stack

Our goal (within the context of the project) was to set up a data engineering infrastructure on the client’s premises from the ground-up. And, this infrastructure would take advantage of the multiple microservices that were developed.

The tech stack
Users used mobile and web applications to generate retail configuration data across multiple vectors such as products, promotions, orders, and inventory. Our data lake read the data as raw facts from relational data stores backing the microservices. Our data scientists then used the data lake to build intelligent models and power dashboards with insights that would assist our users in making better decisions.

For setting up our data lake, we zeroed in on the following ingestion tool/file format choices: As most of the raw facts were to be read from the highly structured RDBMS', we steered clear of CSV (a plain-text format with very little compression) and Avro. Instead, we opted for columnar data stores such as Apache ORC or Apache Parquet since they provide better compression. Additionally, they tend well to vectorization, thus leading to lower-latency queries.

After a quick series of proofs-of-concept, we decided to use ORC supporting ACID properties (ACID-ORC going forward) as the file format of choice. We'd identify the following two as the essential reasons for our decision:
  • Transactional support provided by Hive - In early 2016, ORC was the only big data format providing support for ACID transactions and was the natural choice at the time
  • Hive HCatalog Streaming API - This meant we could write a bare minimal data ingestion library using simple Scala code to read data through JDBC abstractions and write them to Hive

ETL setup

Before getting into the ORC file format, let us quickly have a look at our ETL setup to understand the data pipeline at a high level.
ETL setup
All of our ingestion from external relational databases was done using HCatalog Streaming API. These tables were ingested into the datalake schema in Hive, where we stored raw facts. Then, they were primarily read by computation jobs written in Spark 1.6 for the purpose of computing rolled up (aggregated) data to be stored in a separate datamarts schema in Hive. This data was used downstream for analysis by our data scientists and for displaying metrics to the users.

Of note is, all the tables in our datalake schema were stored in the ACID-ORC file format, while all the tables in our datamarts schema were stored in the Parquet file format. This was due to a bug in Spark 1.6, which didn’t allow us to write to partitioned ORC tables. (This bug has since been fixed.)

ACID ORC primer and the need for compactions

With the overview in place, let's get under the hood of ACID-ORC.

ORC, in order to provide transactional properties, relies on the following metadata:
  • originalTransaction - Transaction ID of the transaction which inserted the row.
  • bucket - The bucket number this row belongs to
  • rowId - Monotonically increasing unique row ids
  • currentTransaction - Transaction ID making changes
When a query reads a table, Hive performs the following sequence of operations:
  1. A list of transactions valid for read is obtained by considering the isolation level
  2. Files containing records are sorted by -  originalTransaction ascending, bucket ascending, rowId ascending, and currentTransaction descending
  3. Only the first record with a currentTransaction that is in the list of valid transactions to read is returned. This corresponds to the last visible update to a row.
When we first write to an ACID-ORC table, it creates what is known as Delta files.
  • Delta files are the units of transactions in Hive. Each transaction creates upto ‘b’ delta files (where ‘b’ is the number of buckets).
  • These delta files are grouped together into a folder that's named using the following rules: delta_mintxnid_maxtxnid where mintxnid corresponds to minimum transaction ID and maxtxnid is the maximum transaction ID used in a particular transaction batch
Traditionally as the number of delta files increases, the stress on the name node increases (as more file metadata has to be saved and delta files are usually small). Moreover, read performance takes a hit as excessive computation is necessary to identify the most recent record. This is the main reason why Hive recommends running regular compactions on the data.
Hive recommends running regular compactions on the data
Unfortunately only when the table has had at least one compaction run on it (i.e., it has one base file available) can it be read by Spark 1.6. If all we have are delta files, Spark - up until 2.0, cannot read the table. Hive, of course, can read the table either way.

The key takeaway from this section is that for the ingested raw facts to be usable by our downstream consumers we had to run major compactions at the end of each of our ingestion jobs.

The problem statement

Once the above infrastructure was in place and the jobs had been running for a while, we began noticing that either our ingestion jobs failed randomly or our Spark 1.6 jobs were not writing any data into our insights tables.

After debugging the issue, we observed that the jobs were failing when we triggered major compactions on the ingested tables. The failure meant that even though the data had been successfully ingested into the data lake, it remained unusable.

This also led to a lot of manual work for our data engineering team as every time a job failed; we had to manually trigger compactions on the affected tables and partitions. Also, we had to also manually trigger any downstream jobs that had failed as a result of this data unavailability. This ensured no negative effect on our data scientists or users.

Of course, like any good engineering team, we chose the simplest approach to solve this problem in our first attempt: Retry the compaction 'N' times and only fail the job after that. (In our case, N was 3.) Our initial reaction to this solution was somewhat positive. We did notice a reduction in job failures due to compaction issues but, not a total halt.

This was when we decided to take a closer look at Hive's transactional API and understand what was really going on.

Deep dive into Hive compactions

Whenever a compaction request is issued to Hive, the following sequence of actions are set in motion:
  1. Compaction request is placed in the compaction queue
  2. When a worker becomes available, it begins to resolve table and partition spec of the next compaction request in the queue
  3. Resolves the kind of compaction requested (major/minor)
  4. Find the compaction high water mark which is the highest committed transaction to be considered for compaction. High Water Mark is a transaction ID such that no transaction < high water mark is Open.
  5. Launch job to compact only files which were created by transactions with txnid <= high water mark.
It’s common knowledge that writing to Hive using the transactional API is done by opening multiple transactions and writing delta files to the target table. These delta files are then compacted into a single base file. This process can be demonstrated in the following simple diagram:
Delta files are then compacted into a single base file
The issue our team faced only occurred when multiple jobs simultaneously open transactions to write to different target tables. Hive compactions fail if there is a transaction with a smaller ID still open. This can be demonstrated in the following diagram:
Hive compactions fail if there is a transaction with a smaller ID still open
If we were to try and word the issue, it would read, “If we had multiple jobs ingesting data at the same time and one of them had a long-running Hive transaction for any reason, all subsequent jobs would fail as they would be unable to trigger major compactions on their respective target tables.”

Our data pipeline was composed of multiple ingestion jobs running simultaneously at various points of the day. These ingestion jobs, when they succeeded, triggered multiple Spark 1.6 computation jobs of their own. And, as we've explained, a failure in one ingestion job due to compaction had a cascading effect leading to failures in subsequent ingestion jobs. This led to failures or a lack of data for all our computation jobs.

Final solution

After the above analysis, we realized a file format like NonACID ORC or Parquet would better suit our purposes. Most of our jobs tended to be micro-batches or ETL in nature, meaning they would only be triggered in well-defined intervals. Therefore, we really didn’t have to leverage the transactional properties of the ACID-ORC format.

As of writing this article, we have migrated all our ingestion jobs to write to NonACID-ORC tables using Hive's APIs. As a result of this approach, we've noticed significant improvements in our query execution speeds, as well as a complete disappearance of the issues that plagued us earlier.

Conclusion

Sharing this case study gives us an opportunity to emphasize on the importance of choosing a Big Data storage format. We wanted this post to serve as a reminder of the difficulties one might face when having to course-correct from an incorrect decision. As you’ve seen, our team put in a lot of effort to identify and debug the problem with their architecture. After many months of trial and error, and poring over Hive documentation proved unhelpful, we dove head-first into the Hive/ORC source code to understand the internal implementation details. This kind of getting-your-hands-dirty approach is not unheard of in the Big Data ecosystem, which continues to rapidly grow and mature with the discovery of new use-cases and solutions. Our hope in sharing this journey is for the community to rally together to better understand this ecosystem's unique nature.