Sunday, 15 February 2015

Guaranteeing exactly-once load semantics in moving data from Kafka to HDFS

In a previous post, the importance of fault-tolerance in the design of distributed systems and their applications was highlighted. This post continues in the same vein.

A good use of Hadoop in a mature enterprise is as an ETL platform to complement and offload from the warehouse. In this use, data is loaded in its raw form, and the typically intensive transformation jobs are then migrated to Hadoop, both from the warehouse and from standalone ETL servers and tools. By designing transformations as standard, reusable processing patterns, the enterprise can obtain significant cost savings. More on this in a later post.

A key architectural principle in this Hadoop-as-an-ETL use case is that of exactly-once semantics for processing (in general) and writes (in particular) in the presence of failures. The principle refers to whether the data ingestion pipeline can guarantee that data is loaded into Hadoop exactly once, ruling out both data loss (as is possible with "at-most-once" write semantics) and data duplication (as is possible with "at-least-once" write semantics). If the ingestion guarantee is at-least-once, then downstream processing needs to perform data de-duplication, a task made easy or difficult by the properties of the input data being ingested. Of course, at-most-once semantics is not acceptable where critical data is involved. The problem discussed here is not as much of a concern for a mainstream RDBMS, where data writes are transactional.

Consider a technology stack where data is being loaded into Hadoop from Kafka, an increasingly popular enterprise-wide message broker (see, for example, Loggly's explanation of why they chose Kafka). One approach to getting data from Kafka into Hadoop is to use Storm, a popular choice advocated by many practitioners. As we found out, and confirmed with experts as well, despite the use of Trident the Storm-HDFS connector does not provide a mechanism for ingesting Kafka data into HDFS that guarantees exactly-once write semantics. Let us see why.

The data pipeline includes the Storm-Kafka opaque transactional spout, which supports transactional semantics, and the Storm processing pipeline itself, which is fault-tolerant and supports exactly-once processing semantics through the Trident implementation. However, even the use of the Storm-HDFS Trident state fails to provide exactly-once semantics; what it provides instead is at-least-once write semantics. To understand why, let us dig through the behaviour.
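
For concreteness, here is a minimal sketch of how such a pipeline is typically wired up, assuming the Storm 0.9.x-era storm-kafka module: the opaque transactional Kafka spout feeding a Trident stream that is later persisted to HDFS. The ZooKeeper address, topic name and stream name are placeholders, and package names may differ across Storm versions.

    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.Stream;
    import storm.trident.TridentTopology;

    public class KafkaToHdfsTopology {

        public static Stream buildKafkaStream(TridentTopology topology) {
            // Opaque transactional spout: replays are coordinated per Trident batch,
            // which is what gives the processing side its exactly-once guarantee.
            TridentKafkaConfig kafkaConfig =
                    new TridentKafkaConfig(new ZkHosts("zkhost:2181"), "events");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            // Each Kafka message is emitted as a single tuple with one field, "str".
            return topology.newStream("kafka-spout",
                    new OpaqueTridentKafkaSpout(kafkaConfig));
        }
    }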

Writing data into HDFS using the connector is controlled by a notion of batching that is configurable (either by time or by file size). Once the batch threshold is reached, the bolt can be configured to rotate the file into another HDFS directory and continue writing to a new HDFS file. A Storm worker is responsible for writing into one HDFS file, with each bolt worker writing to a unique file. Until the destination file is rotated, though, the Storm worker keeps the file open for further writes. Since this target write batch size is different from the notion of a batch as implemented (opaquely) by the Trident API, in practice multiple Trident batches of data are written to the same HDFS file by each worker.
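
A sketch of that configuration, adapted from the storm-hdfs project's documentation and continuing from the stream built above: the 5 MB rotation size, paths and NameNode URL are illustrative values, and exact package names may vary by version.

    import backtype.storm.tuple.Fields;
    import org.apache.storm.hdfs.trident.HdfsState;
    import org.apache.storm.hdfs.trident.HdfsStateFactory;
    import org.apache.storm.hdfs.trident.HdfsUpdater;
    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;
    import org.apache.storm.hdfs.trident.format.RecordFormat;
    import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    import storm.trident.Stream;
    import storm.trident.TridentState;
    import storm.trident.state.StateFactory;

    public class HdfsStateConfig {

        public static TridentState attachHdfsState(Stream stream) {
            Fields hdfsFields = new Fields("str");

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/ingest/events")
                    .withPrefix("trident")
                    .withExtension(".txt");

            RecordFormat recordFormat = new DelimitedRecordFormat()
                    .withFields(hdfsFields);

            // Size-based rotation here; a time-based TimedRotationPolicy also exists.
            // This threshold is independent of Trident's own batch boundaries, so
            // several Trident batches end up in the same HDFS file.
            FileRotationPolicy rotationPolicy =
                    new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

            HdfsState.Options options = new HdfsState.HdfsFileOptions()
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(recordFormat)
                    .withRotationPolicy(rotationPolicy)
                    .withFsUrl("hdfs://namenode:8020");

            StateFactory factory = new HdfsStateFactory().withOptions(options);

            // Attach the HDFS state to the Kafka stream; HdfsUpdater performs the
            // writes for each Trident batch.
            return stream.partitionPersist(
                    factory, hdfsFields, new HdfsUpdater(), new Fields());
        }
    }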

Consider a failure in the process of writing to an HDFS file by the Storm-HDFS Trident-based connector, and assume the failure is in the worker process itself. As per the guarantee provided by Trident, Storm restarts the worker and the Trident batch whose write failed is attempted again. Since a new worker is started, a new destination HDFS file is opened and written to with the data from the just-failed Trident batch. The previous destination HDFS file is left behind as an orphan with no worker process owning it, so no explicit file close happens and the file is not rotated. Note that this orphaned file could be storing data from prior, successfully written Trident batches. To avoid data loss, the orphaned file cannot be ignored; it has to be included in downstream processing by a separate, out-of-band process. However, some of the data from the failed Trident batch could already have been written to the orphaned file, while the entire batch is written again into the new HDFS file. By including the orphaned file in downstream processing, there is the chance that the same data appears twice. Thus, the Storm-HDFS connector provides at-least-once semantics, and orphaned files need to be handled explicitly by the downstream process to avoid data loss.

A second approach to loading data from Kafka into Hadoop is to use the Hadoop MR API and write a Map function that consumes data from Kafka. The function updates the Kafka offsets only after data up to a threshold has been consumed. The consumption offsets are stored in a fault-tolerant system that provides durable and atomic changes, such as ZooKeeper (or even HDFS itself, since a file move is an atomic operation). If the MR job consuming from Kafka fails, it can simply be started again. This works because the Kafka offsets are updated only after the data pull into HDFS happens, and the changes to the offsets themselves are atomic. When the job is retried, data is pulled from Kafka starting at the same offset as before, ensuring the same data gets pulled in without data duplication or data loss. The Camus project from LinkedIn and the Kangaroo project from Conductor are implementations of this method. By controlling the size of the threshold, the batching effects of this approach can be controlled, though as yet it is not suited to the near real-time loads (i.e. on the order of seconds) into HDFS that the Storm approach is capable of.
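
Here is a minimal sketch of that pull-then-commit ordering, stripped of the MR scaffolding that Camus and Kangaroo provide. KafkaBatchReader is a hypothetical stand-in for a Kafka simple consumer on one partition; the HDFS calls are the standard org.apache.hadoop.fs API, and the offset commit is shown as a plain file overwrite for brevity where a real implementation would use ZooKeeper or its own atomic rename.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class KafkaToHdfsBatchLoader {

        /** Hypothetical abstraction over a Kafka simple consumer for one partition. */
        public interface KafkaBatchReader {
            /** Returns the message at the given offset, or null if none is available yet. */
            String fetch(long offset) throws Exception;
        }

        public void loadOneBatch(FileSystem fs, KafkaBatchReader reader, Path dataDir,
                                 Path offsetFile, long maxMessages) throws Exception {
            long startOffset = readOffset(fs, offsetFile);

            // Pull messages into a temporary file that downstream jobs never read.
            Path tmp = new Path(dataDir, "_tmp_" + startOffset);
            long nextOffset = startOffset;
            try (PrintWriter out = new PrintWriter(fs.create(tmp, true))) {
                while (nextOffset - startOffset < maxMessages) {
                    String msg = reader.fetch(nextOffset);
                    if (msg == null) break;          // no more data available yet
                    out.println(msg);
                    nextOffset++;
                }
            }
            if (nextOffset == startOffset) {
                fs.delete(tmp, false);               // nothing was consumed; nothing to commit
                return;
            }

            // Publish under a name derived from the start offset. A retry of a failed
            // run recreates exactly this file, so the publish is idempotent.
            Path dest = new Path(dataDir, "batch_" + startOffset);
            if (fs.exists(dest)) {
                fs.delete(dest, false);              // leftover from a previously failed attempt
            }
            fs.rename(tmp, dest);                    // the atomic file move mentioned above

            // Only now advance the offset. A crash before this line replays the same
            // offset range on the next run; a crash after it moves on cleanly.
            writeOffset(fs, offsetFile, nextOffset);
        }

        private long readOffset(FileSystem fs, Path offsetFile) throws Exception {
            if (!fs.exists(offsetFile)) {
                return 0L;                           // first run: start from the beginning
            }
            try (BufferedReader in =
                    new BufferedReader(new InputStreamReader(fs.open(offsetFile)))) {
                return Long.parseLong(in.readLine().trim());
            }
        }

        private void writeOffset(FileSystem fs, Path offsetFile, long offset) throws Exception {
            // Shown as an overwrite to keep the sketch short; a durable implementation
            // would commit via ZooKeeper or its own temp-file-plus-rename step.
            try (PrintWriter out = new PrintWriter(fs.create(offsetFile, true))) {
                out.println(offset);
            }
        }
    }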




7 comments:

  1. I did see this question raised by you on the storm-user group but did not find any replies to it. Thanks for giving a detailed explanation though, it helps. Curious: is there any other forum you got the reply from, or did you contact the authors of storm-hdfs to confirm?

    1. Sumant, I got the confirmation from someone at Hortonworks through the customer support channel at a client.

  2. I see that there is now a transactional Trident Kafka spout. Would this now ensure that Storm Trident writes a batch to HDFS exactly once?

  3. Transactional Kafka spout has no relation to the HDFS bolt.

    1. This comment has been removed by the author.

    2. Yes, my apologies. Guess it was due to my limited understanding back then. I see the following in the readme file of the storm-hdfs github project: "Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes duplicates from the current data file by copying the data up to the last transaction to another file." The "current data file" refers to the orphaned file? I will test and let you know.

    3. Here is the link to the github project: https://github.com/apache/storm/tree/master/external/storm-hdfs
