I was inspired by the recent Google I/O talk on Cloud Dataflow, a data processing service used internally at Google, which evolved from a model based on MapReduce and successor stream processing technologies such as MillWheel and FlumeJava. Taking to heart the premise of focusing on your application logic rather than the underlying infrastructure, I set out to determine how difficult it would be to create a Twitter hashtag-suggestion application similar to the one demoed at Google I/O. I used the Cask Data Application Platform (CDAP), an open source technology that allows developers to easily build Hadoop applications without requiring specialized knowledge of distributed systems.
My goal was to create an application that suggests hashtags to users based on a prefix they enter. For example, suggestions for the prefix “ob” might return “#Obama,” “#ObamaCare,” “#Obey,” etc. The biggest challenge was to make the suggestions current with the Twitter stream and weed out older tags. A hashtag such as “#NEDvsCRC” (the hashtag for the 2014 World Cup game between the Netherlands and Costa Rica) might not be relevant anymore. So the app should suggest timely hashtags based on the context of recent Twitter data.
Overcoming the data challenges
To build this type of streaming application, we have to overcome a few challenges. The app needs to handle high volumes of data; it needs to interact with the storage system in realtime to store aggregates with a high degree of parallelism and consistency; and the suggestion lookup should be fast and should not slow down as the number of tags increases. The first two challenges mean that we need to update the hashtag counts and suggestions as the hashtags come in, while the last one suggests that storing and sorting all the hashtags ever seen for a given prefix would hurt performance. One possibility is to maintain a count of hashtags seen over a specific time period; for a given prefix, we can then return the five tags with the largest counts during that period.
Let’s do a quick back-of-the-envelope calculation on the processing and storage needs for this application. On average, Twitter receives 500 million Tweets per day, with approximately 0.60 hashtags per Tweet (about 40% of all Tweets contain hashtags). That comes to about 300 million hashtags per day. Assuming an average hashtag length of 5 characters, that gives us about 1.5 billion prefixes to process and store. If we store an hourly count for each hashtag, that requires 300 million * 24 * 8 B, or ~58 GB/day. So the storage requirement comes close to 60 GB daily, and we need to handle 300 million events daily. The storage calculation is a pessimistic estimate, since there will be duplicates among the hashtags. But as we will see later, we might also want to store auxiliary data (the top five hashtag recommendations for each prefix) for realtime performance.
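The arithmetic above can be checked with a few lines of Python. The constants are the estimates quoted in the text, not measured values:

```python
# Back-of-the-envelope sizing for the hashtag suggestion app.
# All figures are the rough estimates from the text, not measurements.
TWEETS_PER_DAY = 500_000_000
HASHTAGS_PER_TWEET = 0.6       # ~40% of Tweets carry hashtags
AVG_HASHTAG_LENGTH = 5         # characters, so ~5 prefixes per tag
BYTES_PER_HOURLY_COUNT = 8     # one 64-bit counter per hashtag per hour

hashtags_per_day = TWEETS_PER_DAY * HASHTAGS_PER_TWEET
prefixes_per_day = hashtags_per_day * AVG_HASHTAG_LENGTH
storage_per_day_gb = hashtags_per_day * 24 * BYTES_PER_HOURLY_COUNT / 1e9

print(f"{hashtags_per_day:,.0f} hashtags/day")       # 300,000,000
print(f"{prefixes_per_day:,.0f} prefixes/day")       # 1,500,000,000
print(f"{storage_per_day_gb:.1f} GB/day of counts")  # 57.6
```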
Based on the problem description, we can logically represent the solution as a DAG (or a Flow, in CDAP parlance). Each processing node in a Flow graph is called a Flowlet. A Flow can be used to develop our realtime processing solution. We also need to store the hashtag counts and the hashtag suggestions for each prefix. For this data storage requirement, we can use a CDAP Dataset. Datasets can be accessed from any Flowlet (or other CDAP components such as MapReduce and Procedures) to perform read/write operations; these operations are wrapped in Transactions by CDAP, so we need not worry about conflicts and can concentrate on the functionality.
Directed Acyclic Graph (DAG) of the TwitterFlow
In the above Flow graph, each Flowlet performs a part of the overall application logic. The liveTweet Flowlet receives realtime Tweets, extracts the hashtags, and sends them to the counterInc Flowlet. The counterInc Flowlet increments the count of that hashtag in a TimeSeriesTable Dataset, tagCount, at a granularity of an hour. This allows us to calculate a score (using a geometric decay function) based on the count history (e.g., the past 12 hours) of that tag. The counterInc Flowlet computes this score and sends “&lt;HashTag, Score&gt;” to the prefixGen Flowlet.
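To make the scoring step concrete, here is a minimal Python sketch of a geometric decay score over hourly counts. The decay factor of 0.5 and the short window are illustrative assumptions, not values taken from the application:

```python
# Illustrative geometric-decay scoring, as described above.
# decay=0.5 is an assumed factor; the app's actual value may differ.
def decayed_score(hourly_counts, decay=0.5):
    """hourly_counts[0] is the current hour, hourly_counts[1] the hour
    before, and so on. Older counts contribute geometrically less."""
    return sum(count * decay ** age for age, count in enumerate(hourly_counts))

# A tag that is hot right now outranks one that peaked hours ago,
# even if the stale tag's raw total (18) is higher than the recent one's (14):
recent = decayed_score([8, 4, 2])   # 8*1 + 4*0.5 + 2*0.25 = 10.5
stale  = decayed_score([0, 2, 16])  # 0*1 + 2*0.5 + 16*0.25 = 5.0
```

This is what keeps a World Cup hashtag from lingering in the suggestions: once its hourly counts drop, its score decays away regardless of its historical volume.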
The prefixGen Flowlet simply generates the prefixes for each hashtag (for the hashtag #SFO, the prefixes are “S”, “SF”, and “SFO”) and sends “&lt;Prefix, HashTag, Score&gt;” to the updateReco Flowlet. The updateReco Flowlet uses a KeyValueTable Dataset, suggestMap, to update the hashtag recommendations for the received prefix. The prefix acts as the Key, while the Map of recommendations “&lt;HashTag, Score&gt;” acts as the Value in that Dataset. Upon receiving the “&lt;Prefix, HashTag, Score&gt;” data, the Flowlet retrieves the current suggestions (a Map of “&lt;HashTag, Score&gt;”) from suggestMap and compares their scores with the received score. It then takes the map entries (including the newly received “&lt;HashTag, Score&gt;”) corresponding to the top five scores and saves them to suggestMap under that prefix Key.
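The combined logic of prefixGen and updateReco can be sketched in a few lines of Python (a stand-in for the Java Flowlets, with a plain dict playing the role of the suggestMap Dataset; names and signatures here are illustrative, not CDAP APIs):

```python
# Sketch of the prefixGen + updateReco logic. 'suggest_map' stands in
# for the suggestMap KeyValueTable: prefix -> {hashtag: score}.
def prefixes(hashtag):
    """All prefixes of a hashtag: 'SFO' -> ['S', 'SF', 'SFO']."""
    return [hashtag[:i] for i in range(1, len(hashtag) + 1)]

def update_recommendations(suggest_map, hashtag, score, top_n=5):
    for prefix in prefixes(hashtag):
        current = dict(suggest_map.get(prefix, {}))
        current[hashtag] = score
        # Keep only the top_n entries by score for this prefix, so a
        # lookup never has to scan every hashtag ever seen.
        top = sorted(current.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
        suggest_map[prefix] = dict(top)

suggest_map = {}
update_recommendations(suggest_map, "SFO", 7.0)
print(prefixes("SFO"))    # ['S', 'SF', 'SFO']
print(suggest_map["SF"])  # {'SFO': 7.0}
```

Because each Value is capped at five entries, a suggestion lookup is a single constant-size read no matter how many hashtags the Flow has processed.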
For example, if we receive a hashtag #Ebola, liveTweet sends the tag (“Ebola”) to counterInc. The counterInc Flowlet increments the count of the tag “Ebola” for the current hour by 1. It also computes a score, say 12.5, for Ebola based on its count history and sends “&lt;Ebola, 12.5&gt;” to prefixGen. prefixGen generates all the prefixes and sends each one of them to updateReco. Finally, updateReco retrieves the current suggestions for the prefix and compares their scores to the received score, 12.5. If this score is among the top five, it saves “&lt;Ebola, 12.5&gt;” as one of the recommendations (and discards the one with the lowest score). Otherwise, it retains the old recommendations. This way, we make sure that the recommendations are always up to date and that the recommendation lookup for a given prefix takes essentially constant time as the data size grows.
Also, the TimeToLive (TTL) parameter can be set for each of the Datasets so that rows that are out of date (not written to for a TTL period) are automatically deleted. Alternatively, we can use a Workflow to schedule a periodic MapReduce job that removes stale data from the tables.
Overall, the entire application takes approximately 300 lines of code to implement (including the usage of the Twitter4j APIs).
CDAP and MillWheel
Similar to Google’s MillWheel, CDAP provides realtime processing with a programming model that allows complex streaming systems to be created without requiring distributed systems expertise. CDAP also opens up its platform for additional use cases, especially with the Datasets API that provides developers with higher-level abstractions for common data patterns. Additionally, CDAP allows the integration of batch processing with stream analytics in the same application, which significantly increases an application’s capabilities.
Developers can depend on CDAP’s framework-level correctness and fault-tolerance guarantees, thus vastly restricting the surface area over which bugs and errors can manifest. CDAP’s transaction engine, Tephra, eliminates the need to checkpoint and replay events while providing exactly-once processing semantics.
If you are interested in building your own realtime data applications, CDAP gives developers access to the same type of powerful technology that is being used at Google. Check out the app I built and download the CDAP SDK!