What is new in Sparkling Water 2.0.3 Release?

This release has H2O core – 3.10.1.2

Important Feature:

This architectural change allows to connect to existing h2o cluster from sparkling water. This has a benefit that we are no longer affected by Spark killing it’s executors thus we should have more stable solution in environment with lots of h2o/spark node. We are working on article on how to use this very important feature in Sparkling Water 2.0.3.

Release notes: https://0xdata.atlassian.net/secure/ReleaseNote.jspa?projectId=12000&version=16601

2.0.3 (2017-01-04)

  • Bug
    • SW-152 – ClassNotFound with spark-submit
    • SW-266 – H2OContext shouldn’t be Serializable
    • SW-276 – ClassLoading issue when running code using SparkSubmit
    • SW-281 – Update sparkling water tests so they use correct frame locking
    • SW-283 – Set spark.sql.warehouse.dir explicitly in tests because of SPARK-17810
    • SW-284 – Fix CraigsListJobTitlesApp to use local file instead of trying to get one from hdfs
    • SW-285 – Disable timeline service also in python integration tests
    • SW-286 – Add missing test in pysparkling for conversion RDD[Double] -> H2OFrame
    • SW-287 – Fix bug in SparkDataFrame converter where key wasn’t random if not specified
    • SW-288 – Improve performance of Dataset tests and call super.afterAll
    • SW-289 – Fix PySparkling numeric handling during conversions
    • SW-290 – Fixes and improvements of task used to extended h2o jars by sparkling-water classes
    • SW-292 – Fix ScalaCodeHandlerTestSuite
  • New Feature
    • SW-178 – Allow external h2o cluster to act as h2o backend in Sparkling Water
  • Improvement
    • SW-282 – Integrate SW with H2O 3.10.1.2 ( Support for external cluster )
    • SW-291 – Use absolute value for random number in sparkling-water in internal backend
    • SW-295 – H2OConf should be parameterized by SparkConf and not by SparkContext

Please visit https://community.h2o.ai to learn more about it, provide feedback and ask for assistance as needed.

@avkashchauhan | @h2oai

sparklyr: R interface for Apache Spark

This post is reposted from Rstudio’s announcement on sparklyr – Rstudio’s extension for Spark

sparklyr-illustration

  • Connect to Spark from R. The sparklyr package provides a complete dplyr backend.
  • Filter and aggregate Spark datasets then bring them into R for analysis and visualization.
  • Use Spark’s distributed machine learning library from R.
  • Create extensions that call the full Spark API and provide interfaces to Spark packages.

Installation

You can install the sparklyr package from CRAN as follows:

install.packages("sparklyr")

You should also install a local version of Spark for development purposes:

library(sparklyr)
spark_install(version = "1.6.2")

To upgrade to the latest version of sparklyr, run the following command and restart your r session:

devtools::install_github("rstudio/sparklyr")

If you use the RStudio IDE, you should also download the latest preview release of the IDE which includes several enhancements for interacting with Spark (see the RStudio IDE section below for more details).

Connecting to Spark

You can connect to both local instances of Spark as well as remote Spark clusters. Here we’ll connect to a local instance of Spark via the spark_connect function:

library(sparklyr)
sc <- spark_connect(master = "local")

The returned Spark connection (sc) provides a remote dplyr data source to the Spark cluster.

For more information on connecting to remote Spark clusters see the Deployment section of the sparklyr website.

Using dplyr

We can new use all of the available dplyr verbs against the tables within the cluster.

We’ll start by copying some datasets from R into the Spark cluster (note that you may need to install the nycflights13 and Lahman packages in order to execute this code):

install.packages(c("nycflights13", "Lahman"))
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)
## [1] "batting" "flights" "iris"

To start with here’s a simple filtering example:

