in Uncategorized

How to Build a Machine Learning App Using Sparkling Water and Apache Spark

The Sparkling Water project is nearing its one-year anniversary, which means Michal Malohlava, our main contributor, has been very busy for the better part of this past year. The Sparkling Water project combines H2O machine-learning algorithms with the execution power of Apache Spark. This means that the project is heavily dependent on two of the fastest growing machine-learning open source projects out there. With every major release of Spark or H2O there are API changes and, less frequently, major data structure changes that affect Sparkling Water. Throw Cloudera releases into the mix, and you have a plethora of git commits dedicated to maintaining a few simple calls to move data between the different platforms.

All that hard work on the backend means that users can easily benefit from programming in a uniform environment that combines both H2O and MLLib algorithms. For the data scientist using a Cloudera-supported distribution of Spark, they can easily incorporate an H2O library into their Spark application. An entry point to the H2O programming world (called H2OContext) is created and allows for the launch of H2O, parallel import of frames into memory and the use of H2O algorithms. This seamless integration into Spark makes launching a Sparkling Water application as easy as launching a Spark application:

bin/spark-submit --class water.YourSparklingWaterApp --master yarn-client sparkling-water-app-assembly.jar

Setup and Installation

Sparkling Water is certified on Cloudera and certified to work with versions of Spark installations that come prepackaged with each distribution. To install Sparkling Water, navigate to and download the version corresponding to the version of Spark available with your Cloudera cluster. Rather than downloading Spark and then distributing on the Cloudera cluster manually, simply set your SPARK_HOME to the spark directory in your opt directory:

$ export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark

For ease of use, we are looking into taking advantage of Cloudera Manager and creating distributable H2O and Sparkling Water parcels. This will simplify the management of the various versions of Cloudera, Spark and H2O.


Figure 1 illustrates the concept of technical realization. The application developer implements a Spark application using the Spark API and Sparkling Water library. After submitting the resulting Sparkling Water application into a Spark cluster, the application can create H2OContext, which initializes H2O services on top of Spark nodes. The application can then use any functionality provided by H2O, including its algorithms and interactive UI. H2O uses its own data structure called H2OFrame to represent tabular data, but H2OContext allows H2O to share data with Spark’s RDDs.


Figure 1: Sparkling Water architecture

Figure 2 illustrates the launch sequence of Sparkling Water on a Cloudera cluster. Both Spark and H2O are in-memory processes and all computation occurs in memory with minimal writing to disk, occurring exclusively when specified by the user. Because all the data used in the modeling process needs to read into memory, the recommended method of launching Spark and H2O is through YARN, which dynamically allocates available resources. When the job is finished, you can tear down the Sparkling Water cluster and free up resources for other jobs. All Spark and Sparkling Water applications launched with YARN will be tracked and listed in the history server that you can launch on Cloudera Manager.

YARN will allocate the container to launch the application master in and when you launch with yarn-client, the spark driver runs in the client process and the application master submits a request to the resource manager to spawn the Spark Executor JVMs. Finally, after creating a Sparkling Water cluster, you have access to HDFS to read data into either H2O or Spark.


Figure 2: Sparkling Water on Cloudera [Launching on YARN]

Programming Model

The H2OContext exposes two operators for: (1) publishing Spark RDD as H2O Frame (2) publishing H2O Frame as Spark RDD. The direction from Spark to H2O makes sense when data are prepared with the help of Spark API and passed to H2O algorithms:
// ...
val srdd: SchemaRDD = sqlContext.sql("SELECT * FROM ChicagoCrimeTable where Arrest = 'true'")
// Publish the RDD as H2OFrame
val h2oFrame: H2OFrame = h2oContext.asH2OFrame(srdd)
// ...
val dlModel: DeepLearningModel = new DeepLearning().trainModel.get

The opposite direction from H2O Frame to Spark RDD is used in a situation when the user needs to expose H2O’s frames as Spark’s RDDs. For example:
val prediction: H2OFrame = dlModel.score(testFrame)
// ...
// Exposes prediction frame as RDD
val srdd: SchemaRDD = asSchemaRDD(prediction)

The H2O context simplifies the programming model by introducing implicit conversion to hide asSchemaRDD and asH2OFrame calls.

Sparkling Water excels in situations when you need to call advanced machine learning algorithms from an existing Spark workflow. Furthermore, we found that it is the perfect platform for designing and developing smarter machine learning applications. In the rest of this post, we will demonstrate how to use Sparkling Water to create a simple machine learning application that predicts arrest probability for a given crime in Chicago.

Example Application

We’ve seen some incredible applications of Deep Learning with respect to image recognition and machine translation but this specific use case has to do with public safety; in particular, how Deep Learning can be used to fight crime in the forward-thinking cities of San Francisco and Chicago. The cool thing about these two cities (and many others!) is that they are both open data cities, which means anybody can access city data ranging from transportation information to building maintenance records. So if you are a data scientist or thinking about becoming a data scientist, there are publicly available city-specific datasets you can play with. For this example, we looked at the historical crime data from both Chicago and San Francisco and joined this data with other external data, such as weather and socioeconomic factors, using Spark’s SQL context:


Figure 3: Spark + H2O Workflow

We perform the data import, ad-hoc data munging (parsing the date column, for example), and joining of tables by leveraging the power of Spark. We then publish the Spark RDD as an H2O Frame (Fig. 2).

val sc: SparkContext = // ...
implicit val sqlContext = new SQLContext(sc)
implicit val h2oContext = new H2OContext(sc).start()
import h2oContext._

