What is new in the Sparkling Water 2.0.3 release?

This release includes H2O core 3.10.1.2.

Important Feature:

This architectural change makes it possible to connect to an existing H2O cluster from Sparkling Water. The benefit is that we are no longer affected by Spark killing its executors, so the solution should be more stable in environments with many H2O/Spark nodes. We are working on an article describing how to use this very important feature in Sparkling Water 2.0.3.
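In the meantime, here is a rough, hypothetical sketch of what attaching to an already running H2O cloud (the external backend added by SW-178/SW-282) might look like from a Spark shell where sc is available. The H2OConf setter names below are assumptions based on the Sparkling Water documentation rather than calls verified against the 2.0.3 API, so treat this purely as an illustration:

    import org.apache.spark.h2o._

    // Assumed API: configure Sparkling Water to attach to an existing H2O cloud
    // instead of starting H2O inside the Spark executors (the internal backend).
    val conf = new H2OConf(sc.getConf)      // per SW-295, H2OConf is parameterized by SparkConf
      .setExternalClusterMode()             // assumed setter enabling the external backend
      .setCloudName("existing-h2o-cloud")   // name of the already running H2O cloud
      .setH2OCluster("10.0.0.17", 54321)    // assumed setter pointing at a representative H2O node

    val h2oContext = H2OContext.getOrCreate(sc, conf)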

Release notes: https://0xdata.atlassian.net/secure/ReleaseNote.jspa?projectId=12000&version=16601

2.0.3 (2017-01-04)

  • Bug
    • SW-152 – ClassNotFound with spark-submit
    • SW-266 – H2OContext shouldn’t be Serializable
    • SW-276 – ClassLoading issue when running code using SparkSubmit
    • SW-281 – Update sparkling water tests so they use correct frame locking
    • SW-283 – Set spark.sql.warehouse.dir explicitly in tests because of SPARK-17810
    • SW-284 – Fix CraigsListJobTitlesApp to use local file instead of trying to get one from hdfs
    • SW-285 – Disable timeline service also in python integration tests
    • SW-286 – Add missing test in pysparkling for conversion RDD[Double] -> H2OFrame
    • SW-287 – Fix bug in SparkDataFrame converter where key wasn’t random if not specified
    • SW-288 – Improve performance of Dataset tests and call super.afterAll
    • SW-289 – Fix PySparkling numeric handling during conversions
    • SW-290 – Fixes and improvements of task used to extended h2o jars by sparkling-water classes
    • SW-292 – Fix ScalaCodeHandlerTestSuite
  • New Feature
    • SW-178 – Allow external h2o cluster to act as h2o backend in Sparkling Water
  • Improvement
    • SW-282 – Integrate SW with H2O 3.10.1.2 ( Support for external cluster )
    • SW-291 – Use absolute value for random number in sparkling-water in internal backend
    • SW-295 – H2OConf should be parameterized by SparkConf and not by SparkContext

Please visit https://community.h2o.ai to learn more about it, provide feedback and ask for assistance as needed.

@avkashchauhan | @h2oai

Spam Detection with Sparkling Water and Spark Machine Learning Pipelines

This short post presents the “ham or spam” demo, which was posted earlier by Michal Malohlava, using our new API in the latest Sparkling Water for Spark 1.6 and earlier versions, which unifies Spark and H2O machine learning pipelines. It shows how to create a simple Spark machine learning pipeline and a model based on the fitted pipeline, which can later be used to predict whether a particular message is spam or not.

Before diving into the demo steps, we would like to provide some details about the new features in the upcoming Sparkling Water 2.0:

  • Support for Apache Spark 2.0 and backwards compatibility with all previous versions.
  • The ability to run Apache Spark and Scala through H2O’s Flow UI.
  • H2O feature improvements and visualizations for MLlib algorithms, including the ability to score feature importance.
  • Visual intelligence for Apache Spark.
  • The ability to build Ensembles using H2O plus MLlib algorithms.
  • The power to export MLlib models as POJOs (Plain Old Java Objects), which can be easily run on commodity hardware.
  • A toolchain for ML pipelines.
  • Debugging support for Spark pipelines.
  • Model and data governance through Steam.
  • Bringing H2O’s powerful data munging capabilities to Apache Spark.
In order to run the code below, start your Spark shell with the Sparkling Water JAR attached, or use the sparkling-shell script, which does this for you.

    You can start the Spark shell with Sparkling Water as follows:

    $SPARK_HOME/bin/spark-submit \
    --class water.SparklingWaterDriver \
    --packages ai.h2o:sparkling-water-examples_2.10:1.6.5 \
    --executor-memory=6g \
    --driver-memory=6g /dev/null
    

    The preferred versions are Spark 1.6 and Sparkling Water 1.6.x.

    Prepare the coding environment

    Here we just import all required libraries.

    import org.apache.spark.SparkFiles
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.h2o.H2OPipeline
    import org.apache.spark.ml.h2o.features.{ColRemover, DatasetSplitter}
    import org.apache.spark.ml.h2o.models.H2ODeepLearning
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import water.support.SparkContextSupport
    import water.fvec.H2OFrame
    

    Add our dataset to the Spark environment. The dataset consists of two columns: the first one is the label (ham or spam) and the second one is the message itself. We don't have to explicitly ask for a Spark context since it is already available via the sc variable.

    val smsDataFileName = "smsData.txt"
    val smsDataFilePath = "examples/smalldata/" + smsDataFileName
    SparkContextSupport.addFiles(sc, smsDataFilePath)
    

    Create SQL support.

    implicit val sqlContext = SQLContext.getOrCreate(sc)
    

    Start H2O services.

    import org.apache.spark.h2o._
    implicit val h2oContext = H2OContext.getOrCreate(sc)
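    // Optional sanity check: printing the context typically shows the H2O cloud topology and the Flow UI URL
    println(h2oContext)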
    

    Create a helper method which loads the dataset, performs some basic filtering, and finally creates a Spark DataFrame with two columns: label and text.

    def load(dataFile: String)(implicit sqlContext: SQLContext): DataFrame = {
      val smsSchema = StructType(Array(
        StructField("label", StringType, nullable = false),
        StructField("text", StringType, nullable = false)))
      val rowRDD = sc.textFile(SparkFiles.get(dataFile)).map(_.split("\t")).filter(r => !r(0).isEmpty).map(p => Row(p(0), p(1)))
      sqlContext.createDataFrame(rowRDD, smsSchema)
    }
    

    Define the pipeline stages

    In Spark, a pipeline is formed of two basic elements: transformers and estimators. Estimators usually encapsulate an algorithm for model generation, and their output are transformers. While the pipeline is being fitted, all transformers and estimators are executed, and the estimators are converted into transformers. The model generated by the pipeline therefore contains only transformers. More about Spark pipelines can be found in Spark's pipeline overview.

    In H2O we created a new type of pipeline stage called OneTimeTransformer. This transformer works similarly to Spark's estimator in that it is only executed while the pipeline is being fitted. However, it does not produce a transformer during fitting, and the model generated by the pipeline does not contain this OneTimeTransformer.
    An example of a one-time transformer is splitting the input data into validation and training datasets using H2O frames. We don't need this one-time transformer to be executed every time we make a prediction with the model; we only need this code to be executed when we are fitting the pipeline to the data.

    This pipeline stage uses Spark's RegexTokenizer to tokenize the messages. We just specify the input column and the output column for the tokenized messages.

    val tokenizer = new RegexTokenizer().
        setInputCol("text").
        setOutputCol("words").
        setMinTokenLength(3).
        setGaps(false).
        setPattern("[a-zA-Z]+")
    

    Remove unnecessary words using Spark’s StopWordsRemover.

    val stopWordsRemover = new StopWordsRemover().
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("filtered").
        setStopWords(Array("the", "a", "", "in", "on", "at", "as", "not", "for")).
        setCaseSensitive(false)
    

    Vectorize the words using Spark’s HashingTF.

    val hashingTF = new HashingTF().
        setNumFeatures(1 << 10).
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("wordToIndex")
    

    Create inverse document frequencies based on the hashed words. This produces a numerical representation of how much information a given word provides across the whole set of messages.
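    For reference (this is Spark MLlib's documented behaviour rather than anything specific to this demo): the IDF weight of a term t is roughly log((m + 1) / (df(t) + 1)), where m is the total number of messages and df(t) is the number of messages containing t, and setMinDocFreq(4) assigns a weight of zero to terms that appear in fewer than 4 messages.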

    val idf = new IDF().
        setMinDocFreq(4).
        setInputCol(hashingTF.getOutputCol).
        setOutputCol("tf_idf")
    

    This pipeline stage is a one-time transformer. When setKeep(true) is called, it preserves the specified columns instead of deleting them.

    val colRemover = new ColRemover().
        setKeep(true).
        setColumns(Array[String]("label", "tf_idf"))
    

    Split the dataset and store the splits under the specified keys in H2O's distributed storage, called DKV. This is a one-time transformer which is executed only during the fitting stage. It determines the frame which is passed on to the output in the following order:

    1. If the train key is specified using the setTrainKey method and that key is also in the list of keys, the frame with this key is passed on to the output.
    2. Otherwise, if the default key "train.hex" is in the list of keys, the frame with this key is passed on to the output.
    3. Otherwise, the first frame specified in the list of keys is passed on to the output.

    val splitter = new DatasetSplitter().
      setKeys(Array[String]("train.hex", "valid.hex")).
      setRatios(Array[Double](0.8)).
      setTrainKey("train.hex")
    

    Create H2O's deep learning model.
    If the key specifying the training set is set using setTrainKey, then the frame with this key is used as the training frame; otherwise the frame from the previous stage is used as the training frame.

    val dl = new H2ODeepLearning().
      setEpochs(10).
      setL1(0.001).
      setL2(0.0).
      setHidden(Array[Int](200, 200)).
      setValidKey(splitter.getKeys(1)).
      setResponseColumn("label")
    

    Create and fit the pipeline

    Create the pipeline using the stages we defined earlier. Like a normal Spark pipeline, it can be formed of Spark transformers and estimators, but it may also contain H2O's one-time transformers.

    val pipeline = new H2OPipeline().
      setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, colRemover, splitter, dl))
    

    Train the pipeline model by fitting it to a Spark DataFrame.

    val data = load("smsData.txt")
    val model = pipeline.fit(data)
    

    Now we can optionally save the model to disk and load it again.

    model.write.overwrite().save("/tmp/hamOrSpamPipeline")
    val loadedModel = PipelineModel.load("/tmp/hamOrSpamPipeline")
    

    We can also save this unfitted pipeline to disk and load it again.

    pipeline.write.overwrite().save("/tmp/unfit-hamOrSpamPipeline")
    val loadedPipeline = H2OPipeline.load("/tmp/unfit-hamOrSpamPipeline")
    

    Train the pipeline model again using the loaded pipeline, just to show that the deserialized pipeline works as it should.

    val modelOfLoadedPipeline = loadedPipeline.fit(data)
    

    Create a helper function for predictions on unlabeled data. This method uses the model generated by the pipeline. To make a prediction we call the transform method on the generated model with a Spark DataFrame as an argument. This call executes each transformer specified in the pipeline one after another, producing a Spark DataFrame with predictions.

    def isSpam(smsText: String,
               model: PipelineModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5):Boolean = {
      import h2oContext.implicits._
      val smsTextDF = sc.parallelize(Seq(smsText)).toDF("text") // convert to dataframe with one column named "text"
      val prediction: H2OFrame = model.transform(smsTextDF)
      prediction.vecs()(1).at(0) < hamThreshold
    }
    

    Try it!

    println(isSpam("Michal, h2oworld party tonight in MV?", modelOfLoadedPipeline, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel, h2oContext))
    

    In this article we showed how Spark pipelines and H2O algorithms work together seamlessly in the Spark environment. At H2O.ai we strive to be consistent with the Spark API and to make the life of developers and data scientists easier by hiding H2O internals and exposing APIs that are natural for Spark users.

    Databricks and H2O Make it Rain with Sparkling Water

    This blog post was first published on the Databricks blog.

    Databricks provides a cloud-based integrated workspace on top of Apache Spark for developers and data scientists. H2O.ai has been an early adopter of Apache Spark and has developed Sparkling Water to seamlessly integrate H2O.ai’s machine learning library on top of Spark.

    In this blog, we will demonstrate an integration between the Databricks platform and H2O.ai's Sparkling Water that provides Databricks users with an additional set of machine learning libraries. The integration allows data scientists to utilize Sparkling Water with Spark in a notebook environment more easily, allowing them to seamlessly combine Spark with H2O and get the best of both worlds.

    Let’s begin by preparing a Databricks environment to develop our spam predictor:

    The first step is to log into your Databricks account and create a new library containing Sparkling Water. You can use the Maven coordinates of the Sparkling Water package, for example: ai.h2o:sparkling-water-examples_2.10:1.5.6 (this version works with Spark 1.5).

    [Screenshot: creating the Sparkling Water library in Databricks]

    The next step is to create a new cluster to run the example:

    [Screenshot: creating the cluster]

    For this version of the Sparkling Water library we will use Spark 1.5. The name of the created cluster is “HamOrSpamCluster” – keep it handy as we will need it later.

    The next step is to upload the data; you can use the table import feature to upload the smsData.txt file.

    [Screenshot: uploading smsData.txt via table import]

    Now the environment is ready and you can create a Databricks notebook; connect it to “HamOrSpamCluster” and start building a predictive model!

    The goal of the application is to write a spam detector that uses a trained model to categorize incoming messages.

    First look at the data. It contains raw text messages that are labeled as either spam or ham.
    For example:

    spam +123 Congratulations – in this week's competition draw u have won the £1450 prize to claim just call 09050002311 b4280703. T&Cs/stop SMS 08718727868. Over 18 only 150
    ham Yun ah.the ubi one say if ü wan call by tomorrow.call 67441233 look for irene.ere only got bus8,22,65,6

    We need to transform these messages into vectors of numbers and then train a binomial model to predict whether a text message is SPAM or HAM. For the transformation of a message into a vector of numbers we will use Spark MLlib's string tokenization and word-to-vector transformers. We are going to split messages into tokens and use the TF-IDF (term frequency–inverse document frequency) technique to represent words of importance inside the training data set:

    // Representation of a training message
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vector

    case class SMS(target: String, fv: Vector)

    // Tokenize the messages: lowercase, strip ignored characters, drop stop words and short tokens
    def tokenize(data: RDD[String]): RDD[Seq[String]] = {
      val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
      val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'', '!', '0', '1')

      val texts = data.map(r => {
        var smsText = r.toLowerCase
        for (c <- ignoredChars) {
          smsText = smsText.replace(c, ' ')
        }

        val words = smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length > 2).distinct

        words.toSeq
      })
      texts
    }

    import org.apache.spark.mllib.feature._

    // Hash tokens into a fixed-size feature space and weight them by inverse document frequency
    def buildIDFModel(tokens: RDD[Seq[String]],
                      minDocFreq: Int = 4,
                      hashSpaceSize: Int = 1 << 10): (HashingTF, IDFModel, RDD[Vector]) = {
      // Hash strings into the given space
      val hashingTF = new HashingTF(hashSpaceSize)
      val tf = hashingTF.transform(tokens)
      // Build term frequency-inverse document frequency
      val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
      val expandedText = idfModel.transform(tf)
      (hashingTF, idfModel, expandedText)
    }
    

    The resulting table will contain the following lines:

    spam 0, 0, 0.31, 0.12, ….
    ham 0.67, 0, 0, 0, 0, 0.003, 0, 0.1

    After this we are free to experiment with different binary classification algorithms in H2O.

    To start using H2O, we need to initialize the H2O service by creating an H2OContext:

    // Create SQL support
    import org.apache.spark.sql._
    implicit val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    
    // Start H2O services
    import org.apache.spark.h2o._
    @transient val h2oContext = new H2OContext(sc).start()
    

    H2OContext represents H2O running on top of a Spark cluster. You should see the following output:

    [Screenshot: H2OContext startup output]

    For this demonstration, we will leverage the H2O Deep Learning method:

    // Define function which builds a DL model
    import org.apache.spark.h2o._
    import water.Key
    import _root_.hex.deeplearning.DeepLearning
    import _root_.hex.deeplearning.DeepLearningParameters
    import _root_.hex.deeplearning.DeepLearningModel

    def buildDLModel(train: Frame, valid: Frame,
                     epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
                     hidden: Array[Int] = Array[Int](200, 200))
                    (implicit h2oContext: H2OContext): DeepLearningModel = {
      import h2oContext._
      // Build a model
      val dlParams = new DeepLearningParameters()
      dlParams._model_id = Key.make("dlModel.hex")
      dlParams._train = train
      dlParams._valid = valid
      dlParams._response_column = 'target
      dlParams._epochs = epochs
      dlParams._l1 = l1
      dlParams._l2 = l2
      dlParams._hidden = hidden

      // Create a job
      val dl = new DeepLearning(dlParams)
      val dlModel = dl.trainModel.get

      // Compute metrics on both datasets
      dlModel.score(train).delete()
      dlModel.score(valid).delete()

      dlModel
    }
    

    Here is the final application:

    // Build the application
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.examples.h2o.DemoUtils._
    import scala.io.Source
    
    // load both columns from the table
    val data = sqlContext.sql("SELECT * FROM smsData")
    // Extract response spam or ham
    val hamSpam = data.map( r => r(0).toString)
    val message = data.map( r => r(1).toString)
    // Tokenize message content
    val tokens = tokenize(message)
    // Build IDF model
    var (hashingTF, idfModel, tfidf) = buildIDFModel(tokens)
    
    // Merge response with extracted vectors
    val resultRDD: DataFrame = hamSpam.zip(tfidf).map(v => SMS(v._1, v._2)).toDF
    
    // Publish Spark DataFrame as H2OFrame
    // This H2OFrame has to be transient because we do not want it to be serialized. When calling for example sc.parallelize(..) the object which we are trying to parallelize takes with itself all variables in its surroundings scope - apart from those marked as serialized.
    // 
    @transient val table = h2oContext.asH2OFrame(resultRDD)
    println(sc.parallelize(Array(1,2)))
    // Transform target column into categorical
    table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
    table.update(null)
    
    // Split table
    val keys = Array[String]("train.hex", "valid.hex")
    val ratios = Array[Double](0.8)
    @transient val frs = split(table, keys, ratios)
    @transient val train = frs(0)
    @transient val valid = frs(1)
    table.delete()
    
    // Build a model
    @transient val dlModel = buildDLModel(train, valid)(h2oContext)
    

    And voilà, we have a deep learning model ready to detect spam!

    At this point you can explore the quality of the model:

    // Collect model metrics and evaluate model quality
    import water.app.ModelMetricsSupport
    val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
    println(validMetrics.auc._auc)


    You can also use the H2O Flow UI by clicking on the URL provided when you instantiated the H2O Context.

    [Screenshot: H2O Flow UI]

    At this point we have everything ready to create a spam detector:

    // Create a spam detector - a method which will return SPAM or HAM for a given text message
    import water.DKV._

    // Spam detector
    def isSpam(msg: String,
               modelId: String,
               hashingTF: HashingTF,
               idfModel: IDFModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5): String = {
      val dlModel: DeepLearningModel = water.DKV.getGet(modelId)
      val msgRdd = sc.parallelize(Seq(msg))
      val msgVector: DataFrame = idfModel.transform(
        hashingTF.transform(
          tokenize(msgRdd))).map(v => SMS("?", v)).toDF
      val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
      msgTable.remove(0) // remove the first column (the dummy target)
      val prediction = dlModel.score(msgTable)
      // println(prediction)
      if (prediction.vecs()(1).at(0) < hamThreshold) "SPAM DETECTED!" else "HAM"
    }

    The method uses the models built above to transform the incoming text message and provide a prediction: SPAM or HAM. For example:

    [Screenshot: example isSpam calls and their output]
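    For instance, calls along the following lines should print the verdicts; the example messages are reused from the pipeline demo earlier in this post, and "dlModel.hex" is the model key assigned inside buildDLModel above:

    println(isSpam("Michal, h2oworld party tonight in MV?", "dlModel.hex", hashingTF, idfModel, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", "dlModel.hex", hashingTF, idfModel, h2oContext))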

    We’ve shown a fast and easy way to build a spam detector with Databricks and Sparkling Water. To try this out for yourself, register for a free 14-day trial of Databricks and check out the Sparkling Water example in the Databricks Guide.