One of the important decisions in a project is choosing the right file format when persisting data in the Hadoop Distributed File System (HDFS). We have put together a non-exhaustive checklist that should help you when evaluating multiple file formats:
- What is the most frequent way in which your data will be accessed?
- How is your data being stored at source (structured or unstructured)?
- What kind of partitioning needs do you have to take into account?
- The powerful interplay between the file format and the query execution engine, which can lead to great performance numbers (for example, Parquet + Impala, or ORC + Tez)
- File formats that resonate well with the overall project architecture (for example, ORC coupled with streaming ingestion tools such as Flink & Storm)
- Migrating data between file formats, although possible, is often a painstaking manual task that entails risks like data loss
- The lack of synergy between the file format and compute engine can lead to bottlenecks in the overall architecture
Our tech stack

Our goal (within the context of the project) was to set up a data engineering infrastructure on the client’s premises from the ground up. This infrastructure would take advantage of the multiple microservices that had been developed.
Users used mobile and web applications to generate retail configuration data across multiple vectors such as products, promotions, orders, and inventory. Our data lake read the data as raw facts from relational data stores backing the microservices. Our data scientists then used the data lake to build intelligent models and power dashboards with insights that would assist our users in making better decisions.
For setting up our data lake, we zeroed in on the following ingestion tool/file format choices:
After a quick series of proofs-of-concept, we decided to use ORC with support for ACID properties (ACID-ORC going forward) as the file format of choice. The following were the two essential reasons for our decision:
- Transactional support provided by Hive - In early 2016, ORC was the only big data format providing support for ACID transactions and was the natural choice at the time
- Hive HCatalog Streaming API - This meant we could write a bare minimal data ingestion library using simple Scala code to read data through JDBC abstractions and write them to Hive
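The ingestion library itself boils down to a simple batching loop: read rows from the source, group them, and commit each group as one streaming transaction. The sketch below is illustrative Python (our actual library was Scala against JDBC and the HCatalog Streaming API); `fetch_rows` and the sink object are hypothetical stand-ins for the JDBC read and the Hive streaming write.

```python
def ingest(fetch_rows, sink, batch_size=1000):
    """Read rows from a source in batches and write each batch to the
    sink in one go, mirroring how the HCatalog Streaming API groups
    writes into transaction batches."""
    batch = []
    total = 0
    for row in fetch_rows():
        batch.append(row)
        if len(batch) == batch_size:
            sink.write_batch(batch)  # one streaming transaction per batch
            total += len(batch)
            batch = []
    if batch:  # flush the final partial batch
        sink.write_batch(batch)
        total += len(batch)
    return total
```

In the real library, `write_batch` would open a Hive transaction batch, write the rows, and commit; the loop structure is the same.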
ETL setup

Before getting into the ORC file format, let us quickly have a look at our ETL setup to understand the data pipeline at a high level.
All of our ingestion from external relational databases was done using HCatalog Streaming API. These tables were ingested into the datalake schema in Hive, where we stored raw facts. Then, they were primarily read by computation jobs written in Spark 1.6 for the purpose of computing rolled up (aggregated) data to be stored in a separate datamarts schema in Hive. This data was used downstream for analysis by our data scientists and for displaying metrics to the users.
Of note: all the tables in our datalake schema were stored in the ACID-ORC file format, while all the tables in our datamarts schema were stored in the Parquet file format. This was due to a bug in Spark 1.6, which didn’t allow us to write to partitioned ORC tables. (This bug has since been fixed.)
ACID ORC primer and the need for compactions

With the overview in place, let's get under the hood of ACID-ORC.
ORC, in order to provide transactional properties, relies on the following per-row metadata:
- originalTransaction - the transaction ID of the transaction that inserted the row
- bucket - the bucket number this row belongs to
- rowId - a monotonically increasing unique row ID
- currentTransaction - the transaction ID of the transaction making the change
Reads then work as follows:
- A list of transactions valid for read is obtained by considering the isolation level
- Files containing records are sorted by originalTransaction ascending, bucket ascending, rowId ascending, and currentTransaction descending
- Only the first record with a currentTransaction that is in the list of valid transactions to read is returned. This corresponds to the last visible update to a row.
- Delta files are the units of transactions in Hive. Each transaction creates up to ‘b’ delta files (where ‘b’ is the number of buckets).
- These delta files are grouped together into a folder named using the following rule: delta_mintxnid_maxtxnid, where mintxnid is the minimum transaction ID and maxtxnid is the maximum transaction ID used in a particular transaction batch
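The read path described above can be sketched in a few lines. The following is an illustrative Python simulation (not Hive's actual implementation) of how sorted records are collapsed into the last visible version of each row:

```python
def read_visible_rows(records, valid_txns):
    """records: dicts carrying originalTransaction, bucket, rowId,
    currentTransaction, and the row payload.

    Sort by (originalTransaction asc, bucket asc, rowId asc,
    currentTransaction desc), then keep only the first record per row
    whose currentTransaction is in the set of valid transactions."""
    ordered = sorted(
        records,
        key=lambda r: (r["originalTransaction"], r["bucket"],
                       r["rowId"], -r["currentTransaction"]),
    )
    visible = {}
    for r in ordered:
        key = (r["originalTransaction"], r["bucket"], r["rowId"])
        if key not in visible and r["currentTransaction"] in valid_txns:
            visible[key] = r  # last visible update wins
    return list(visible.values())
```

Because currentTransaction sorts descending, the first valid record for a row key is its most recent committed version; updates from transactions outside the valid set are simply skipped over.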
Unfortunately, Spark 1.6 can read the table only once at least one compaction has run on it (i.e., once it has a base file available). If all we have are delta files, Spark, up until 2.0, cannot read the table. Hive, of course, can read the table either way.
The key takeaway from this section is that, for the ingested raw facts to be usable by our downstream consumers, we had to run major compactions at the end of each of our ingestion jobs.
The problem statement

Once the above infrastructure was in place and the jobs had been running for a while, we began noticing that either our ingestion jobs failed randomly or our Spark 1.6 jobs were not writing any data into our insights tables.
After debugging the issue, we observed that the jobs were failing when we triggered major compactions on the ingested tables. The failure meant that even though the data had been successfully ingested into the data lake, it remained unusable.
This also led to a lot of manual work for our data engineering team: every time a job failed, we had to manually trigger compactions on the affected tables and partitions. We also had to manually trigger any downstream jobs that had failed as a result of this data unavailability. This ensured no negative effect on our data scientists or users.
Of course, like any good engineering team, we chose the simplest approach to solve this problem in our first attempt: retry the compaction 'N' times and only fail the job after that. (In our case, N was 3.) Our initial reaction to this solution was somewhat positive. We did notice a reduction in job failures due to compaction issues, but not a total halt.
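Our first fix was literally a retry loop around the compaction call. A minimal sketch, assuming a `trigger_major_compaction` function that issues the `ALTER TABLE … COMPACT 'major'` statement and raises on failure (the function name and table argument are illustrative):

```python
def compact_with_retries(trigger_major_compaction, table, max_attempts=3):
    """Retry the major compaction up to max_attempts times and only
    fail the job after the final attempt."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            trigger_major_compaction(table)
            return attempt  # number of attempts it took to succeed
        except Exception as exc:  # in practice, catch the specific Hive error
            last_error = exc
    raise last_error
```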
This was when we decided to take a closer look at Hive's transactional API and understand what was really going on.
Deep dive into Hive compactions

Whenever a compaction request is issued to Hive, the following sequence of actions is set in motion:
- The compaction request is placed in the compaction queue
- When a worker becomes available, it resolves the table and partition spec of the next compaction request in the queue
- It resolves the kind of compaction requested (major/minor)
- It finds the compaction high water mark, which is the highest committed transaction to be considered for compaction; the high water mark is a transaction ID such that no transaction < high water mark is open
- It launches a job to compact only those files which were created by transactions with txnid <= high water mark
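The high-water-mark rule in the last two steps is the crux of what follows. Here is an illustrative Python sketch of the selection logic (greatly simplified from Hive's actual implementation):

```python
def compaction_high_water_mark(committed_txns, open_txns):
    """The high water mark is the highest committed transaction ID
    such that no transaction below it is still open."""
    lowest_open = min(open_txns) if open_txns else float("inf")
    eligible = [t for t in committed_txns if t < lowest_open]
    return max(eligible) if eligible else None

def deltas_to_compact(delta_dirs, hwm):
    """A delta directory delta_<min>_<max> is eligible for compaction
    only if all of its transactions are at or below the high water mark."""
    if hwm is None:
        return []
    return [(lo, hi) for lo, hi in delta_dirs if hi <= hwm]
```

Note the consequence: a single open transaction pins the high water mark below every transaction that committed after it, no matter which table those later transactions wrote to.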
The issue our team faced occurred only when multiple jobs simultaneously opened transactions to write to different target tables. Hive compactions fail if there is a transaction with a smaller ID still open. This can be demonstrated in the following diagram:
If we were to try and word the issue, it would read, “If we had multiple jobs ingesting data at the same time and one of them had a long-running Hive transaction for any reason, all subsequent jobs would fail as they would be unable to trigger major compactions on their respective target tables.”
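The scenario can be made concrete with a small simulation. Transaction IDs in Hive are global across the warehouse, not per table; the table names and the `can_compact` helper below are illustrative, not Hive API:

```python
def can_compact(table_deltas, open_txns):
    """Compaction of a table's deltas can proceed only once no
    still-open transaction has a smaller ID than the transactions
    that produced those deltas."""
    lowest_open = min(open_txns) if open_txns else float("inf")
    return all(hi < lowest_open for lo, hi in table_deltas)

# Job A opens txn 5 against table_a and hangs (long-running transaction).
# Job B then commits txns 6-10 against a completely different table_b.
open_txns = {5}
table_b_deltas = [(6, 6), (7, 10)]
```

Even though job B's writes to table_b succeeded, its major compaction is blocked by job A's open transaction on an unrelated table, which is exactly the cascading failure we were seeing.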
Our data pipeline was composed of multiple ingestion jobs running simultaneously at various points of the day. These ingestion jobs, when they succeeded, triggered multiple Spark 1.6 computation jobs of their own. And, as we've explained, a failure in one ingestion job due to compaction had a cascading effect leading to failures in subsequent ingestion jobs. This led to failures or a lack of data for all our computation jobs.
Final solution

After the above analysis, we realized a file format like NonACID-ORC or Parquet would better suit our purposes. Most of our jobs tended to be micro-batch or ETL in nature, meaning they would only be triggered at well-defined intervals. Therefore, we really didn’t have to leverage the transactional properties of the ACID-ORC format.
As of writing this article, we have migrated all our ingestion jobs to write to NonACID-ORC tables using Hive's APIs. As a result of this approach, we've noticed significant improvements in our query execution speeds, as well as a complete disappearance of the issues that plagued us earlier.