val weatherTable = asSchemaRDD(createWeatherTable("hdfs://data/chicagoAllWeather.csv"))
registerRDDAsTable(weatherTable, "chicagoWeather")
// Census data
val censusTable = asSchemaRDD(createCensusTable("hdfds://data/chicagoCensus.csv"))
registerRDDAsTable(censusTable, "chicagoCensus")
// Crime data
val crimeTable  = asSchemaRDD(createCrimeTable("hdfs://data/chicagoCrimes10k.csv", "MM/dd/yyyy hh:mm:ss a", "Etc/UTC"))
registerRDDAsTable(crimeTable, "chicagoCrime")

val crimeWeather = sql("""SELECT a.Year, ..., b.meanTemp, ..., c.PER_CAPITA_INCOME
    |FROM chicagoCrime a
    |JOIN chicagoWeather b
    |ON a.Year = b.year AND a.Month = b.month AND a.Day =
    |JOIN chicagoCensus c
    |ON a.Community_Area = c.Community_Area_Number""".stripMargin)

// Publish result as H2O Frame
val crimeWeatherHF: H2OFrame = crimeWeather

// Split data into train and test datasets
val frs = splitFrame(crimeWeatherHF, Array("train.hex", "test.hex"), Array(0.8, 0.2))
val (train, test) = (frs(0), frs(1))

<p><p>Figures 4 and 5 below include some cool visualizations we made of the joined table using H2O’s Flow as part of Sparkling Water.</p></p>

<p><p><img src="" width="1074" height="762" alt="crimeDL_fig2" class="aligncenter" /></p></p>

<p><p><strong>Figure 4: San Francisco crime visualizations</strong></p></p>

<p><p><img src="" width="1080" height="816" alt="crimeDL_fig3" class="aligncenter" /></p></p>

<p><p><strong>Figure 5: Chicago crime visualizations</strong></p></p>

<p><p>Interesting how in both cities’ crime seems to occur most frequently during the winter—a surprising fact given how cold the weather gets in Chicago!</p></p>

<p><p>Using H2O Flow, we were able to look at the arrest rates of every category of recorded crimes in Chicago and compare them with the percentage of total crimes each category represents. Some crimes with the highest arrest rates also occur least frequently, and vice versa.</p></p>

<p><p><img src="" width="1022" height="768" alt="crimeDL_fig4" class="aligncenter" /></p></p>

<p><p><strong>Figure 6: Chicago arrest rates and total % of all crimes by category</strong></p></p>

<p><p>Once the data is transformed to an H2O Frame, we train a deep neural network to predict the likelihood of an arrest for a given crime.</p></p>

def DLModel(train: H2OFrame, test: H2OFrame, response: String,
epochs: Int = 10, l1: Double = 0.0001, l2: Double = 0.0001,
activation: Activation = Activation.RectifierWithDropout, hidden:Array[Int] = Array(200,200))
(implicit h2oContext: H2OContext) : DeepLearningModel = {
import h2oContext._
import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel.DeepLearningParameters

val dlParams = new DeepLearningParameters()
dlParams._train = train
dlParams._valid = test
dlParams._response_column = response
dlParams._epochs = epochs
dlParams._l1 = l1
dlParams._l2 = l2
dlParams._activation = activation
dlParams._hidden = hidden

// Create a job
val dl = new DeepLearning(dlParams)
val model = dl.trainModel.get

// Build Deep Learning model
val dlModel = DLModel(train, test, 'Arrest)
// Collect model performance metrics and predictions for test data
val (trainMetricsDL, testMetricsDL) = binomialMetrics(dlModel, train, test)

Here is a screenshot of our H2O Deep Learning model being tuned inside Flow and the resulting AUC curve from scoring the trained model against the validation dataset.</p>


Figure 7: Chicago validation data AUC

The last building block of the application is formed by a function which predicts the arrest rate probability for a new crime. The function combines the Spark API to enrich each incoming crime event with census information and H2O’s deep learning model, which scores the event:


def scoreEvent(crime: Crime, model: Model[<em>,</em>,<em>], censusTable: SchemaRDD)
              (implicit sqlContext: SQLContext, h2oContext: H2OContext): Float = {
  import h2oContext.</em>
  import sqlContext._
  // Create a single row table
  val srdd:SchemaRDD = sqlContext.sparkContext.parallelize(Seq(crime))
  // Join table with census data
  val row: DataFrame = censusTable.join(srdd, on = Option(&#039;Community_Area === &#039;Community_Area_Number)) //.printSchema
  val predictTable = model.score(row)
  val probOfArrest = predictTable.vec(&quot;true&quot;).at(0)


val crimeEvent = Crime(&quot;02/08/2015 11:43:58 PM&quot;, 1811, &quot;NARCOTICS&quot;, &quot;STREET&quot;,false, 422, 4, 7, 46, 18)
val arrestProbability = 100 * scoreEvent(crime, dlModel, censusTable)


Figure 8: Geo-mapped predictions

Because each of the crimes reported comes with latitude-longitude coordinates, we scored our hold out data using the trained model and plotted the predictions on a map of Chicago—specifically, the Downtown district. The color coding corresponds to the model’s prediction for likelihood of an arrest with red being very likely (X > 0.8) and blue being unlikely (X < 0.2). Smart analytics + resource management = safer streets.

Further Reading

If you’re interested in finding out more about Sparkling Water or H2O please join us at H2O World 2015 in Mountain View, CA. We’ll have a series of great speakers including Stanford Professors Rob Tibshirani and Stephen Boyd, Hilary Mason, the Founder of Fast Forward Labs, Erik Huddleston, the CEO of TrendKite, Danqing Zhao, Big Data Director for Macy’s and Monica Rogati, Equity Partner at Data Collective.