
Unriddling Big Data file formats

Balaji Sivaraman
Nithish Sankaranarayanan

Published: Sep 2, 2018

The conundrum of choice rears its confusing head during the early days of a big data project. The splintered nature of the data ecosystem inevitably leaves end-users spoilt for choice - right from picking out the platform (Cloudera, Hortonworks, Databricks) to choosing components like the compute engine (Tez, Impala) or an SQL framework (Hive).

One of the most important decisions in a big data project is choosing the right file format when persisting data in the Hadoop Distributed File System (HDFS). We have put together a non-exhaustive checklist that should help you when evaluating multiple file formats:
  • What is the most frequent way in which your data will be accessed?
  • How is your data being stored at source (structured or unstructured)?
  • What kind of partitioning needs do you have to take into account?
Here are a few advantages that the right file format can bring to the overall success and health of a big data project:
  • A powerful interplay between the file format and the query execution engine, leading to great performance numbers (for example, Parquet + Impala or ORC + Tez)
  • A file format that resonates well with the overall project architecture (for example, ORC coupled with streaming ingestion tools such as Flink and Storm)
And, here are a few consequences of getting the file format decision wrong:
  • Migrating data between file formats, although possible, is often a painstaking manual task that entails risks like data loss
  • The lack of synergy between the file format and compute engine can lead to bottlenecks in the overall architecture
To explain what we mean a little better, we will use a real-life project to share the cascading effect that an informed (or uninformed) file format choice can have on the entire data pipeline.

Our tech stack

Our goal (within the context of the project) was to set up a data engineering infrastructure on the client's premises from the ground up. This infrastructure would build on the multiple microservices that had been developed.

[Figure: The tech stack]
Users used mobile and web applications to generate retail configuration data across multiple vectors such as products, promotions, orders, and inventory. Our data lake read the data as raw facts from relational data stores backing the microservices. Our data scientists then used the data lake to build intelligent models and power dashboards with insights that would assist our users in making better decisions.

For setting up our data lake, we zeroed in on the following ingestion tool/file format choices:
  • Sqoop/Parquet
  • Hive HCatalog Streaming API/ORC
  • Sqoop/Avro
  • Raw Text File Import/CSV
As most of the raw facts were to be read from highly structured RDBMSs, we steered clear of CSV (a plain-text format with very little compression) and Avro. Instead, we opted for columnar formats such as Apache ORC or Apache Parquet, since they provide better compression. Additionally, they lend themselves well to vectorization, leading to lower-latency queries.

After a quick series of proofs-of-concept, we decided to use ORC with ACID support (ACID-ORC going forward) as the file format of choice. Two reasons were essential to our decision:
  • Transactional support provided by Hive - in early 2016, ORC was the only big data format providing support for ACID transactions and was the natural choice at the time
  • Hive HCatalog Streaming API - this meant we could write a bare-minimum data ingestion library in simple Scala code that read data through JDBC abstractions and wrote it to Hive (a rough sketch follows below)
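To make the second point concrete, here is a minimal, illustrative sketch of what such a streaming write looks like with the HCatalog Streaming API (pre-Hive 3). The metastore URI, table, partition and column names are hypothetical, and in our actual library the rows came from JDBC reads rather than hard-coded strings.

```scala
// A minimal sketch of writing to an ACID-ORC table with the HCatalog
// Streaming API. The target table must be bucketed, stored as ORC and
// created with TBLPROPERTIES ('transactional'='true').
import java.util.Arrays
import org.apache.hive.hcatalog.streaming.{DelimitedInputWriter, HiveEndPoint}

object OrderIngestion {
  def main(args: Array[String]): Unit = {
    val endPoint = new HiveEndPoint(
      "thrift://metastore-host:9083",              // hypothetical metastore URI
      "datalake", "orders",                        // hypothetical schema and table
      Arrays.asList("2018-09-02"))                 // partition values
    val connection = endPoint.newConnection(true)  // create the partition if missing
    val writer = new DelimitedInputWriter(
      Array("order_id", "product_id", "quantity"), ",", endPoint)

    // Each transaction batch produces one delta directory per bucket.
    val txnBatch = connection.fetchTransactionBatch(10, writer)
    txnBatch.beginNextTransaction()
    txnBatch.write("42,1001,3".getBytes("UTF-8"))  // in the real job, rows read via JDBC
    txnBatch.commit()
    txnBatch.close()
    connection.close()
  }
}
```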

ETL setup

Before getting into the ORC file format, let us quickly have a look at our ETL setup to understand the data pipeline at a high level.
[Figure: ETL setup]
All of our ingestion from external relational databases was done using the HCatalog Streaming API. These tables were ingested into the datalake schema in Hive, where we stored raw facts. They were then primarily read by computation jobs written in Spark 1.6, which computed rolled-up (aggregated) data to be stored in a separate datamarts schema in Hive. This data was used downstream for analysis by our data scientists and for displaying metrics to the users.

Of note: all the tables in our datalake schema were stored in the ACID-ORC file format, while all the tables in our datamarts schema were stored in the Parquet file format. This was due to a bug in Spark 1.6 that didn't allow us to write to partitioned ORC tables. (This bug has since been fixed.)
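To make that split concrete, here is an illustrative sketch of one such Spark 1.6 computation job writing its rolled-up output to the datamarts schema as Parquet. The job, table and column names are hypothetical; the point is only that the aggregated output bypassed ACID-ORC entirely.

```scala
// Illustrative sketch of the datamarts write path under Spark 1.6.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object DailySalesRollup {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("daily-sales-rollup"))
    val hiveContext = new HiveContext(sc)

    // Read raw facts from the datalake schema (ACID-ORC, already compacted).
    val orders = hiveContext.table("datalake.orders")

    // Roll up and persist into the datamarts schema as Parquet.
    val rollup = orders.groupBy("order_date", "product_id").sum("quantity")
    rollup.write
      .format("parquet")
      .mode("overwrite")
      .saveAsTable("datamarts.daily_sales")
  }
}
```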

ACID ORC primer and the need for compactions

With the overview in place, let's get under the hood of ACID-ORC.

ORC, in order to provide transactional properties, relies on the following metadata:
  • originalTransaction - the transaction ID of the transaction that inserted the row
  • bucket - the bucket number this row belongs to
  • rowId - a monotonically increasing, unique row ID
  • currentTransaction - the transaction ID of the transaction making the change
When a query reads a table, Hive performs the following sequence of operations:
  1. A list of transactions valid for read is obtained by considering the isolation level
  2. Files containing records are sorted by originalTransaction ascending, bucket ascending, rowId ascending, and currentTransaction descending
  3. Only the first record with a currentTransaction that is in the list of valid transactions to read is returned. This corresponds to the last visible update to a row (see the sketch after this list).
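A rough way to picture rules 2 and 3 in code is shown below. This is purely illustrative: the real logic lives inside Hive's ORC ACID reader, and the case class and field names here are simply stand-ins for the metadata columns listed above.

```scala
// Conceptual sketch of how the visible version of each row is resolved.
case class AcidRow(originalTransaction: Long, bucket: Int, rowId: Long,
                   currentTransaction: Long, payload: String)

def visibleRows(records: Seq[AcidRow], validTxns: Set[Long]): Seq[AcidRow] =
  records
    .groupBy(r => (r.originalTransaction, r.bucket, r.rowId)) // one group per logical row
    .values
    .flatMap { versions =>
      versions
        .sortBy(-_.currentTransaction)                        // newest update first
        .find(v => validTxns.contains(v.currentTransaction))  // first visible version wins
    }
    .toSeq
```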
When we first write to an ACID-ORC table, it creates what are known as delta files.
  • Delta files are the units of transactions in Hive. Each transaction creates up to 'b' delta files (where 'b' is the number of buckets)
  • These delta files are grouped into a folder named using the following pattern: delta_mintxnid_maxtxnid, where mintxnid is the minimum and maxtxnid the maximum transaction ID used in that particular transaction batch
As the number of delta files increases, the stress on the NameNode increases (more file metadata has to be kept, and delta files are usually small). Moreover, read performance takes a hit, as extra computation is necessary to identify the most recent version of each record. This is the main reason why Hive recommends running regular compactions on the data.
Unfortunately, only when the table has had at least one compaction run on it (i.e., it has a base file available) can it be read by Spark 1.6. If all we have are delta files, Spark (up until 2.0) cannot read the table. Hive, of course, can read the table either way.

The key takeaway from this section is that, for the ingested raw facts to be usable by our downstream consumers, we had to run major compactions at the end of each of our ingestion jobs.
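For reference, triggering such a major compaction boils down to a single HiveQL statement issued at the end of the ingestion job. The sketch below is illustrative; the HiveServer2 host, credentials, table and partition values are hypothetical, and ALTER TABLE ... COMPACT only enqueues a request that a metastore compactor worker later picks up asynchronously.

```scala
// Hedged sketch: requesting a major compaction over JDBC after ingestion.
import java.sql.DriverManager

object CompactAfterIngestion {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection(
      "jdbc:hive2://hiveserver2-host:10000/datalake", "etl_user", "")
    val stmt = conn.createStatement()
    try {
      stmt.execute(
        "ALTER TABLE datalake.orders PARTITION (order_date='2018-09-02') COMPACT 'major'")
      // SHOW COMPACTIONS can then be used to watch the request progress
      // through the compaction queue.
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```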

The problem statement

Once the above infrastructure was in place and the jobs had been running for a while, we began noticing that either our ingestion jobs failed randomly or our Spark 1.6 jobs were not writing any data into our insights tables.

After debugging the issue, we observed that the jobs were failing when we triggered major compactions on the ingested tables. The failure meant that even though the data had been successfully ingested into the data lake, it remained unusable.

This also led to a lot of manual work for our data engineering team: every time a job failed, we had to manually trigger compactions on the affected tables and partitions. We also had to manually trigger any downstream jobs that had failed as a result of the data being unavailable. This ensured there was no negative effect on our data scientists or users.

Of course, like any good engineering team, we chose the simplest approach to solve this problem in our first attempt: retry the compaction 'N' times and only fail the job after that. (In our case, N was 3.) Our initial reaction to this solution was somewhat positive. We did notice a reduction in job failures due to compaction issues, but not a complete halt.
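That first fix amounted to nothing more than a small retry wrapper around the compaction call, along the lines of the sketch below. The object and function names are ours for illustration, not from any library.

```scala
// Retry an action up to a fixed number of attempts before failing the job.
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Retry {
  @tailrec
  def withRetries[T](attemptsLeft: Int)(action: => T): T =
    Try(action) match {
      case Success(result)                => result
      case Failure(_) if attemptsLeft > 1 => withRetries(attemptsLeft - 1)(action)
      case Failure(error)                 => throw error
    }
}

// Usage in the ingestion job, with N = 3 as in our case:
// Retry.withRetries(3) { triggerMajorCompaction("datalake.orders") }  // hypothetical helper
```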

This was when we decided to take a closer look at Hive's transactional API and understand what was really going on.

Deep dive into Hive compactions

Whenever a compaction request is issued to Hive, the following sequence of actions is set in motion:
  1. The compaction request is placed in the compaction queue
  2. When a worker becomes available, it resolves the table and partition spec of the next compaction request in the queue
  3. It resolves the kind of compaction requested (major/minor)
  4. It finds the compaction high water mark, which is the highest committed transaction to be considered for compaction. The high water mark is a transaction ID such that no transaction below it is open.
  5. It launches a job to compact only the files that were created by transactions with txnid <= high water mark
It’s common knowledge that writing to Hive using the transactional API is done by opening multiple transactions and writing delta files to the target table. These delta files are then compacted into a single base file. This process is illustrated in the following simple diagram:
[Diagram: delta files being compacted into a single base file]
The issue our team faced occurred only when multiple jobs simultaneously opened transactions to write to different target tables. Hive compactions fail if there is a transaction with a smaller ID that is still open. This is illustrated in the following diagram:
[Diagram: Hive compactions failing when a transaction with a smaller ID is still open]
If we were to put the issue into words, it would read: “If we have multiple jobs ingesting data at the same time and one of them has a long-running Hive transaction for any reason, all subsequent jobs will fail, as they will be unable to trigger major compactions on their respective target tables.”

Our data pipeline was composed of multiple ingestion jobs running simultaneously at various points of the day. These ingestion jobs, when they succeeded, triggered multiple Spark 1.6 computation jobs of their own. And, as we've explained, a failure in one ingestion job due to compaction had a cascading effect leading to failures in subsequent ingestion jobs. This led to failures or a lack of data for all our computation jobs.

Final solution

After the above analysis, we realized that a file format like NonACID-ORC or Parquet would better suit our purposes. Most of our jobs were micro-batch or ETL in nature, meaning they would only be triggered at well-defined intervals. Therefore, we really didn't need to leverage the transactional properties of the ACID-ORC format.

As of this writing, we have migrated all our ingestion jobs to write to NonACID-ORC tables using Hive's APIs. As a result, we've noticed significant improvements in our query execution speeds, as well as a complete disappearance of the issues that plagued us earlier.
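For illustration, the table-level change behind that migration is small: the same ORC layout, minus the transactional properties. The sketch below uses hypothetical table and column names and issues the DDL over a HiveServer2 JDBC connection; it is not the exact statement from our project.

```scala
// Hedged sketch: creating a plain (non-transactional) ORC table.
import java.sql.DriverManager

object CreateNonAcidOrcTable {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection(
      "jdbc:hive2://hiveserver2-host:10000/datalake", "etl_user", "")
    val stmt = conn.createStatement()
    try {
      // Plain ORC: no 'transactional'='true', no bucketing requirement,
      // and therefore no delta files and no compactions to babysit.
      stmt.execute(
        """CREATE TABLE IF NOT EXISTS datalake.orders_v2 (
          |  order_id BIGINT,
          |  product_id BIGINT,
          |  quantity INT
          |) PARTITIONED BY (order_date STRING)
          |STORED AS ORC""".stripMargin)
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```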

Conclusion

Sharing this case study gives us an opportunity to emphasize the importance of choosing the right big data storage format. We wanted this post to serve as a reminder of the difficulties one might face when having to course-correct from an incorrect decision. As you've seen, our team put in a lot of effort to identify and debug the problem with the architecture. After many months of trial and error, and after poring over the Hive documentation proved unhelpful, we dove head-first into the Hive/ORC source code to understand the internal implementation details. This kind of getting-your-hands-dirty approach is not unheard of in the big data ecosystem, which continues to grow and mature rapidly with the discovery of new use cases and solutions. Our hope in sharing this journey is for the community to rally together to better understand this ecosystem's unique nature.