A real-time stream processing framework usually involves two fundamental constructs: processors and queues. A processor reads events from a queue, executes user code to process them, and optionally writes events to another queue for downstream processors to consume. Queues are provided and managed by the framework. They transfer data and act as a buffer between processors, so that the processors can operate and scale independently. For example, a web server access log analytics application can look like this:
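A minimal sketch of the processor and queue roles described above. It uses an in-memory `deque` as a stand-in for the framework-managed queue, which is an assumption for illustration only (the real queue is durable and distributed). The processor names follow the access log example used later in this post, and the `<user_id> <page_id>` log line format is hypothetical.

```python
from collections import deque

# In-memory stand-in for a framework-managed queue (assumption: a deque only
# illustrates the data flow, not durability or distribution).
parsed_events = deque()


def log_parser(raw_lines, out_queue):
    """Processor: parse raw access log lines and enqueue structured events."""
    for line in raw_lines:
        # Hypothetical log format: "<user_id> <page_id>"
        user_id, page_id = line.split()
        out_queue.append({"user": user_id, "page": page_id})


def page_view_counter(in_queue):
    """Processor: dequeue events and count views per page."""
    counts = {}
    while in_queue:
        event = in_queue.popleft()
        counts[event["page"]] = counts.get(event["page"], 0) + 1
    return counts


log_parser(["u1 /home", "u2 /home", "u1 /about"], parsed_events)
page_views = page_view_counter(parsed_events)
```

The two processors share nothing except the queue, which is what lets them be scaled separately.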
One key differentiator among various frameworks is the queue semantics, which commonly varies along these lines:
- Delivery Guarantee: At least once, at most once, exactly-once.
- Fault Tolerance: Failures are transparent to the user and recovery is automatic.
- Durability: Data can survive failures and restarts.
- Scalability: Characteristics and limitations when adding more producers/consumers.
- Performance: Throughput and latency for queue operations.
In the open-source Cask Data Application Platform (CDAP), we wanted to provide a real-time stream processing framework that is dynamically scalable, strongly consistent, and has an exactly-once delivery guarantee. With such strong guarantees, developers are free to perform any form of data manipulation without worrying about inconsistency, potential reprocessing, or failure. This helps developers build big data applications even if they do not have a strong distributed systems background. Moreover, these strong guarantees can be relaxed in exchange for higher performance if needed; relaxing a guarantee is always easier than adding one later.
The basic operations that can be performed on a queue are enqueue and dequeue. Producers write data to the head of the queue (enqueue) and consumers read data from the tail of the queue (dequeue). We say a queue is scalable when adding more producers increases the aggregate enqueue rate and adding more consumers increases the aggregate dequeue rate. Ideally, the scaling is linear: doubling the number of producers or consumers doubles the rate of enqueue or dequeue, bounded only by the size of the cluster. To support linear scalability for producers, the queue needs to be backed by a storage system that scales linearly with the number of concurrent writers. For consumers to be linearly scalable, the queue can be partitioned such that each consumer processes only a subset of the queue data.
Another aspect of queue scalability is that it should scale horizontally. This means the upper bound of the queue performance can be increased by adding more nodes to the cluster. This is important because it ensures that the queue keeps working regardless of cluster size and can keep up with growth in data volume.
Partitioned HBase Queue
We chose Apache HBase as the storage layer for the queue. It is designed and optimized for strong row-level consistency and horizontal scalability. It provides very good concurrent write performance, and its support for ordered scans fits a partitioned consumer well. We use HBase coprocessors for efficient scan filtering and queue cleanup. To provide exactly-once semantics on the queue, we use Tephra’s transaction support for HBase.
Producers and consumers operate independently. Each producer enqueues by performing batch HBase Puts and each consumer dequeues by performing HBase Scans. There is no link between the number of producers and consumers and they can scale separately.
The queue has a notion of consumer groups. A consumer group is a collection of consumers partitioned by the same key such that each event published to the queue is consumed by exactly one consumer within the group. The use of consumer groups allows you to partition the same queue with different keys and to scale independently based on the operational characteristics of the data. Using the access log analytics example above, the producer and consumer groups might look like this:
There are two producers running for the Log Parser, and they write to the queue concurrently. On the consuming side, there are two consumer groups. The Unique User Counter group has two consumers and uses UserID as the partitioning key; the Page View Counter group has three consumers and uses PageID as the partitioning key.
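The routing implied by this example can be sketched as follows. The group sizes and partitioning keys come from the example above; the choice of a CRC32 hash taken modulo the group size is an assumption, since the post does not say which partitioning function is used, and the names `CONSUMER_GROUPS`, `consumer_for`, and `route` are hypothetical.

```python
import zlib

# Group sizes and partitioning keys from the access log example.
CONSUMER_GROUPS = {
    "unique_user_counter": {"key": "user_id", "size": 2},
    "page_view_counter": {"key": "page_id", "size": 3},
}


def consumer_for(event, group):
    """Pick the single consumer in a group that should receive the event."""
    key_value = event[group["key"]].encode("utf-8")
    # Assumption: hash modulo group size. crc32 is deterministic across
    # processes, unlike Python's built-in hash() for strings.
    return zlib.crc32(key_value) % group["size"]


def route(event):
    """Return {group name: consumer index} for one event."""
    return {name: consumer_for(event, g) for name, g in CONSUMER_GROUPS.items()}


first = route({"user_id": "alice", "page_id": "/index.html"})
second = route({"user_id": "alice", "page_id": "/about.html"})
```

Both events carry the same user, so they reach the same consumer in the Unique User Counter group, while their Page View Counter consumers are chosen independently from the page.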
Queue rowkey format
Since an event emitted by a producer can be consumed by one or more consumer groups, we write the event to one or more rows in an HBase table, with one row designated for each consumer group. The event payload and metadata are stored in separate columns, while the row key follows this format:
The two interesting parts of the row key are the Partition ID and the Entry ID. The Partition ID determines the row key prefix for a given consumer. This allows a consumer to read only the data it needs to process by using a prefix scan on the table during dequeue. The Partition ID consists of two parts: a Consumer Group ID and a Consumer ID. The producer computes one Partition ID per consumer group and writes to those rows on enqueue.
The Entry ID in the row key contains the transaction information. It consists of the producer's transaction write pointer, issued by Tephra, and a monotonically increasing counter. The counter is generated locally by the producer and is needed to make the row key unique for each event, since a producer can enqueue more than one event within the same transaction.
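A minimal sketch of how such a row key could be assembled. The field widths, the big-endian encoding, and the leading queue name are all assumptions made for illustration; only the ordering of Partition ID (Consumer Group ID, Consumer ID) followed by Entry ID (write pointer, counter) comes from the description above. The function names are hypothetical.

```python
import struct

# Assumed fixed-width, big-endian layout so that byte-wise row ordering
# matches numeric ordering of the fields. Widths are illustrative only.
PARTITION_FMT = ">QI"  # consumer group ID (8 bytes), consumer ID (4 bytes)
ENTRY_FMT = ">QI"      # transaction write pointer (8 bytes), counter (4 bytes)


def partition_prefix(queue_name: bytes, group_id: int, consumer_id: int) -> bytes:
    """Row key prefix that one consumer would use for its prefix scan."""
    return queue_name + struct.pack(PARTITION_FMT, group_id, consumer_id)


def row_key(queue_name: bytes, group_id: int, consumer_id: int,
            write_pointer: int, counter: int) -> bytes:
    """Full row key: partition prefix followed by the entry ID."""
    entry_id = struct.pack(ENTRY_FMT, write_pointer, counter)
    return partition_prefix(queue_name, group_id, consumer_id) + entry_id


# Two events enqueued in the same transaction (write pointer 42) for
# consumer 0 of group 1: the local counter keeps the keys distinct.
keys = [row_key(b"q1", 1, 0, 42, counter) for counter in range(2)]
prefix = partition_prefix(b"q1", 1, 0)
```

With this layout, rows for one consumer sort first by write pointer and then by counter, so a prefix scan returns entries in enqueue order within a transaction.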
On dequeue, the consumer uses the transaction write pointer to determine whether a queue entry has been committed and can therefore be consumed. The row key is always unique because it includes the transaction write pointer and the counter. As a result, producers operate independently and never have write conflicts.
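The visibility check on dequeue can be illustrated with a simplified model. This is not Tephra's API: the `Snapshot` class, its fields, and the `dequeue` function are hypothetical, and the model assumes that an entry is visible when its write pointer is at or below the consumer's read pointer and does not belong to a transaction that is still in progress or was invalidated. Tracking of already consumed entries is left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Simplified view of what a consumer's transaction is allowed to see."""
    read_pointer: int
    # Write pointers of in-progress or invalid transactions (assumption).
    excluded: frozenset = frozenset()

    def is_visible(self, write_pointer: int) -> bool:
        return (write_pointer <= self.read_pointer
                and write_pointer not in self.excluded)


def dequeue(entries, snapshot, batch_size):
    """Return up to batch_size payloads whose transactions are visible.

    entries: iterable of (write_pointer, counter, payload) in row key order.
    """
    batch = []
    for write_pointer, _counter, payload in entries:
        if snapshot.is_visible(write_pointer):
            batch.append(payload)
            if len(batch) == batch_size:
                break
    return batch


entries = [(10, 0, "a"), (11, 0, "b"), (12, 0, "c"), (15, 0, "d")]
snapshot = Snapshot(read_pointer=12, excluded=frozenset({11}))
visible = dequeue(entries, snapshot, batch_size=10)
```

Here the entry written under pointer 11 is skipped because its transaction is excluded, and the entry under pointer 15 is skipped because it is newer than the read pointer.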
To generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group. The consumer group information is recorded transactionally when the application starts and whenever a group size changes.
Changing producers and consumers
It is straightforward to increase or decrease the number of producers since each producer operates independently: adding or removing producer processes is enough. However, when the size of a consumer group needs to change, coordination is needed to update the consumer group information correctly. The steps can be summarized by this diagram:
Pausing and resuming are fast operations because they are coordinated using Apache ZooKeeper and executed in parallel. For example, with the web access log analytics application mentioned above, changing the consumer group information may look something like this:
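To see why resizing a consumer group needs coordination, consider a sketch that assumes consumers are chosen by hashing the partitioning key modulo the group size. That partitioning function is an assumption (the post does not specify it), and the resize of the Page View Counter group from three to four consumers is a hypothetical change chosen for illustration.

```python
import zlib


def owner(key: str, group_size: int) -> int:
    """Consumer index for a key under the assumed hash-modulo partitioning."""
    return zlib.crc32(key.encode("utf-8")) % group_size


# Hypothetical page IDs; compare ownership before and after the resize.
page_ids = [f"/page/{i}" for i in range(1000)]
moved = [p for p in page_ids if owner(p, 3) != owner(p, 4)]
moved_fraction = len(moved) / len(page_ids)
```

Under this assumption a large share of keys changes owner when the group size changes, so producers and consumers must agree on the moment the new group size takes effect. Otherwise an event could be written to a partition that no consumer reads, or be read by the wrong consumer.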
With this queue design, the enqueue and dequeue performance is on par with batch HBase Puts and HBase Scans respectively, with some overhead for talking to the Tephra server. That overhead can be greatly reduced by batching multiple events in the same transaction.
Finally, to prevent “hotspotting”, we pre-split the HBase table based on the cluster size and apply salting to the row key to better distribute writes, which would otherwise be sequential because the transaction write pointer increases monotonically.
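A minimal sketch of row key salting. The bucket count, the one-byte salt, and the use of CRC32 are assumptions for illustration; the post does not describe the actual salting scheme. The idea is that a salt derived from the key itself spreads otherwise sequential keys across the pre-split regions.

```python
import struct
import zlib

NUM_BUCKETS = 16  # assumption: e.g. one bucket per pre-split region


def salted_key(row_key: bytes) -> bytes:
    """Prefix the row key with a one-byte salt derived from its own hash."""
    salt = zlib.crc32(row_key) % NUM_BUCKETS
    return bytes([salt]) + row_key


# Sequential write pointers, encoded big-endian, would sort next to each
# other without a salt and land in the same region.
sequential = [struct.pack(">Q", wp) for wp in range(1000, 1064)]
buckets = {salted_key(k)[0] for k in sequential}
```

The trade-off of a hash-derived salt is on the read side: a reader that does not know the salt in advance has to issue one scan per bucket and merge the results.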
We’ve tested the performance on a small ten-node HBase cluster and the results are impressive. Using a 1 KB payload with a batch size of 500 events, we achieved a throughput of 100K events per second, produced and consumed, running with three producers and ten consumers. We also observed that throughput increases linearly as we add more producers and consumers: for example, it increased to 200K events per second when we doubled the number of producers and consumers.
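As a back-of-the-envelope reading of these numbers, the following arithmetic treats the payload as 1,024 bytes and the 100K events per second as the aggregate rate across all producers. Both are assumptions about how the figures were measured, and the calculation counts payload bytes only, ignoring row keys and metadata.

```python
EVENTS_PER_SECOND = 100_000   # aggregate rate reported above
PAYLOAD_BYTES = 1024          # assumption: "1K" read as 1,024 bytes
BATCH_SIZE = 500
PRODUCERS = 3

# Payload volume moved through the queue each second, in MiB.
payload_mib_per_second = EVENTS_PER_SECOND * PAYLOAD_BYTES / (1024 * 1024)

# Batches written per second across all producers.
batches_per_second = EVENTS_PER_SECOND / BATCH_SIZE

# Average enqueue rate each producer has to sustain.
events_per_producer = EVENTS_PER_SECOND / PRODUCERS
```

That works out to roughly 98 MiB of payload per second and 200 batches per second in total, or about 33,000 events per second per producer.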
With the help of HBase and a combination of best practices, we successfully built a linearly scalable, distributed transactional queue system and used it to provide a real-time stream processing framework in CDAP: dynamically scalable, strongly consistent, and with an exactly-once delivery guarantee.