Drink in the Data with H2O at Strata SJ 2016

It’s about to rain data in San Jose when Strata + Hadoop World comes to town March 29th – 31st.

H2O has a waterfall of action happening at the show. Here’s a rundown of what’s on tap.
Keep it handy so you have less chance of FOMO (fear of missing out).

Hang out with H2O at Booth #1225 to learn more about how machine learning can help transform your business and find us throughout the conference:

Tuesday, March 29th

Wednesday, March 30th

  • 12:45pm – 1:15pm Meet the Makers: the brains behind the leading machine learning solution are on hand to hack with you
    • #AskArno – Arno Candel, Chief Architect and H2O algorithm expert
    • #RuReady – Matt Dowle, H2O Hacker and author of R’s data.table
    • #SparkUp – Michal Malohlava, principal developer of Sparkling Water
    • #InWithErin – Erin LeDell, Machine Learning Scientist and H2O ensembles expert
  • 2:40pm – 3:20pm H2O highlighted in “An Introduction to Transamerica’s Product Recommendation Platform”
  • 5:50pm – 6:50pm Booth Crawl. Have a beer on us at Booth #1225
  • 7:00pm – 9:00pm Let it Flow with H2O – Drinks + Data at the Arcadia Lounge. Grab your invite at Booth #1225

Thursday, March 31st

  • 12:45pm – 1:15pm Ask Transamerica. Vishal Bamba and Nitin Prabhu of Transamerica join us at Booth #1225 for Q&A with you!

Databricks and H2O Make it Rain with Sparkling Water

**This blog post was first published on the Databricks blog.**

Databricks provides a cloud-based integrated workspace on top of Apache Spark for developers and data scientists. H2O.ai has been an early adopter of Apache Spark and has developed Sparkling Water to seamlessly integrate H2O.ai’s machine learning library on top of Spark.

In this blog, we will demonstrate an integration between the Databricks platform and H2O.ai’s Sparkling Water that provides Databricks users with an additional set of machine learning libraries. The integration allows data scientists to utilize Sparkling Water with Spark in a notebook environment, seamlessly combining Spark with H2O to get the best of both worlds.

Let’s begin by preparing a Databricks environment to develop our spam predictor:

The first step is to log into your Databricks account and create a new library containing Sparkling Water. You can use the Maven coordinates of the Sparkling Water package, for example ai.h2o:sparkling-water-examples_2.10:1.5.6 (this version works with Spark 1.5):

[Screenshot: creating the Sparkling Water library in Databricks]
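For reference, outside of the Databricks library UI the same dependency can be declared in an sbt build. The line below is a sketch; it assumes the ai.h2o group id and the Scala 2.10 / Spark 1.5 build used in this post:

// sbt equivalent of the Maven coordinate above
// (assumption: ai.h2o group id, Scala 2.10 / Spark 1.5 build)
libraryDependencies += "ai.h2o" % "sparkling-water-examples_2.10" % "1.5.6"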

The next step is to create a new cluster to run the example:

[Screenshot: creating the new cluster]

For this version of the Sparkling Water library we will use Spark 1.5. The name of the created cluster is “HamOrSpamCluster” – keep it handy as we will need it later.

The next step is to upload the data; you can use the table import feature to upload the smsData.txt file:

[Screenshot: importing the smsData.txt file]
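If you prefer code over the import UI, a rough equivalent is sketched below. The file path and the tab-separated label/message layout are assumptions about how the upload lands, not part of the original post; Databricks notebooks of this era predefine sc and sqlContext:

// A sketch of loading the data manually instead of via the table import UI.
// Assumptions: the file is available at this path and each line is
// "<label>\t<message>".
import sqlContext.implicits._
val raw = sc.textFile("/FileStore/tables/smsData.txt")
val parsed = raw.map(_.split("\t")).filter(_.length == 2)
// Register it under the table name the rest of the walkthrough queries
parsed.map(r => (r(0), r(1))).toDF("target", "text").registerTempTable("smsData")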

Now the environment is ready and you can create a Databricks notebook; connect it to “HamOrSpamCluster” and start building a predictive model!

The goal of the application is to write a spam detector that uses a trained model to categorize incoming messages.

First, look at the data. It contains raw text messages that are labeled as either spam or ham.
For example:

spam +123 Congratulations – in this week’s competition draw u have won the £1450 prize to claim just call 09050002311 b4280703. T&Cs/stop SMS 08718727868. Over 18 only 150
ham Yun ah.the ubi one say if ü wan call by tomorrow.call 67441233 look for irene.ere only got bus8,22,65,6

We need to transform these messages into vectors of numbers and then train a binomial model to predict whether a text message is SPAM or HAM. For the transformation of a message into a vector of numbers we will use Spark MLlib’s hashing and TF-IDF transformers. We are going to split messages into tokens and use TF-IDF (term frequency–inverse document frequency) to represent the words of importance inside the training data set:

// Representation of a training message
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

case class SMS(target: String, fv: Vector)

// Split raw messages into filtered word tokens
def tokenize(data: RDD[String]): RDD[Seq[String]] = {
  val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
  val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'', '!', '0', '1')

  val texts = data.map(r => {
    var smsText = r.toLowerCase
    for (c <- ignoredChars) {
      smsText = smsText.replace(c, ' ')
    }

    val words = smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length > 2).distinct
    words.toSeq
  })
  texts
}

import org.apache.spark.mllib.feature._

// Hash tokens into a fixed feature space and weight them with TF-IDF
def buildIDFModel(tokens: RDD[Seq[String]],
                  minDocFreq: Int = 4,
                  hashSpaceSize: Int = 1 << 10): (HashingTF, IDFModel, RDD[Vector]) = {
  // Hash strings into the given space
  val hashingTF = new HashingTF(hashSpaceSize)
  val tf = hashingTF.transform(tokens)
  // Build the term frequency-inverse document frequency model
  val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
  val expandedText = idfModel.transform(tf)
  (hashingTF, idfModel, expandedText)
}

The resulting table will contain lines like the following:

spam 0, 0, 0.31, 0.12, ….
ham 0.67, 0, 0, 0, 0, 0.003, 0, 0.1

After this, we are free to experiment with different binary classification algorithms in H2O; the sketch below shows what swapping in another algorithm might look like.
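For instance, H2O’s gradient boosting machine (GBM) takes the same training/validation frames and response column as the Deep Learning model built later in this post. This is a hedged sketch, not code from the original post; it assumes the train/valid frames and the "target" response column created in the final application below:

// A sketch of building an H2O GBM instead of Deep Learning
// (assumes the train/valid frames and "target" column created below)
import water.fvec.Frame
import _root_.hex.tree.gbm.{GBM, GBMModel}
import _root_.hex.tree.gbm.GBMModel.GBMParameters

def buildGBMModel(train: Frame, valid: Frame, ntrees: Int = 50): GBMModel = {
  val gbmParams = new GBMParameters()
  gbmParams._train = train._key
  gbmParams._valid = valid._key
  gbmParams._response_column = "target"
  gbmParams._ntrees = ntrees
  new GBM(gbmParams).trainModel.get
}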

To start using H2O, we need to initialize the H2O service by creating an H2OContext:

// Create SQL support
import org.apache.spark.sql._
implicit val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// Start H2O services
import org.apache.spark.h2o._
@transient val h2oContext = new H2OContext(sc).start()

H2OContext represents H2O running on top of a Spark cluster. You should see the following output:

[Screenshot: H2OContext startup output]

For this demonstration, we will leverage the H2O Deep Learning method:

// Define a function which builds a Deep Learning model
import org.apache.spark.h2o._
import water.Key
import _root_.hex.deeplearning.DeepLearning
import _root_.hex.deeplearning.DeepLearningParameters
import _root_.hex.deeplearning.DeepLearningModel

def buildDLModel(train: Frame, valid: Frame,
                 epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
                 hidden: Array[Int] = Array[Int](200, 200))
                (implicit h2oContext: H2OContext): DeepLearningModel = {
  import h2oContext._

  // Build a model
  val dlParams = new DeepLearningParameters()
  dlParams._model_id = Key.make("dlModel.hex")
  dlParams._train = train
  dlParams._valid = valid
  dlParams._response_column = 'target
  dlParams._epochs = epochs
  dlParams._l1 = l1
  dlParams._l2 = l2
  dlParams._hidden = hidden

  // Create a training job and run it
  val dl = new DeepLearning(dlParams)
  val dlModel = dl.trainModel.get

  // Compute metrics on both datasets
  dlModel.score(train).delete()
  dlModel.score(valid).delete()

  dlModel
}

Here is the final application:

// Build the application

import org.apache.spark.rdd.RDD
import org.apache.spark.examples.h2o.DemoUtils._
import scala.io.Source

// Load both columns from the table
val data = sqlContext.sql("SELECT * FROM smsData")
// Extract the response (spam or ham) and the message text
val hamSpam = data.map(r => r(0).toString)
val message = data.map(r => r(1).toString)
// Tokenize message content
val tokens = tokenize(message)
// Build the IDF model
var (hashingTF, idfModel, tfidf) = buildIDFModel(tokens)

// Merge the response with the extracted vectors
val resultRDD: DataFrame = hamSpam.zip(tfidf).map(v => SMS(v._1, v._2)).toDF

// Publish the Spark DataFrame as an H2OFrame. The reference has to be
// @transient because we do not want it to be serialized: a closure passed to,
// for example, sc.parallelize(...) captures all variables in its surrounding
// scope apart from those marked @transient.
@transient val table = h2oContext.asH2OFrame(resultRDD)
// Transform the target column into a categorical
table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
table.update(null)

// Split the table into training and validation frames
val keys = Array[String]("train.hex", "valid.hex")
val ratios = Array[Double](0.8)
@transient val frs = split(table, keys, ratios)
@transient val train = frs(0)
@transient val valid = frs(1)
table.delete()

// Build a model
@transient val dlModel = buildDLModel(train, valid)(h2oContext)

And voilà, we have a Deep Learning model ready to detect spam!

At this point you can explore the quality of the model:

// Collect model metrics and evaluate model quality
import water.app.ModelMetricsSupport
val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
println(validMetrics.auc._auc)


You can also use the H2O Flow UI by clicking on the URL provided when you instantiated the H2OContext.

[Screenshot: the H2O Flow UI]

At this point we have everything ready to create a spam detector:

// Create a spam detector: a method which returns SPAM or HAM for a given text message
import water.DKV._

// Spam detector
def isSpam(msg: String,
           modelId: String,
           hashingTF: HashingTF,
           idfModel: IDFModel,
           h2oContext: H2OContext,
           hamThreshold: Double = 0.5): String = {
  // Fetch the trained model from H2O's distributed key-value store
  val dlModel: DeepLearningModel = water.DKV.getGet(modelId)
  // Apply the same tokenization and TF-IDF pipeline used for training
  val msgRdd = sc.parallelize(Seq(msg))
  val msgVector: DataFrame = idfModel.transform(
    hashingTF.transform(
      tokenize(msgRdd))).map(v => SMS("?", v)).toDF
  val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
  msgTable.remove(0) // remove the first (dummy label) column
  val prediction = dlModel.score(msgTable)
  // Column 1 of the prediction frame holds the probability of HAM
  if (prediction.vecs()(1).at(0) < hamThreshold) "SPAM DETECTED!" else "HAM"
}

The method uses the models built above to transform an incoming text message and provide a prediction – SPAM or HAM. For example:

[Screenshot: example spam/ham predictions]
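The calls would look along these lines; this is a sketch reusing the dlModel, hashingTF, and idfModel built above, and the sample messages are illustrative:

// Example invocations (a sketch; the messages are illustrative samples)
println(isSpam("Michal, h2oworld party tonight in MV?",
        dlModel._key.toString, hashingTF, idfModel, h2oContext))
// a well-trained model should answer: HAM

println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?",
        dlModel._key.toString, hashingTF, idfModel, h2oContext))
// a well-trained model should answer: SPAM DETECTED!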

We’ve shown a fast and easy way to build a spam detector with Databricks and Sparkling Water. To try this out for yourself, register for a free 14-day trial of Databricks and check out the Sparkling Water example in the Databricks Guide.