Learning CDAP with Elasticsearch

Elasticsearch is a popular search engine based on Apache Lucene™. Unlike relational databases, Elasticsearch stores information in documents; each document has a type (with a mapping) that gives information about its schema, and similar documents are stored together in an index. Elasticsearch even allows time-based indices, so documents can be stored with other records created the same day. These documents are then searchable on any field (or on a combination of fields), and Elasticsearch supports full-text search. You may be familiar with Elasticsearch’s power if you use StackOverflow (Stack Exchange) or Wikipedia (Wikimedia); The search functionality for both websites is powered by Elasticsearch.

Experimenting with Elasticsearch is incredibly easy. You can simply download Elasticsearch from their website, unzip it, and run it out of the box. While Elasticsearch also supports easy loading of data (in JSON form), batch loading can be tricky, especially if your data is stored in a traditional SQL database.

Fortunately, CDAP makes it easy to quickly write documents to Elasticsearch. Since this project was my first major change to the CDAP codebase, I will also detail how easy it is to add a source or sink to CDAP.

Batch sources, sinks, and transforms extend the Cask Data Application Platform (CDAP) classes BatchSource, BatchSink, and BatchTransform respectively.  As a result, implementing your own source, sink, or transform is as simple as extending the appropriate class and overriding a few methods. Specifically, I overrode initialize and transform to create a batch sink. Elasticsearch has classes for Hadoop-integration—ESOutputFormat and ESInputFormat—so I used the initialize method of both the Elasticsearch Source and Elasticsearch Batch Sink to set the job configuration parameters, such as the OutputFormatClass and SpeculativeExecution, as required by Elasticsearch.

CDAP’s ETL pipeline can be thought of as a flow from a source to an optional transform or sequence of transforms (where aspects of the data may be changed or edited) to a sink, where the data is written. Each of these three components manipulates the data in some way: the source must transform the data from whichever format it is in into a StructuredRecord, (CDAP’s representation of the data); the transform can then edit the StructuredRecord to modify the data itself or to change the structure of the data; and the sink must transform the StructuredRecord into whichever format the sink is expecting. In the transform method for the Elasticsearch Sink, I converted the StructuredRecord to a Text object storing the JSON document. I then emitted the data using the Emitter, a parameter to the transform methods of all components of the ETL pipeline, and voila! My sink was complete.

In creating the Elasticsearch sink, I modified the dependencies to add Elasticsearch-Hadoop. Because Elasticsearch-Hadoop marks all dependencies as “provided” in scope, I had to import some of the necessary dependencies separately from Elasticsearch-Hadoop. Besides these minor issues involving missing dependencies, Elasticsearch was incredibly easy to integrate with Hadoop.

Sample configuration using the CDAP UI

Sample configuration
To use the sink, you need only input the appropriate fields: the connection address to the Elasticsearch server; the field in the data that contains a unique id for the document; and the index and type for the document.  If the index and/or type don’t already exist, they will automatically be created.

After inputting the information about the source and pressing “start,” CDAP and Hadoop will read the data from the source, apply the appropriate transformations, and load it into Elasticsearch.  Because Elasticsearch will deduce an appropriate mapping for your data, you don’t even need to specify the schema. You can then visualize it with Kibana and search on it with Elasticsearch’s tool, Marvel.

CDAP’s new user interface (see below) makes it easy to experiment with Elasticsearch. Simply choose the appropriate source and sink from the left, and double click on the icons once they appear to enter the configuration. After pressing publish, you will start reading from or writing data to Elasticsearch.

You can start experimenting with CDAP and Elasticsearch in our upcoming CDAP 3.2 release. If you’d like a sneak peek, you can clone the CDAP repository on Github (https://github.com/caskdata/cdap.git) and build the standalone zip with the command:

MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=128m" mvn package -pl cdap-standalone,cdap-app-templates/cdap-etl,cdap-examples -am -amd -DskipTests -P examples,templates,dist,release,unit-tests

Unzip the SDK, located in cdap-standalone/target; then from the SDK, run the command to start CDAP: ./bin/cdap.sh. You can then visit http://localhost:9999 and interact with the CDAP UI.  More information about downloading and building CDAP can be found at the product’s Github repository.


<< Return to Cask Blog