Sunday, 21 June 2015

The case for standardising data lake ingestion

The recent Hadoop Summit in San Jose brought out a few themes, some familiar and some hyperbolic. One indisputable observation is that conversations at the conference were no longer about "what" but about "how". Hadoop in particular, and the big data ecosystem in general, is not only game-changing but has also been around for a while now. The market needs best practices, reference architectures and success stories of implemented solutions, not just slideware. In the first of a series targeted at enterprises that want to know how to adopt Hadoop as the data lake, this post takes up the topic of governance and makes the case for standardising ingestion into the data lake.

A previous post highlighted governance as one of the requirements that the enterprise needs from its data lake. In fact, there has been active mainstream discussion of data lake governance of late, an example being this O'Reilly webcast by a cybersecurity company, and it has also been acknowledged by Hortonworks in their recent announcement of the Apache Atlas project. In this post, we argue that there is no better way to get governance right than by standardising ingestion into the data lake. By defining as standards what data can go into the data lake and how, we posit that data governance is realised with minimal effort. In contrast, the typical approach to using Hadoop has been to load the data in first and start analysing, implicitly deferring the question of "what needs to happen in production?" to a later time. While it is indeed cool that Hadoop supports "load first, ask questions later" approaches, that only works for data science experiments.

Indeed, there are more facets to the governance question, including who gets access to the data thus loaded and what those interfaces look like. However, decisions about data access can usually be taken and implemented on an as-needed basis after data is loaded into the data lake. On the data ingestion side, though, such just-in-time, or, worse, post-facto decision making is not conducive to guaranteeing key governance principles of data lineage, traceability, and metadata capture. It is absolutely necessary then to "fix" data governance at ingestion time for all enterprise data, and then proceed to define governed access patterns.

To start on the journey of standardising data lake ingestion, first identify the sources from which data needs to be sourced and loaded onto the data lake. For a large enterprise, data comes from a host of different types of source systems, including mainframes, relational databases, file systems, logs and even object stores. Fortunately, the Hadoop ecosystem offers a myriad of data sourcing options that can pull data from all of these, from legacy mainframes to the latest NoSQL databases.

Typically, the particular source system also dictates the type of data that is made available (e.g. full dump, change data capture or CDC, incremental data) and its format (e.g. binary, JSON, delimited). Though they do not themselves need to be standardised, the type of data and its format need to be taken into account in the design of the data ingestion pipeline.
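
To make the outcome of this step concrete, a minimal sketch of how a registered source feed might be described is below; the class and field names are purely illustrative, not a prescribed standard.

    // Hypothetical descriptor for a registered source feed; an actual standard would be
    // agreed with the data stewards and stored in the lake's metadata repository.
    public class SourceFeedDescriptor {

        public enum LoadType { FULL_DUMP, CDC, INCREMENTAL }
        public enum DataFormat { BINARY, JSON, DELIMITED }

        private final String sourceSystem;   // e.g. "billing-mainframe", "crm-oracle"
        private final LoadType loadType;     // how the source makes data available
        private final DataFormat format;     // wire format of the extracted data
        private final String landingPath;    // where the raw data lands on HDFS

        public SourceFeedDescriptor(String sourceSystem, LoadType loadType,
                                    DataFormat format, String landingPath) {
            this.sourceSystem = sourceSystem;
            this.loadType = loadType;
            this.format = format;
            this.landingPath = landingPath;
        }

        public String sourceSystem() { return sourceSystem; }
        public LoadType loadType()   { return loadType; }
        public DataFormat format()   { return format; }
        public String landingPath()  { return landingPath; }
    }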
   
Next, outline the ways in which the data would get to the data lake. Various data transport mechanisms are usually in play in a large enterprise (e.g. the data bus, file transfer clients). New paradigms like the direct, parallel, source-to-Hadoop transfer enabled by Sqoop are also available from the Hadoop ecosystem.
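
For example, a relational source can be pulled in parallel using Sqoop. The sketch below drives a Sqoop 1.x import through its programmatic entry point; the JDBC URL, credentials, table and paths are placeholders.

    import org.apache.sqoop.Sqoop;

    // Sketch: a parallel, incremental Sqoop import from an RDBMS into the lake's raw zone.
    // All connection details below are placeholders; --num-mappers should be tuned to what
    // the source database can tolerate.
    public class SqoopImportExample {
        public static void main(String[] args) {
            int exitCode = Sqoop.runTool(new String[] {
                "import",
                "--connect", "jdbc:mysql://dbhost:3306/sales",
                "--username", "etl_user",
                "--password-file", "/user/etl/.dbpass",
                "--table", "orders",
                "--target-dir", "/data/raw/sales/orders",
                "--num-mappers", "4",
                "--incremental", "append",
                "--check-column", "order_id",
                "--last-value", "1000000"
            });
            System.exit(exitCode);
        }
    }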

The third step is to define the type of processing to perform on the data in the ingestion pipeline, which usually depends on the business criticality of the data.

Business-critical data needs to meet data quality standards, has to adhere to specifications laid out by the data stewards/owners, and has to be processed and made ready in a business-consumable form. Ingestion of such data requires a processing framework that allows different data quality checks and validation rules to be deployed, while at the same time being able to consume data in its native form as made available by the earlier-defined standard sourcing methods. Business-critical data also comes with stricter SLAs and tighter ingestion windows.
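
One way to realise such a framework is to model every check as a pluggable rule that the pipeline loads from a feed's configuration and applies before publishing the data. The sketch below is hypothetical; the interface and class names are not from any particular product.

    import java.util.Map;

    // Hypothetical contract for a pluggable data-quality rule. The ingestion framework
    // applies each configured rule to records after parsing them from their native format.
    public interface DataQualityRule {

        /** Short name used in configuration and in metadata/lineage logs. */
        String name();

        /** Validate one parsed record. */
        ValidationResult apply(Map<String, Object> record);

        final class ValidationResult {
            public final boolean passed;
            public final String reason;   // populated when the check fails

            private ValidationResult(boolean passed, String reason) {
                this.passed = passed;
                this.reason = reason;
            }
            public static ValidationResult pass() { return new ValidationResult(true, null); }
            public static ValidationResult fail(String reason) { return new ValidationResult(false, reason); }
        }
    }

    // Example rule: a mandatory, non-empty field, as specified by the data steward.
    class NotNullRule implements DataQualityRule {
        private final String field;

        NotNullRule(String field) { this.field = field; }

        public String name() { return "not-null:" + field; }

        public ValidationResult apply(Map<String, Object> record) {
            Object value = record.get(field);
            return (value == null || value.toString().isEmpty())
                    ? ValidationResult.fail(field + " is missing")
                    : ValidationResult.pass();
        }
    }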

Non-critical data is usually loaded into the data lake with a view to running data science experiments and discovering potential for improving the business. Though the rigour around such data is less than for business-critical data, there are still specifications to follow: verifying data quality at the source, carrying out transformations, and materialising processed data in formats that allow easy discovery-oriented access. SLAs for ingestion could apply to non-critical data as well. What is more, non-critical and business-critical data coexist on the same data lake in a multi-tenant deployment model, resulting in the need for careful design and repeated reuse of ingestion patterns.

In either case, it is necessary to ensure that flexibility of data processing, a unique value proposition offered by Hadoop in comparison to traditional data warehouses, is not lost. So the various steps in the processing pipeline need to be customisable without sacrificing governance properties.
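
One way to preserve that flexibility without giving up governance is to express the pipeline as an ordered list of steps behind a common interface, so a project can drop in a custom transformation while the framework still controls scheduling, retries and metadata capture. A hypothetical sketch:

    import org.apache.hadoop.fs.Path;

    // Hypothetical contract for one customisable stage of the ingestion pipeline.
    // The framework decides when a step runs and what is logged around it; projects
    // supply their own implementations where the standard ones do not fit.
    public interface IngestionStep {

        /** Identifier recorded in metadata and lineage logs. */
        String stepName();

        /**
         * Transform the data found at {@code input} and materialise the result at {@code output}.
         * Implementations should be idempotent so the framework can safely retry them.
         */
        void run(Path input, Path output) throws Exception;
    }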

As a final step in the standardisation process, identify all the points in the data ingestion pipeline from which metadata needs to be logged and lineage information needs to be captured. This serves a variety of purposes including satisfying audit and compliance requirements.
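
As an illustration (the field names are assumptions, not a standard), each such point can emit a small, uniformly structured event to a central metadata store or log:

    // Illustrative metadata/lineage event emitted at each instrumented point in the pipeline.
    public class LineageEvent {
        public final String feedName;      // which source feed the data belongs to
        public final String stepName;      // e.g. "land", "validate", "publish"
        public final String inputPath;     // HDFS path(s) read by the step
        public final String outputPath;    // HDFS path(s) written by the step
        public final long recordCount;     // records processed, useful for reconciliation
        public final long eventTimeMillis; // when the step completed

        public LineageEvent(String feedName, String stepName, String inputPath,
                            String outputPath, long recordCount) {
            this.feedName = feedName;
            this.stepName = stepName;
            this.inputPath = inputPath;
            this.outputPath = outputPath;
            this.recordCount = recordCount;
            this.eventTimeMillis = System.currentTimeMillis();
        }
    }

Replaying such events answers the audit questions of where a dataset came from, when it arrived, and which processing steps touched it along the way.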

Rather surprisingly, for a technology ecosystem that has seen so much hype over the years, there has been little effort to define such a standardised ingestion framework. Part of the reason is surely the complexity of supporting all of the enterprise's data across all of the various ingestion patterns. Late last year, LinkedIn (a major power user of Hadoop) revealed the existence of Gobblin, the overarching framework in play for all of its data ingestion needs.

At The Data Team, we have engineered and built a standardised data lake ingestion framework for all of an enterprise's data, using completely open-source components, that serves the above needs. Moreover, the framework automates almost all the common tasks typically associated with data lake ingestion using a configuration-driven approach. Emphasis is placed on convention over configuration to ease the pain associated with on-boarding new data onto the data lake. Metadata and lineage information is logged at every step in the ingestion pipeline. A standardised template of options is provided for each processing step so that new projects can easily choose what is applicable for a particular data ingestion and immediately on-board data in a governed manner. At the same time, since enterprise needs do vary, the design also allows new processing steps to be added as custom code in a plug-and-play manner.
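
Purely to illustrate the convention-over-configuration idea (this is not the framework's actual format), on-boarding a new feed might require little more than naming it, with landing paths, published paths and table names all derived from that name:

    // Hypothetical illustration of convention over configuration: only the feed name is
    // supplied by a new project; everything else is derived from it by convention.
    public class FeedConventions {

        /** Raw data lands under a path derived from the feed name. */
        public static String landingPath(String feedName) {
            return "/data/raw/" + feedName;
        }

        /** Processed, business-consumable data is published under a parallel path. */
        public static String publishedPath(String feedName) {
            return "/data/published/" + feedName;
        }

        /** Hive table name derived from the feed name. */
        public static String hiveTable(String feedName) {
            return "lake_" + feedName.replace('-', '_');
        }

        /** Metadata and lineage events for the feed are logged under a per-feed directory. */
        public static String lineagePath(String feedName) {
            return "/governance/lineage/" + feedName;
        }
    }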

Finally, as further validation of the need to standardise ingestion into the data lake, consider the fact that one of last year's more significant acquisitions in the big data space was of a technology that helped find what data already resides on a Hadoop cluster and generate metadata about it. Oh, inverted world! Apparently, such configuration-driven ingestion into Hadoop is patented. Oh, cruel world!

Sunday, 15 February 2015

Guaranteeing exactly-once load semantics in moving data from Kafka to HDFS

In a previous post, the importance of fault-tolerance in the design of distributed systems and their applications was highlighted. This post continues in the same vein.

A good use of Hadoop in a mature enterprise is as an ETL platform to complement and offload from the warehouse. In this use, data is loaded in its raw form, and the typically intensive transformation jobs are migrated to Hadoop, both from the warehouse and from standalone ETL servers and tools. By designing transformations as standard, reusable processing patterns, the enterprise can realise significant cost savings. More on this in a later post.

A key architectural principle in this Hadoop-as-an-ETL use case is that of exactly-once semantics for processing (in general) and writes (in particular) in the presence of failures. The principle refers to the problem of whether the data ingestion pipeline can guarantee that data is loaded into Hadoop exactly once, ruling out both data loss (as is possible with "at-most-once" write semantics) and data duplication (as is possible with "at-least-once" write semantics). If the ingestion guarantee is at-least-once, then downstream processing needs to perform data de-duplication, a task made easy or difficult by the properties of the input data being ingested. Of course, at-most-once semantics is not acceptable where critical data is involved. The problem discussed here is not as much of a concern for a mainstream RDBMS, where data writes are transactional.
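
To make the at-least-once case concrete, downstream de-duplication can be as simple as a keyed MapReduce pass, assuming every record carries a unique identifier. In the sketch below the identifier is assumed to be the first pipe-delimited field:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch: de-duplicate delimited records that carry a unique id in their first field,
    // as downstream compensation for an at-least-once ingestion pipeline.
    public class DedupJob {

        public static class KeyByIdMapper extends Mapper<Object, Text, Text, Text> {
            protected void map(Object offset, Text line, Context ctx)
                    throws IOException, InterruptedException {
                String id = line.toString().split("\\|", 2)[0];   // assumed record id
                ctx.write(new Text(id), line);
            }
        }

        public static class FirstCopyReducer extends Reducer<Text, Text, Text, NullWritable> {
            protected void reduce(Text id, Iterable<Text> copies, Context ctx)
                    throws IOException, InterruptedException {
                // All copies of a duplicated record share the same id; keep exactly one.
                ctx.write(copies.iterator().next(), NullWritable.get());
            }
        }

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "dedup");
            job.setJarByClass(DedupJob.class);
            job.setMapperClass(KeyByIdMapper.class);
            job.setReducerClass(FirstCopyReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Whether this pass is cheap or prohibitively expensive depends entirely on whether the data has such an identifier, which is the point made above about the properties of the input data.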

Consider a technology stack where data is being loaded into Hadoop from Kafka, an increasingly popular enterprise-wide message broker (see, for example, Loggly's explanation for why they chose Kafka). One approach to getting data from Kafka into Hadoop is to use Storm, a popular choice advocated by many practitioners. As we found out, and confirmed with experts as well, despite the use of Trident, the Storm-HDFS connector does not provide a mechanism for ingesting Kafka data into HDFS that guarantees exactly-once write semantics. Let us see why.

The data pipeline includes the Storm-Kafka opaque transactional spout, which supports transactional semantics, and the Storm processing pipeline itself, which is fault-tolerant and supports exactly-once processing semantics using the Trident implementation. However, even the use of the Storm-HDFS Trident state fails to provide exactly-once semantics. Instead, what it provides is at-least-once write semantics. To understand this, let us dig through the behaviour.

Writing data into HDFS using the connector is controlled by a configurable notion of batching (either based on time or on file size). Once the batch threshold is reached, the bolt can be configured to rotate the file into another HDFS directory and continue writing to a new HDFS file. A Storm worker is responsible for writing into one HDFS file, with each worker writing to its own unique file. Until the destination file is rotated, though, the Storm worker keeps the file open for further writes. Since this write batch size is different from the notion of a batch as implemented (opaquely) by the Trident API, in practice multiple Trident batches of data are written to the same HDFS file by each worker.
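
For reference, the wiring described above looks roughly as follows. The class and package names are taken from the storm-hdfs module's documented Trident API as it stood around Storm 0.9/0.10; treat this as a sketch rather than tested code, since names have shifted across releases.

    import backtype.storm.tuple.Fields;
    import storm.trident.Stream;
    import storm.trident.TridentState;
    import storm.trident.state.StateFactory;

    import org.apache.storm.hdfs.trident.HdfsState;
    import org.apache.storm.hdfs.trident.HdfsStateFactory;
    import org.apache.storm.hdfs.trident.HdfsUpdater;
    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;
    import org.apache.storm.hdfs.trident.format.RecordFormat;
    import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;

    // Sketch of attaching the Storm-HDFS Trident state to a stream (for example, one fed by
    // the Storm-Kafka opaque transactional spout). Rotation here is size-based.
    public class KafkaToHdfsWiring {

        public static TridentState attachHdfsState(Stream stream) {
            Fields hdfsFields = new Fields("key", "message");

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/data/raw/events")
                    .withPrefix("events")
                    .withExtension(".txt");

            RecordFormat recordFormat = new DelimitedRecordFormat()
                    .withFields(hdfsFields);

            // Rotate the destination file once roughly 64 MB has been written to it.
            FileRotationPolicy rotationPolicy =
                    new FileSizeRotationPolicy(64.0f, FileSizeRotationPolicy.Units.MB);

            HdfsState.Options options = new HdfsState.HdfsFileOptions()
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(recordFormat)
                    .withRotationPolicy(rotationPolicy)
                    .withFsUrl("hdfs://namenode:8020");

            StateFactory factory = new HdfsStateFactory().withOptions(options);

            // Each partition's updater appends successive Trident batches to its open file.
            return stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
        }
    }

Note that nothing in this configuration ties file rotation to Trident's batch boundaries, which is precisely why a single HDFS file ends up holding multiple Trident batches.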

Consider a failure in the process of writing to an HDFS file by the Storm-HDFS Trident-based connector, and assume the failure is in the worker process itself. As per the guarantee provided by Trident, the worker is restarted by Storm and the Trident batch whose write failed is attempted again. Since a new worker is started, a new destination HDFS file is opened and written to with data from the just-failed Trident batch. The previous destination HDFS file is left behind as an orphan with no worker process owning it; there is no explicit file close, and the file is not rotated. Note that this orphaned file could be storing data from prior, successfully-written Trident batches. To avoid data loss, the orphaned file cannot be ignored but has to be included in downstream processing by a separate, out-of-band process. However, some of the data from the failed Trident batch could also have been written to the orphaned file, while the entire batch is written again into the new HDFS file. So, in including the orphaned file in downstream processing, there is the chance that the same data appears twice. Thus, the use of the Storm-HDFS connector provides at-least-once semantics, and orphaned files need to be handled explicitly by the downstream process to avoid data loss.

A second approach to loading data from Kafka into Hadoop is to use the Hadoop MR API and write a Map function that consumes data from Kafka. The function updates the Kafka offsets after data up to a threshold is consumed. The consumption offsets are stored in a fault-tolerant system that provides durable and atomic changes, like ZooKeeper (or even HDFS itself, as a file move is an atomic operation). If the MR job consuming from Kafka fails, it can simply be started again. This works because the Kafka offsets are updated only after the data pull into HDFS happens, the changes to the offsets themselves are atomic, and output from a failed attempt is never promoted to the final destination. In retrying the job, data is pulled from Kafka from the same offset as before, thus ensuring the same data gets pulled in without data duplication or data loss. The Camus project from LinkedIn and the Kangaroo project from Conductor are implementations of this method. By controlling the size of the threshold, the batching effects of this approach can be controlled, though it is not yet conducive to the near real-time loads into HDFS (i.e. of the order of seconds) that the Storm approach is capable of.
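
A minimal sketch of the offset bookkeeping, using HDFS itself as the store with one offset file per topic partition (the Kafka consumption and the MR plumbing are elided), could look like this:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch: committing Kafka consumption offsets on HDFS so a failed pull can be retried
    // from the same offset. The write-then-rename makes the commit itself atomic; the data
    // files are only promoted to their final directory by the job's output commit, so a
    // failed attempt leaves behind neither data nor an advanced offset.
    public class HdfsOffsetStore {

        private final FileSystem fs;
        private final Path offsetDir;

        public HdfsOffsetStore(Configuration conf, String offsetDir) throws Exception {
            this.fs = FileSystem.get(conf);
            this.offsetDir = new Path(offsetDir);
        }

        /** Read the last committed offset for a topic partition (0 if nothing committed yet). */
        public long readOffset(String topic, int partition) throws Exception {
            Path committed = new Path(offsetDir, topic + "-" + partition);
            if (!fs.exists(committed)) {
                return 0L;
            }
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(fs.open(committed), StandardCharsets.UTF_8))) {
                return Long.parseLong(in.readLine().trim());
            }
        }

        /** Commit a new offset: write to a temporary file, then rename it into place. */
        public void commitOffset(String topic, int partition, long offset) throws Exception {
            Path committed = new Path(offsetDir, topic + "-" + partition);
            Path tmp = new Path(offsetDir, topic + "-" + partition + ".tmp");
            try (Writer out = new OutputStreamWriter(fs.create(tmp, true), StandardCharsets.UTF_8)) {
                out.write(Long.toString(offset));
            }
            // Simplification: delete-then-rename leaves a tiny window with no committed file;
            // an overwrite-capable atomic rename (e.g. FileContext with Options.Rename.OVERWRITE)
            // closes that gap in a production implementation.
            fs.delete(committed, false);
            if (!fs.rename(tmp, committed)) {
                throw new IllegalStateException("Could not commit offset for " + topic + "-" + partition);
            }
        }
    }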