# filter by departure delay and print the first few records
flights_tbl %>% filter(dep_delay == 2)
## Source:   query [?? x 19]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## 
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
## 1   2013     1     1      517            515         2      830
## 2   2013     1     1      542            540         2      923
## 3   2013     1     1      702            700         2     1058
## 4   2013     1     1      715            713         2      911
## 5   2013     1     1      752            750         2     1025
## 6   2013     1     1      917            915         2     1206
## 7   2013     1     1      932            930         2     1219
## 8   2013     1     1     1028           1026         2     1350
## 9   2013     1     1     1042           1040         2     1325
## 10  2013     1     1     1231           1229         2     1523
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>

Introduction to dplyr provides additional dplyr examples you can try. For example, consider the last example from the tutorial which plots data on flight delays:

delay <- flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

ggplot2-flights

Window Functions

dplyr window functions are also supported, for example:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
## Source:   query [?? x 7]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## Groups: playerID
## 
##     playerID yearID teamID     G    AB     R     H
##        <chr>  <int>  <chr> <int> <int> <int> <int>
## 1  abbotpa01   2000    SEA    35     5     1     2
## 2  abbotpa01   2004    PHI    10    11     1     2
## 3  abnersh01   1992    CHA    97   208    21    58
## 4  abnersh01   1990    SDN    91   184    17    45
## 5  abreujo02   2014    CHA   145   556    80   176
## 6  acevejo01   2001    CIN    18    34     1     4
## 7  acevejo01   2004    CIN    39    43     0     2
## 8  adamsbe01   1919    PHI    78   232    14    54
## 9  adamsbe01   1918    PHI    84   227    10    40
## 10 adamsbu01   1945    SLN   140   578    98   169
## # ... with more rows

For additional documentation on using dplyr with Spark see the dplyr section of the sparklyr website.

Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:

library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1           5.1         3.5          1.4         0.2  setosa
## 2           4.9         3.0          1.4         0.2  setosa
## 3           4.7         3.2          1.3         0.2  setosa
## 4           4.6         3.1          1.5         0.2  setosa
## 5           5.0         3.6          1.4         0.2  setosa
## 6           5.4         3.9          1.7         0.4  setosa
## 7           4.6         3.4          1.4         0.3  setosa
## 8           5.0         3.4          1.5         0.2  setosa
## 9           4.4         2.9          1.4         0.2  setosa
## 10          4.9         3.1          1.5         0.1  setosa

Machine Learning

You can orchestrate machine learning algorithms in a Spark cluster via the machine learning functions within sparklyr. These functions connect to a set of high-level APIs built on top of DataFrames that help you create and tune machine learning workflows.

Here’s an example where we use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset, and see if we can predict a car’s fuel consumption (mpg) based on its weight (wt), and the number of cylinders the engine contains (cyl). We’ll assume in each case that the relationship between mpg and each of our features is linear.

# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Coefficients:
## (Intercept)          wt         cyl 
##   37.066699   -2.309504   -1.639546

For linear regression models produced by Spark, we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

summary(fit)
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Deviance Residuals::
##     Min      1Q  Median      3Q     Max 
## -2.6881 -1.0507 -0.4420  0.4757  3.3858 
## 
## Coefficients:
##             Estimate Std. Error t value  Pr(>|t|)    
## (Intercept) 37.06670    2.76494 13.4059 2.981e-07 ***
## wt          -2.30950    0.84748 -2.7252   0.02341 *  
## cyl         -1.63955    0.58635 -2.7962   0.02084 *  
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8665
## Root Mean Squared Error: 1.799

Spark machine learning supports a wide array of algorithms and feature transformations and as illustrated above it’s easy to chain these functions together with dplyr pipelines. To learn more see the machine learning section.

Reading and Writing Data

You can read and write data in CSV, JSON, and Parquet formats. Data can be stored in HDFS, S3, or on the lcoal filesystem of cluster nodes.

temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_csv(iris_tbl, temp_json)
iris_json_tbl <- spark_read_csv(sc, "iris_json", temp_json)

src_tbls(sc)
## [1] "batting"      "flights"      "iris"         "iris_csv"    
## [5] "iris_json"    "iris_parquet" "mtcars"

