Triggering the processing of data in Hadoop as soon as enough new data is available helps optimize many incremental data processing use cases, but it is not trivial to implement. Scheduling a job (such as MapReduce or Spark) to run as soon as a certain amount of unprocessed data is available, for instance in a set of HDFS files or in an HBase table, requires balancing how quickly results are delivered against the resources used to compute them.
The straightforward solution, where a scheduler polls the data source for its state or size at regular intervals, has a number of issues:
- The scheduler has to know how to deal with different types of sources. Adding a new source type, say a Hive table, would require updating the scheduler to support it.
- Computing the state of the source can be complex. The source of data can have a complex structure, such as a hierarchy of directories on HDFS with many files that represent partitions of a data set. Data size does not always increase: it can decrease when a data retention policy scrubs old files, which makes keeping track of progress more complicated.
- Computing the state of the source can be resource intensive. For instance, finding out the size of newly written data in an HBase table may require scanning through the actual data or collecting statistics on the table’s files in HDFS.
- Infrequent polling of the source's state may cause unwanted delays in processing the data. Because computing the state is complex and resource intensive, polling is commonly configured at longer intervals. Depending on the nature of the source, this means either that data is processed with long delays or, when the interval is shortened, that many polls are redundant and needlessly consume resources.
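To make the retention problem concrete, here is a minimal Java sketch. `FileSet` and `SizePollingDemo` are hypothetical classes modeling a data set stored as files, not Hadoop APIs, and the byte counts are made up. A scheduler that diffs the current total size of the files sees a negative amount of "new data" once old files are scrubbed, while a counter of bytes ever written, maintained by the writer, still reports the true amount.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a data set stored as files (not a Hadoop API).
class FileSet {
    private final List<Long> fileSizes = new ArrayList<>();
    // Monotonic counter the writer could maintain; retention never decreases it.
    private long totalBytesEverWritten = 0;

    void write(long bytes) {
        fileSizes.add(bytes);
        totalBytesEverWritten += bytes;
    }

    // A retention policy scrubbing the oldest file.
    void scrubOldest() {
        if (!fileSizes.isEmpty()) {
            fileSizes.remove(0); // remove by index
        }
    }

    // What a polling scheduler would compute by listing the files.
    long currentSize() {
        long sum = 0;
        for (long size : fileSizes) {
            sum += size;
        }
        return sum;
    }

    long totalBytesEverWritten() {
        return totalBytesEverWritten;
    }
}

class SizePollingDemo {
    public static void main(String[] args) {
        FileSet data = new FileSet();
        data.write(600);
        data.write(600);
        long lastSize = data.currentSize();              // 1200
        long lastCounter = data.totalBytesEverWritten(); // 1200

        data.scrubOldest(); // retention removes 600 bytes
        data.write(500);    // 500 bytes of genuinely new data

        long naiveNew = data.currentSize() - lastSize;                // 1100 - 1200
        long trackedNew = data.totalBytesEverWritten() - lastCounter; // 1700 - 1200
        System.out.println("naive: " + naiveNew + ", tracked: " + trackedNew);
    }
}
```

Running `SizePollingDemo` prints `naive: -100, tracked: 500`: the size diff hides the new data entirely, while the writer-side counter is unaffected by retention.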
In this post, we talk about a solution that addresses some of these pain points.
Decoupling of source and scheduler
To simplify the scheduler and allow new types of sources to be added without changing it, we want to decouple the two. Moving the tracking of source state into the source itself (or into the writer of the data) also makes that tracking more efficient, because the source side has much more insight into the ingested data. To decouple source and scheduler, we introduce a source state management system:
The source reports its state incrementally, as it writes new data, to the state management system. The scheduler polls for state changes at regular intervals and triggers the job, if needed, based on a schedule configuration. Thus, the scheduler only interacts with the state management system.
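The decoupled design can be sketched in a few Java classes. This is a minimal, single-process illustration under our own assumptions: `StateStore`, `DataSchedule`, and `Scheduler` are hypothetical names, and the "state" is just a per-source counter of bytes written. The point is that the scheduler only reads from the state store, so supporting a new source type means making that source report to the store, not changing the scheduler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical state management system: sources report increments as they
// write, and readers only ever see per-source totals.
class StateStore {
    private final Map<String, Long> bytesWritten = new HashMap<>();

    synchronized void report(String source, long deltaBytes) {
        bytesWritten.merge(source, deltaBytes, Long::sum);
    }

    synchronized long bytesWritten(String source) {
        return bytesWritten.getOrDefault(source, 0L);
    }
}

// Hypothetical schedule configuration: run `job` once `thresholdBytes` of new
// data have been reported for `source` since the last run.
class DataSchedule {
    final String source;
    final long thresholdBytes;
    final Runnable job;
    long processedUpTo = 0; // the source's counter value at the last triggered run

    DataSchedule(String source, long thresholdBytes, Runnable job) {
        this.source = source;
        this.thresholdBytes = thresholdBytes;
        this.job = job;
    }
}

// The scheduler talks only to the StateStore, never to HDFS, HBase, or Hive.
class Scheduler {
    private final StateStore store;
    private final List<DataSchedule> schedules = new ArrayList<>();

    Scheduler(StateStore store) {
        this.store = store;
    }

    void add(DataSchedule schedule) {
        schedules.add(schedule);
    }

    // One polling round; a real scheduler would call this at a fixed interval.
    void poll() {
        for (DataSchedule s : schedules) {
            long current = store.bytesWritten(s.source);
            if (current - s.processedUpTo >= s.thresholdBytes) {
                s.processedUpTo = current;
                s.job.run();
            }
        }
    }
}

class DecouplingDemo {
    public static void main(String[] args) {
        StateStore store = new StateStore();
        Scheduler scheduler = new Scheduler(store);
        scheduler.add(new DataSchedule("events", 1000,
                () -> System.out.println("job triggered")));

        store.report("events", 400); // the source reports as it writes
        scheduler.poll();            // 400 < 1000: nothing happens
        store.report("events", 700);
        scheduler.poll();            // 1100 >= 1000: prints "job triggered"
    }
}
```

Because the schedule remembers the counter value at its last run, a later poll only fires again after another `thresholdBytes` have been reported.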
Optimize the data source querying delay by using notifications
Decoupling sources and the scheduler reduces the dependency between the two, but it does not solve the polling issues, which can be addressed by introducing notifications. For the scheduler to trigger a job immediately, even when data arrives at the source between two polls, it must be notified as soon as there is a significant state change. For example, a notification can be published every time a certain amount of data is written to the source.
Upon receiving the notification, the scheduler polls the state of the source right away to confirm that there is enough data to start a job. If there’s enough new data, the scheduler triggers the job.
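The notification flow can be sketched the same way. Again this is plain Java with hypothetical classes (`NotificationBus`, `NotifyingWriter`, `NotifiedScheduler`) and an in-memory map standing in for the state management system. The writer publishes a notification every `notifyEveryBytes` bytes; the scheduler treats each notification as a hint and re-reads the state before triggering, so the notification and job thresholds can differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process notification bus; a real system would distribute
// notifications over a messaging service.
class NotificationBus {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String source) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(source);
        }
    }
}

// Source side: reports every write to the shared state and publishes a
// notification each time another `notifyEveryBytes` have been written.
class NotifyingWriter {
    private final String source;
    private final Map<String, Long> state;
    private final NotificationBus bus;
    private final long notifyEveryBytes;
    private long sinceLastNotification = 0;

    NotifyingWriter(String source, Map<String, Long> state, NotificationBus bus, long notifyEveryBytes) {
        this.source = source;
        this.state = state;
        this.bus = bus;
        this.notifyEveryBytes = notifyEveryBytes;
    }

    void write(long bytes) {
        state.merge(source, bytes, Long::sum);
        sinceLastNotification += bytes;
        if (sinceLastNotification >= notifyEveryBytes) {
            sinceLastNotification = 0;
            bus.publish(source);
        }
    }
}

// Scheduler side: a notification is only a hint, so the scheduler re-reads
// the state to confirm there is enough new data before triggering a job.
class NotifiedScheduler {
    private final String watchedSource;
    private final Map<String, Long> state;
    private final long thresholdBytes;
    private long processedUpTo = 0;
    int jobsTriggered = 0;

    NotifiedScheduler(String watchedSource, Map<String, Long> state, long thresholdBytes) {
        this.watchedSource = watchedSource;
        this.state = state;
        this.thresholdBytes = thresholdBytes;
    }

    // Invoked for each notification; a regular timer-driven poll would call the same check.
    void check(String source) {
        if (!source.equals(watchedSource)) {
            return;
        }
        long current = state.getOrDefault(source, 0L);
        if (current - processedUpTo >= thresholdBytes) {
            processedUpTo = current;
            jobsTriggered++;
        }
    }
}

class NotificationDemo {
    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        NotificationBus bus = new NotificationBus();
        NotifiedScheduler scheduler = new NotifiedScheduler("events", state, 1000);
        bus.subscribe(scheduler::check);
        NotifyingWriter writer = new NotifyingWriter("events", state, bus, 500);

        writer.write(300); // below the notification threshold: no notification
        writer.write(300); // notification sent, but 600 < 1000: no job
        writer.write(500); // notification sent, 1100 >= 1000: job triggered
        System.out.println("jobs triggered: " + scheduler.jobsTriggered);
    }
}
```

Running `NotificationDemo` prints `jobs triggered: 1`. The second notification is the one that confirms enough data, so the job starts without waiting for the next regular poll.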
Example: Scheduling a job based on data availability in a CDAP Stream
Let’s take a look at how the open source Cask Data Application Platform (CDAP), running on Hadoop, implements a solution that allows you to schedule a job based on data availability in a Stream. One benefit of CDAP being an integrated platform is that it includes many auxiliary services that help developers build and run applications and perform system tasks, including scheduling jobs.
In CDAP, the built-in Metrics service maintains statistics for various resources managed by the platform, while the Notification service helps optimize the polling. The Scheduler uses the two to trigger a data processing job within a Workflow container.
CDAP provides an abstraction for elastically scalable real-time and batch data ingestion, called a Stream. A StreamWriter stores data as files on HDFS that can later be consumed for processing either in real time or by a batch job. The StreamWriter reports changes in the Stream's statistics incrementally by emitting metrics to the Metrics system. Once the change reaches a configured threshold, the Stream sends a notification to the Notification system, which then distributes it to subscribers so they can take action. This allows the scheduler to poll infrequently and still act quickly on changes. Notifications are optional: in the end, the Metrics system acts as the “source of truth”. This makes both the Notification and Metrics systems simpler and more efficient at what they do.
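Because the Metrics system is the source of truth, a scheduler can base every decision on the metric value and use notifications only to act sooner. The following sketch illustrates that idea with hypothetical `StreamMetrics` and `StreamScheduler` classes; it is not CDAP's actual API or implementation. In this model, a lost notification is caught by the next infrequent poll, and a duplicate notification cannot trigger an extra run.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metric counting the bytes ingested into a
// stream: the writer emits increments, readers see the running total.
class StreamMetrics {
    private final AtomicLong bytesIngested = new AtomicLong();

    void emit(long deltaBytes) {
        bytesIngested.addAndGet(deltaBytes);
    }

    long bytesIngested() {
        return bytesIngested.get();
    }
}

// Hypothetical scheduler that decides only from the metric value.
// Notifications and the infrequent poll both run the same evaluation.
class StreamScheduler {
    private final StreamMetrics metrics;
    private final long thresholdBytes;
    private long processedUpTo = 0;
    private int runs = 0;

    StreamScheduler(StreamMetrics metrics, long thresholdBytes) {
        this.metrics = metrics;
        this.thresholdBytes = thresholdBytes;
    }

    // Fast path: called when a notification arrives, if it arrives at all.
    void onNotification() {
        evaluate();
    }

    // Safety net: called by an infrequent timer.
    void onPollTimer() {
        evaluate();
    }

    // Synchronized because notification and timer callbacks may run on different threads.
    private synchronized void evaluate() {
        long current = metrics.bytesIngested();
        if (current - processedUpTo >= thresholdBytes) {
            processedUpTo = current;
            runs++;
        }
    }

    synchronized int runs() {
        return runs;
    }
}

class SourceOfTruthDemo {
    public static void main(String[] args) {
        long oneMegabyte = 1024L * 1024L;
        StreamMetrics metrics = new StreamMetrics();
        StreamScheduler scheduler = new StreamScheduler(metrics, oneMegabyte);

        metrics.emit(oneMegabyte);  // assume the notification for this write is lost
        scheduler.onPollTimer();    // the poll still sees 1 MB of new data: run 1
        scheduler.onNotification(); // a late duplicate notification: no new data, no run
        System.out.println("runs: " + scheduler.runs());
    }
}
```

Running `SourceOfTruthDemo` prints `runs: 1`. Since the notification carries no state of its own, the notification channel can be kept simple and best-effort.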
With the recently released v2.8.0 of CDAP, configuring the job to run when enough new data has been written to a Stream is straightforward:
The code above creates a Stream, adds a Workflow, and configures the Workflow to run when one megabyte of new data has been ingested into the Stream. More information on how to build and run the PurchaseApp example can be found here, with its source code available here.
Scheduling a job in Hadoop based on the availability of data is a very common use case. At the same time, it is not a trivial feature to build and is best handled by an integrated platform. Check out the completely open source solution we’ve built here at Cask. Also, stay tuned for follow-up posts, in which we will cover in more detail how we built the Notification and Metrics services within CDAP.