Extensions

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. Since Spark is a general purpose cluster computing system there are many potential applications for extensions (e.g. interfaces to custom machine learning pipelines, interfaces to 3rd party Spark packages, etc.).

Here’s a simple example that wraps a Spark text file line counting function with an R function:

# write a CSV 
tempfile <- tempfile(fileext = ".csv")
write.csv(nycflights13::flights, tempfile, row.names = FALSE, na = "")

# define an R interface to Spark line counting
count_lines <- function(sc, path) {
  spark_context(sc) %>% 
    invoke("textFile", path, 1L) %>% 
      invoke("count")
}

# call spark to count the lines of the CSV
count_lines(sc, tempfile)
## [1] 336777

To learn more about creating extensions see the Extensions section of the sparklyr website.

dplyr Utilities

You can cache a table into memory with:

tbl_cache(sc, "batting")

and unload from memory using:

tbl_uncache(sc, "batting")

Connection Utilities

You can view the Spark web console using the spark_web function:

spark_web(sc)

You can show the log using the spark_log function:

spark_log(sc, n = 10)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 224
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 223
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 222
## 16/09/24 07:50:59 INFO BlockManagerInfo: Removed broadcast_64_piece0 on localhost:56324 in memory (size: 20.6 KB, free: 483.0 MB)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 220
## 16/09/24 07:50:59 INFO Executor: Finished task 0.0 in stage 67.0 (TID 117). 2082 bytes result sent to driver
## 16/09/24 07:50:59 INFO TaskSetManager: Finished task 0.0 in stage 67.0 (TID 117) in 122 ms on localhost (1/1)
## 16/09/24 07:50:59 INFO DAGScheduler: ResultStage 67 (count at NativeMethodAccessorImpl.java:-2) finished in 0.122 s
## 16/09/24 07:50:59 INFO TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool 
## 16/09/24 07:50:59 INFO DAGScheduler: Job 47 finished: count at NativeMethodAccessorImpl.java:-2, took 0.125238 s

Finally, we disconnect from Spark:

spark_disconnect(sc)

RStudio IDE

The latest RStudio Preview Release of the RStudio IDE includes integrated support for Spark and the sparklyr package, including tools for:

  • Creating and managing Spark connections
  • Browsing the tables and columns of Spark DataFrames
  • Previewing the first 1,000 rows of Spark DataFrames

Once you’ve installed the sparklyr package, you should find a new Spark pane within the IDE. This pane includes a New Connection dialog which can be used to make connections to local or remote Spark instances:

spark-connect

Once you’ve connected to Spark you’ll be able to browse the tables contained within the Spark cluster:

spark-tab

The Spark DataFrame preview uses the standard RStudio data viewer:

spark-connect

The RStudio IDE features for sparklyr are available now as part of the RStudio Preview Release.

Spam Detection with Sparkling Water and Spark Machine Learning Pipelines

This short post presents the “ham or spam” demo, which has already been posted earlier by Michal Malohlava, using our new API in latest Sparkling Water for Spark 1.6 and earlier versions, unifying Spark and H2O Machine Learning pipelines. It shows how to create a simple Spark Machine Learning pipeline and a model based on the fitted pipeline, which can be later used for prediction whether a particular message is spam or not.

Before diving into the demo steps, we would like to provide some details about the new features in the upcoming Sparkling Water 2.0:

  • Support for Apache Spark 2.0 and backwards compatibility with all previous versions.
  • The ability to run Apache Spark and Scala through H2O’s Flow UI.
  • H2O feature improvements and visualizations for MLlib algorithms, including the ability to score feature importance.
  • Visual intelligence for Apache Spark.
  • The ability to build Ensembles using H2O plus MLlib algorithms.
  • The power to export MLlib models as POJOs (Plain Old Java Objects), which can be easily run on commodity hardware.
  • A toolchain for ML pipelines.
  • Debugging support for Spark pipelines.
  • Model and data governance through Steam.
  • Bringing H2O’s powerful data munging capabilities to Apache Spark.
  • In order to run the code below, start your Spark shell with attached Sparkling Water JAR or use sparkling-shell script that already does this for you.

    You can start the Spark shell with Sparkling Water as follows:

    $SPARK_HOME/bin/spark-submit \
    --class water.SparklingWaterDriver \
    --packages ai.h2o:sparkling-water-examples_2.10:1.6.5 \
    --executor-memory=6g \
    --driver-memory=6g /dev/null
    

    Preferable Spark is Spark 1.6 and Sparkling Water 1.6.x.

    Prepare the coding environment

    Here we just import all required libraries.

    import org.apache.spark.SparkFiles
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.h2o.H2OPipeline
    import org.apache.spark.ml.h2o.features.{ColRemover, DatasetSplitter}
    import org.apache.spark.ml.h2o.models.H2ODeepLearning
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import water.support.SparkContextSupport
    import water.fvec.H2OFrame
    

    Add our dataset to Spark environment. The dataset consists of 2 columns where the first one is the label ( ham or spam ) and the second one is the message itself. We don’t have to explicitly ask for Spark context since it’s already available via sc variable.

    val smsDataFileName = "smsData.txt"
    val smsDataFilePath = "examples/smalldata/" + smsDataFileName
    SparkContextSupport.addFiles(sc, smsDataFilePath)
    

    Create SQL support.

    implicit val sqlContext = SQLContext.getOrCreate(sc)
    

    Start H2O services.

    import org.apache.spark.h2o._
    implicit val h2oContext = H2OContext.getOrCreate(sc)
    

    Create helper method which loads the dataset, performs some basic filtering and at last creates Spark’s DataFrame with 2 columns – label and text.

    def load(dataFile: String)(implicit sqlContext: SQLContext): DataFrame = {
    val smsSchema = StructType(Array(
    StructField("label", StringType, nullable = false),
    StructField("text", StringType, nullable = false)))
    val rowRDD = sc.textFile(SparkFiles.get(dataFile)).map(_.split("\t")).filter(r => !r(0).isEmpty).map(p => Row(p(0),p(1)))
    sqlContext.createDataFrame(rowRDD, smsSchema)
    }
    

    Define the pipeline stages

    In Spark, a pipeline is formed of two basic elements – transformers and estimators. Estimators usually encapsulate an algorithm for model generation and their output are transformers. During fitting the pipeline stage, all transformers and estimators are executed and estimators are converted to transformers. The model generated by the pipeline contains only transformers. More about Spark pipelines can be found on Spark’s pipeline overview

    In H2O we created a new type of pipeline stage, which is called OneTimeTransformer. This transformer works similarly to Spark’s estimator in a way that it is only executed during fitting the pipeline stage. It does not however produces a transformer during fitting pipeline stage and the model generated by the pipeline does not contain this OneTimeTransformer.
    An example for one-time transformer is splitting the input data into a validation and training dataset using H2O Frames. We don’t need this one-time transformer to be executed every time we do prediction on the model. We just need this code to be executed when we are fitting the pipeline to the data.

    This pipeline stage is using Spark’s RegexTokenizer to tokenize the messages. We just specify input column and output column for tokenized messages.

    val tokenizer = new RegexTokenizer().
        setInputCol("text").
        setOutputCol("words").
        setMinTokenLength(3).
        setGaps(false).
        setPattern("[a-zA-Z]+")
    

    Remove unnecessary words using Spark’s StopWordsRemover.

    val stopWordsRemover = new StopWordsRemover().
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("filtered").
        setStopWords(Array("the", "a", "", "in", "on", "at", "as", "not", "for")).
        setCaseSensitive(false)
    

    Vectorize the words using Spark’s HashingTF.

    val hashingTF = new HashingTF().
        setNumFeatures(1 << 10).
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("wordToIndex")
    

    Create inverse document frequencies based on hashed words. It creates a numerical representation of how much information a
    given word provides in the whole message.

    val idf = new IDF().
        setMinDocFreq(4).
        setInputCol(hashingTF.getOutputCol).
        setOutputCol("tf_idf")
    

    This pipeline stage is one-time transformer. If setKeep(true) is called in it, it preserves specified columns instead
    of deleting them.

    val colRemover = new ColRemover().
        setKeep(true).
        setColumns(Array[String]("label", "tf_idf"))
    

    Split the dataset and store the splits with the specified keys into H2O’s distributed storage called DKV. This is one-time transformer which is executed only during fitting stage. It determines the frame, which is passed on the output in the following order:

    1. If the train key is specified using setTrainKey method and the key is also specified in the list of keys, then frame with this key is passed on the output
    2. Otherwise, if the default key Р“train.hex” is specified in the list of keys, then frame with this key is passed on the output
    3. Otherwise the first frame specified in the list of keys is passed on the output
    val splitter = new DatasetSplitter().
      setKeys(Array[String]("train.hex", "valid.hex")).
      setRatios(Array[Double](0.8)).
      setTrainKey("train.hex")
    

    Create H2O’s deep learning model.
    If the key specifying the training set is set using setTrainKey, then frame with this key is used as the training frame, otherwise it uses the frame from the previous stage as the training frame

    val dl = new H2ODeepLearning().
      setEpochs(10).
      setL1(0.001).
      setL2(0.0).
      setHidden(Array[Int](200, 200)).
      setValidKey(splitter.getKeys(1)).
      setResponseColumn("label")
    

    Create and fit the pipeline

    Create the pipeline using the stages we defined earlier. As a normal Spark pipeline, it can be formed of Spark’s transformers and estimators, but it also may contain H2O’s one-time transformers.

    val pipeline = new H2OPipeline().
      setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, colRemover, splitter, dl))
    

    Train the pipeline model by fitting it to a Spark’s DataFrame

    val data = load("smsData.txt")
    val model = pipeline.fit(data)
    

    Now we can optionally save the model to disk and load it again.

    model.write.overwrite().save("/tmp/hamOrSpamPipeline")
    val loadedModel = PipelineModel.load("/tmp/hamOrSpamPipeline")
    

    We can also save this unfitted pipeline to disk and load it again.

    pipeline.write.overwrite().save("/tmp/unfit-hamOrSpamPipeline")
    val loadedPipeline = H2OPipeline.load("/tmp/unfit-hamOrSpamPipeline")
    

    Train the pipeline model again on loaded pipeline just to show deserialized model works as it should.

    val modelOfLoadedPipeline = loadedPipeline.fit(data)
    

    Create helper function for predictions on unlabeled data. This method is using model generated by the pipeline. To make a prediction we call transform method with Spark’s Dataframe as an argument on the generated model. This call executes each transformer specified in the pipeline one after one producing Spark’s DataFrame with predictions.

    def isSpam(smsText: String,
               model: PipelineModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5):Boolean = {
      import h2oContext.implicits._
      val smsTextDF = sc.parallelize(Seq(smsText)).toDF("text") // convert to dataframe with one column named "text"
      val prediction: H2OFrame = model.transform(smsTextDF)
      prediction.vecs()(1).at(0) < hamThreshold
    }
    

    Try it!

    println(isSpam("Michal, h2oworld party tonight in MV?", modelOfLoadedPipeline, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel, h2oContext))
    

    In this article we showed how Spark’s pipelines and H2O algorithms work together seamlessly in Spark environment. We strive to be consistent with Spark API in H2O.ai and make the life of a developer/data scientist easier by hiding H2O internals and exposing the APIs that are natural for Spark users.

    Databricks and H2O Make it Rain with Sparkling Water

    **This blog post was first posted on the Databricks blog here

    Databricks provides a cloud-based integrated workspace on top of Apache Spark for developers and data scientists. H2O.ai has been an early adopter of Apache Spark and has developed Sparkling Water to seamlessly integrate H2O.ai’s machine learning library on top of Spark.

    In this blog, we will demonstrate an integration between the Databricks platform and H2O.ai’s Sparking Water that provides Databricks users with an additional set of machine learning libraries. The integration allows data scientists to utilize Sparkling Water with Spark in a notebook environment more easily, allowing them to seamlessly combine Spark with H2O and get the best of both worlds.

    Let’s begin by preparing a Databricks environment to develop our spam predictor:

    The first step is to log into your Databricks account and create a new library containing Sparkling Water. You can use the Maven coordinates of the Sparkling Water package, for example: h2o:sparkling-water-examples_2.10:1.5.6 (this version works with Spark 1.5)

    1

    The next step is to create a new cluster to run the example:

    2

    For this version of the Sparkling Water library we will use Spark 1.5. The name of the created cluster is “HamOrSpamCluster” – keep it handy as we will need it later.

    The next step is to upload data, you can use table import and upload the smsData.txt file

    3

    Now the environment is ready and you can create a Databricks notebook; connect it to “HamOrSpamCluster” and start building a predictive model!

    The goal of the application is to write a spam detector using a trained model to categorize incoming messages

    First look at the data. It contains raw text messages that are labeled as either spam or ham.
    For example:

    spam +123 Congratulations – in this week’s competition draw u have won the ?1450 prize to claim just call 09050002311 b4280703. T&Cs/stop SMS 08718727868. Over 18 only 150
    ham Yun ah.the ubi one say if ? wan call by tomorrow.call 67441233 look for irene.ere only got bus8,22,65,6

    We need to transform these messages into vectors of numbers and then train a binomial model to predict whether the text message is either SPAM or HAM. For the transformation of a message into a vector of numbers we will use Spark MLlib string tokenization and word to vector transformers. We are going to split messages into tokens and use the TF (term frequency–inverse document frequency) technique to represent words of importance inside the training data set:

    // Representation of a training message
    import org.apache.spark.mllib.linalg.Vector
    case class SMS(target: String, fv: Vector)
    def tokenize(data: RDD[String]): RDD[Seq[String]] = {
    val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
    val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'','!','0', '1')
    
    val texts = data.map( r=> {
    var smsText = r.toLowerCase
    for( c <- ignoredChars) {
    smsText = smsText.replace(c, ' ')
    }
    
    val words =smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length>2).distinct
    
    words.toSeq
    })
    texts
    }
    import org.apache.spark.mllib.feature._
    
    def buildIDFModel(tokens: RDD[Seq[String]],
    minDocFreq:Int = 4,
    hashSpaceSize:Int = 1 << 10): (HashingTF, IDFModel, RDD[Vector]) = {
    // Hash strings into the given space
    val hashingTF = new HashingTF(hashSpaceSize)
    val tf = hashingTF.transform(tokens)
    // Build term frequency-inverse document frequency
    val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
    val expandedText = idfModel.transform(tf)
    (hashingTF, idfModel, expandedText)
    }
    

    The resulting table will contain the following lines:

    spam 0, 0, 0.31, 0.12, ….
    ham 0.67, 0, 0, 0, 0, 0.003, 0, 0.1

    After this we are free to experiment with different binary classification algorithms in H2O.

    To start using H2O, we need to initialize the H2O service by creating an H2OContext:

    // Create SQL support
    import org.apache.spark.sql._
    implicit val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    
    // Start H2O services
    import org.apache.spark.h2o._
    @transient val h2oContext = new H2OContext(sc).start()
    

    H2OContext represents H2O running on top of a Spark cluster. You should see the following output:

    4

    For this demonstration, we will leverage the H2O Deep Learning method:

    // Define function which builds a DL model
    import org.apache.spark.h2o._
    import water.Key
    import <em>root</em>.hex.deeplearning.DeepLearning
    import <em>root</em>.hex.deeplearning.DeepLearningParameters
    import <em>root</em>.hex.deeplearning.DeepLearningModel
    
    def buildDLModel(train: Frame, valid: Frame,
    epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
    hidden: Array[Int] = Array[Int](200, 200))
    (implicit h2oContext: H2OContext): DeepLearningModel = {
    import h2oContext._
    // Build a model
    
    val dlParams = new DeepLearningParameters()
    dlParams._model_id = Key.make("dlModel.hex")
    dlParams._train = train
    dlParams._valid = valid
    dlParams._response_column = 'target
    dlParams._epochs = epochs
    dlParams._l1 = l1
    dlParams._hidden = hidden
    
    // Create a job
    val dl = new DeepLearning(dlParams)
    val dlModel = dl.trainModel.get
    
    // Compute metrics on both datasets
    dlModel.score(train).delete()
    dlModel.score(valid).delete()
    
    dlModel
    }
    

    Here is the final application:

    // Build the application
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.examples.h2o.DemoUtils._
    import scala.io.Source
    
    // load both columns from the table
    val data = sqlContext.sql("SELECT * FROM smsData")
    // Extract response spam or ham
    val hamSpam = data.map( r => r(0).toString)
    val message = data.map( r => r(1).toString)
    // Tokenize message content
    val tokens = tokenize(message)
    // Build IDF model
    var (hashingTF, idfModel, tfidf) = buildIDFModel(tokens)
    
    // Merge response with extracted vectors
    val resultRDD: DataFrame = hamSpam.zip(tfidf).map(v => SMS(v._1, v._2)).toDF
    
    // Publish Spark DataFrame as H2OFrame
    // This H2OFrame has to be transient because we do not want it to be serialized. When calling for example sc.parallelize(..) the object which we are trying to parallelize takes with itself all variables in its surroundings scope - apart from those marked as serialized.
    // 
    @transient val table = h2oContext.asH2OFrame(resultRDD)
    println(sc.parallelize(Array(1,2)))
    // Transform target column into categorical
    table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
    table.update(null)
    
    // Split table
    val keys = Array[String]("train.hex", "valid.hex")
    val ratios = Array<a href="0.8">Double</a>
    @transient val frs = split(table, keys, ratios)
    @transient val train = frs(0)
    @transient val valid = frs(1)
    table.delete()
    
    // Build a model
    @transient val dlModel = buildDLModel(train, valid)(h2oContext)
    

    And voila we have a Deep Learning Model ready to detect spam

    At this point you can explore quality of the model:

    scala
    // Collect model metrics and evaluate model quality
    import water.app.ModelMetricsSupport
    val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
    println(validMetrics.auc._auc)


    You can also use the H2O Flow UI by clicking on the URL provided when you instantiated the H2O Context.

    5

    At this point we have everything ready to create a spam detector:

    scala
    // Create a spam detector - a method which will return SPAM or HAM for given text message
    import water.DKV._
    // Spam detector
    def isSpam(msg: String,
    modelId: String,
    hashingTF: HashingTF,
    idfModel: IDFModel,
    h2oContext: H2OContext,
    hamThreshold: Double = 0.5):String = {
    val dlModel: DeepLearningModel = water.DKV.getGet(modelId)
    val msgRdd = sc.parallelize(Seq(msg))
    val msgVector: DataFrame = idfModel.transform(
    hashingTF.transform (
    tokenize (msgRdd))).map(v =&gt; SMS(&quot;?&quot;, v)).toDF
    val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
    msgTable.remove(0) // remove first column
    val prediction = dlModel.score(msgTable)
    //println(prediction)
    if (prediction.vecs()(1).at(0) &lt; hamThreshold) &quot;SPAM DETECTED!&quot; else &quot;HAM&quot;
    }

    The method uses built-in models to transform incoming text message and provide a prediction – SPAM or HAM. For example:

    6

    We’ve shown a fast and easy way to build a spam detector with Databricks and Sparkling Water. To try this out for yourself, register for a free 14-day trial of Databricks and check out the Sparkling Water example in the Databricks Guide.