H2O’s AutoML in Spark

This blog post demonstrates how H2O's powerful automatic machine learning can be used together with Spark in Sparkling Water.

We show the benefits of the Spark and H2O integration, using Spark for the data munging tasks and H2O for the modelling phase, with all these steps wrapped inside a Spark pipeline. The integration between Spark and H2O can be seen in the figure below. All technical details behind this integration are explained in our documentation, which you can access from here.

Sparkling Water Architecture

At the end of this blog post, we also show how the generated model can be taken into production using a Spark Streaming application. We use Python and PySparkling for the model training phase and Scala for the deployment example.

For the purpose of this blog, we use the Combined Cycle Power Plant dataset. The goal is to predict the energy output (in megawatts), given the temperature, ambient pressure, relative humidity and exhaust vacuum values. We alter the dataset a little bit for the purposes of this blog post and use only rows where the temperature is higher than 10 degrees Celsius. You can think of this as being interested only in the plant's performance on warmer days.

Obtain Sparkling Water

The first step is to download Sparkling Water. It can be downloaded from our official download page. Please make sure to use the latest Sparkling Water, as H2OAutoML in Sparkling Water is a fairly new feature available only in recent versions. Once you have downloaded Sparkling Water, please follow the instructions on the PySparkling tab on how to start the PySparkling interpreter.

Download Sparkling Water

Start Sparkling Water

In order to use Spark and H2O alongside each other, we need to make H2O available inside the Spark cluster. This can be achieved by running the code below.

from pysparkling import * # Import PySparkling
hc = H2OContext.getOrCreate(spark) # Start the H2OContext

This code starts an H2O node on each Spark executor and a special H2O node, called the client node, inside the Spark driver.

Load Data into Sparkling Water

We use Spark to load the data into memory. For that, we can use the line below:

powerplant_df = spark.read.option("inferSchema", "true").csv("powerplant_output.csv", header=True)

This code imports the file from the specified location into the Spark cluster and creates a Spark DataFrame from it. The original data file can be downloaded from here. It is important to set the inferSchema option to true; otherwise Spark won't try to automatically infer the data types and all columns will end up with the type String. With the option set, the types are correctly inferred.

We will use a portion of this data for training and a portion for demonstrating the predictions. We can use the randomSplit method available on the Spark DataFrame to split the data:

splits = powerplant_df.randomSplit([0.8, 0.2], 1)
train = splits[0]
for_predictions = splits[1]

We split the dataset, giving 80% to one split and 20% to the other. The last argument specifies the seed so the method behaves deterministically. We use the 80% split for training and the remaining 20% for scoring later.

Define the Pipeline with H2O AutoML

Now, we can define the Spark pipeline containing the H2O AutoML. Before we do that, we need to do a few imports so all classes and methods we require are available:

from pysparkling.ml import H2OAutoML
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

And finally, we can start building the pipeline stages. The first pipeline stage is the SQLTransformer, used for selecting the rows where the temperature is higher than 10 degrees Celsius. The SQLTransformer is a powerful Spark transformer that lets us run arbitrary Spark SQL code on the DataFrame passed to it by the pipeline.

temperatureTransformer = SQLTransformer(statement="SELECT * FROM __THIS__ WHERE TemperatureCelcius > 10")

It is important to understand that no code is executed at this stage as we are just defining the stages. We will show how to execute the whole pipeline later in the blog post.

The next pipeline stage is not a transformer but an estimator; it is used for creating the H2O model with the H2O AutoML algorithm. This estimator is provided by the Sparkling Water library, but we can see that its API is unified with the other Spark pipeline stages.

automlEstimator = H2OAutoML(maxRuntimeSecs=60, predictionCol="HourlyEnergyOutputMW", ratio=0.9)

We defined the H2OAutoML estimator. The maxRuntimeSecs argument specifies how long we want to run the AutoML algorithm. The predictionCol argument specifies the response column, and the ratio argument specifies how large a part of the dataset is used for training. Here we use 90% of the data for training and 10% for validation.

As we have defined both stages we need, we can define the whole pipeline:

pipeline = Pipeline(stages=[temperatureTransformer, automlEstimator])

And finally, train it on the training dataset we prepared above:

model = pipeline.fit(train)

This call goes through all the pipeline stages and, in the case of estimators, creates a model. So as part of this call, we run the H2O AutoML algorithm and find the best model given the search criteria we specified in the arguments. The model variable contains the whole Spark pipeline model, which internally also contains the model found by AutoML. The H2O model inside it is stored in the H2O MOJO format. That means it is independent of the H2O runtime and can therefore be run anywhere without initializing an H2O cluster. For more information about MOJOs, please visit the MOJO documentation.

Prediction

We can generate predictions with the returned model simply as:

predicted = model.transform(for_predictions)

This call again goes through all the pipeline stages; when it hits a stage with a model, it performs a scoring operation.

We can also look at the first few results:

predicted.take(2)

Export Model for Deployment

In the following part of the blog post, we show how to put this model into production. For that, we need to export the model, which can be done simply as:

model.write().overwrite().save("pipeline.model")

This call stores the model in the pipeline.model file. It is also a helpful trick to export the schema of the data. This is especially useful in the case of streaming applications, where it's hard to determine the data types based on a single row of input.

We can export the schema as:

with open('schema.json','w') as f:
    f.write(str(powerplant_df.schema.json()))
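
For completeness, here is a minimal sketch of how the exported JSON schema can be read back from Python (the Scala deployment below does the equivalent); it only assumes the schema.json file written above:

import json
from pyspark.sql.types import StructType

# Rebuild the Spark schema from the exported JSON file
with open("schema.json") as f:
    schema = StructType.fromJson(json.load(f))

print(schema)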

Deploy the Model

Now, we would like to demonstrate how the Spark pipeline with the model found by AutoML can be put into production in a Spark Streaming application. For the deployment, we can start a new Spark application, in Scala or Python, and load the trained pipeline model. The pipeline model contains the H2O AutoML model packaged as a MOJO and is therefore independent of the H2O runtime. We will use Scala to demonstrate the language independence of the exported pipeline.

The deployment consists of several steps:

  • Load the schema from the schema file.
  • Create the input data stream and pass it the schema. The input data stream will point to a directory where new CSV files arrive from different streaming sources. It could also be an online source of streaming data.
  • Load the pipeline from the pipeline file.
  • Create the output data stream. For our purposes, we store the data into memory and expose it as a Spark SQL table so we can see immediate results.

// Start Spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.ml.PipelineModel

val spark = SparkSession.builder().master("local").getOrCreate()


// Load the exported pipeline
val pipelineModel = PipelineModel.read.load("pipeline.model")


// Load the exported schema of the input data, marking all fields as nullable
val schema = StructType(DataType.fromJson(scala.io.Source.fromFile("schema.json").mkString).asInstanceOf[StructType].map {
  case StructField(name, dtype, nullable, metadata) => StructField(name, dtype, true, metadata)
  case rec => rec
})
println(schema)


// Define input stream
val inputDataStream = spark.readStream.schema(schema).csv("/path/to/folder/where/input/data/are/being/generated")


// Apply loaded model
val outputDataStream = pipelineModel.transform(inputDataStream)


// Forward output stream into memory-sink
outputDataStream.writeStream.format("memory").queryName("predictions").start()


// Query results
while(true){
  spark.sql("select * from predictions").show()
  Thread.sleep(5000)
}

Conclusion

This code demonstrates that we can relatively easily put a pipeline model into production. We used Python for the model creation and a JVM-based language for the deployment. The resulting pipeline model contains the model found by the H2O AutoML algorithm, exported as a MOJO. This means that we don't need to start H2O or Sparkling Water in the place where we deploy the model, but we do need to ensure that the Sparkling Water dependencies are on the classpath.
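
For example, when submitting the Scala streaming application from the previous section, the Sparkling Water dependencies can be put on the classpath via spark-submit. Treat this as an illustrative sketch only: the package coordinates (ai.h2o:sparkling-water-package_2.11:2.3.0), the class name and the jar name are placeholders, so use the ones matching your application and your Spark/Sparkling Water versions.

# Illustrative only: adjust coordinates, class and jar names to your setup
$SPARK_HOME/bin/spark-submit \
  --packages ai.h2o:sparkling-water-package_2.11:2.3.0 \
  --class com.example.StreamingPredictions \
  streaming-predictions.jar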

Sparkling Water 2.3.0 is now available!

Hi Makers!

We are happy to announce that Sparkling Water now fully supports Spark 2.3 and is available from our download page.

If you are using an older version of Spark, that's no problem. Even though we suggest upgrading to the latest version possible, we keep the Sparkling Water releases for Spark 2.2 and 2.1 up to date with the latest features wherever we are not limited by Spark.

The latest release of Sparkling Water contains several important bug fixes. The three major ones are:

  • Handle nulls properly in H2OMojoModel. In previous versions, running predictions on the H2OMojoModel with null values would fail. We now treat null values as missing values, and predictions no longer fail.

  • We marked the Spark dependencies in our Maven packages as provided. This means that we assume the Spark dependencies are always provided by the runtime, which should always be true. This ensures a cleaner and more transparent Sparkling Water environment.

  • In PySparkling, the method as_h2o_frame didn't issue an alert when a wrong input type was passed in. This method accepts only Spark DataFrames and RDDs; however, some users tried to pass different types and the method just returned silently. We now fail if the user passes a wrong data type to this method, as the short sketch below illustrates.
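
As a minimal illustration of the correct usage (a sketch assuming an H2OContext named hc and a Spark DataFrame, as in the AutoML example above):

# Correct: as_h2o_frame accepts a Spark DataFrame (or RDD)
h2o_frame = hc.as_h2o_frame(powerplant_df)

# Wrong: any other type, e.g. a plain Python list, now raises an error instead of returning silently
# hc.as_h2o_frame([1, 2, 3])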

It is also important to mention that Spark 2.3 removed support for Scala 2.10. We’ve done the same in the release for Spark 2.3. Scala 2.10 is still supported in the older Spark versions.

The latest Sparkling Water versions also integrate H2O 3.18.0.5, which brings several important fixes. The full change log for H2O 3.18.0.5 is available here and the full Sparkling Water change log can be viewed here.

Thank you!

Kuba

Senior Software Engineer, Sparkling Water Team

Sparkling Water 2.2.10 is now available!

Hi Makers!

There are several new features in the latest Sparkling Water. The major new addition is that we now publish Sparkling Water documentation as a website which is available here. This link is for Spark 2.2.

We have also documented and fixed a few issues with LDAP on Sparkling Water. Exact steps are provided in the documentation.

The bundled H2O was upgraded to 3.18.0.4, which brings ordinal regression for GLM as the major change.

The last major change included in this release is the availability of H2O AutoML and H2O Grid Search for Spark pipelines. They are now exposed as regular Spark Estimators and can be used within your Spark pipelines. An example PySparkling AutoML script can be found here.

The full changelog can be viewed here.

Sparkling Water is already integrated with Spark 2.3 on master, and the next release will also target this latest Spark version.

Stay tuned!

New versions of H2O-3 and Sparkling Water available

Dear H2O Community,

#H2OWorld is on Monday and we can’t wait to see you there! We’ll also be live streaming the event starting at 9:25am PST. Explore the agenda here.

Today we’re excited to share that new versions of H2O-3 and Sparkling Water are available.

We invite you to download them here:
https://www.h2o.ai/download/

H2O-3.16
– MOJOs are now supported for Stacked Ensembles.
– Easily specify the meta-learner algorithm type that Stacked Ensemble should use. This can be AUTO, GLM, GBM, DRF or Deep Learning (see the sketch after this list).
– GBM, DRF now support custom evaluation metrics.
– The AutoML leaderboard now uses cross-validation metrics (new default).
– Multiclass stacking is now supported in AutoML. Removed the check that caused AutoML to skip stacking for multiclass.
– The Aggregator Function is now exposed in the Python/R client.
– Support for Python 3.6.
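
As a quick illustration of the meta-learner option mentioned above, here is a hedged sketch against the h2o Python API; the parameter name metalearner_algorithm, the base models and the frame/column names below are assumptions for illustration, so check the Stacked Ensemble documentation for your version:

from h2o.estimators import H2OStackedEnsembleEstimator

# base_models are previously trained, cross-validated H2O models (hypothetical names)
ensemble = H2OStackedEnsembleEstimator(
    base_models=[my_gbm, my_drf],
    metalearner_algorithm="gbm"  # assumed parameter; could also be "AUTO", "glm", "drf", "deeplearning"
)
ensemble.train(x=predictors, y=response, training_frame=train)  # hypothetical column list and frame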

Detailed changes and bug fixes can be found here:
https://github.com/h2oai/h2o-3/blob/master/Changes.md

Sparkling Water 2.0, 2.1, 2.2
– Support for H2O models in Spark Python pipelines.
– Improved handling of sparse vectors in internal cluster.
– Improved stability of external cluster deployment mode.
– Includes latest H2O-3.16.0.2.

Detailed changes and bug fixes can be explored here:
2.2 – https://github.com/h2oai/sparkling-water/blob/rel-2.2/doc/CHANGELOG.rst
2.1 – https://github.com/h2oai/sparkling-water/blob/rel-2.1/doc/CHANGELOG.rst
2.0 – https://github.com/h2oai/sparkling-water/blob/rel-2.0/doc/CHANGELOG.rst

Hope to see you on Monday!

The H2O.ai Team

Use H2O.ai on Azure HDInsight

This is a repost from this article on MSDN

We’re hosting an upcoming webinar to present you how to use H2O on HDInsight and to answer your questions. Sign up for our upcoming webinar on combining H2O and Azure HDInsight.

We recently announced that H2O and Microsoft Azure HDInsight have integrated to provide Data Scientists with a Leading Combination of Engines for Machine Learning and Deep Learning. Through H2O’s AI platform and its Sparkling Water solution, users can combine the fast, scalable machine learning algorithms of H2O with the capabilities of Spark, as well as drive computation from Scala/R/Python and utilize the H2O Flow UI, providing an ideal machine learning platform for application developers.

In this blog, we will provide a detailed step-by-step guide to help you set up the first H2O on HDInsight solution.

Step 1: setting up the environment

The first step is to create an HDInsight cluster with H2O installed. You can either install H2O when you provision a new HDInsight cluster, or install H2O on an existing cluster. Please note that H2O on HDInsight only works for Spark 2.0 on HDInsight 3.5 as of today, which is the default version of HDInsight.

For more information on how to create a cluster in HDInsight, please refer to the documentation here (https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-provision-linux-clusters). For more information on how to install an application on an existing cluster, please refer to the documentation here (https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apps-install-applications)

Please note that we've recently updated our UI to require fewer clicks, so you need to click the "custom" button to install applications on HDInsight.

hdi-image1

Step 2: Develop your first H2O application in a Jupyter Notebook

After installing H2O on HDInsight, you can simply use the built-in Jupyter notebooks to write your first H2O on HDInsight applications. You can simply go to (https://yourclustername.azurehdinsight.net/jupyter) to open the Jupyter Notebook. You will see a folder named “H2O-PySparkling-Examples”.

hdi-image2

There are a few examples in the folder, but I recommend starting with the one named “Sentiment_analysis_with_Sparkling_Water.ipynb”. Most of the details on how to use the H2O PySparkling Water APIs are already covered in the Notebook itself, so here I will give some high-level overviews.

The first thing you need to do is to configure the environment. Most of the configuration is already taken care of by the system, such as the Flow UI address, the Spark jar location, the Sparkling Water egg file, etc.

There are three important parameters to configure: the driver memory, the executor memory, and the number of executors. The default values are optimized for the default 4-node cluster, but your cluster size might vary.

Tuning these parameters is outside the scope of this blog, as it is more of a Spark resource tuning problem. There are a few good reference articles, such as this one.

Note that all Spark applications deployed using a Jupyter Notebook will run in the "yarn-cluster" deploy mode. This means that the Spark driver will be allocated on a worker node of the cluster, not on the head nodes.

In this example, we simply allocate 75% of an HDInsight worker node's memory to the driver and to each executor (21 GB each) and use 3 executors, since the default HDInsight cluster has 4 worker nodes (3 executors + 1 driver).
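
One common way to set these values in an HDInsight (Livy-backed) PySpark notebook is the %%configure cell magic; treat this as a hedged sketch and adapt the figures, which simply mirror the 21 GB / 3 executor example above, to your own cluster:

%%configure -f
{ "driverMemory": "21G", "executorMemory": "21G", "numExecutors": 3 }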

hdi-image3

Please refer to the Jupyter Notebook tutorial for more information on how to use Jupyter Notebooks on HDInsight.

The second step is to create an H2O context. Since a default Spark context (called sc) is already configured in the Jupyter Notebook, we just need to call

h2o_context = pysparkling.H2OContext.getOrCreate(sc)

so that H2O can recognize the default Spark context.

After executing this line of code, H2O will print out the status, as well as the YARN application it is using.

hdi-image4

After this, you can use H2O APIs plus the Spark APIs to write your applications. To learn more about Sparkling Water APIs, refer to the H2O GitHub site here.

hdi-image5

This sentiment analysis example has a few steps to analyze the data:

  1. Load data to Spark and H2O frames
  2. Data munging using H2O API
    • Remove columns
    • Refine Time Column into Year/Month/Day/DayOfWeek/Hour columns
  3. Data munging using Spark API
    • Select columns Score, Month, Day, DayOfWeek, Summary
    • Define UDF to transform score (0..5) to binary positive/negative (see the sketch below)
    • Use TF-IDF to vectorize summary column
  4. Model building using H2O API
    • Use H2O Grid Search to tune hyper parameters
    • Select the best Deep Learning model

Please refer to the Jupyter Notebook for more details.
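
As a small illustration of the UDF step from the Spark data munging above, here is a sketch in PySpark; the DataFrame and column names (reviews_df, Score, Sentiment) and the threshold are assumptions for illustration only, and the notebook's actual implementation may differ:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Map the review score (0..5) onto a binary positive/negative label
to_sentiment = udf(lambda s: "positive" if s is not None and s >= 3 else "negative", StringType())

reviews_df = reviews_df.withColumn("Sentiment", to_sentiment(col("Score")))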

Step 3: Use the Flow UI to monitor progress and visualize the model

H2O Flow is an interactive web-based computational user interface where you can combine code execution, text, mathematics, plots and rich media into a single document, much like Jupyter Notebooks. With H2O Flow, you can capture, rerun, annotate, present, and share your workflow. H2O Flow allows you to use H2O interactively to import files, build models, and iteratively improve them. Based on your models, you can make predictions and add rich text to create vignettes of your work – all within Flow’s browser-based environment. In this blog, we will only focus on its visualization part.

The H2O Flow web service lives in the Spark driver and is routed through the HDInsight gateway, so it can only be accessed while the Spark application/notebook is running.

You can click the available link in the Jupyter Notebook, or you can directly access this URL: https://yourclustername-h2o.apps.azurehdinsight.net/flow/index.html

In this example, we will demonstrate its visualization capabilities. Simply click "Model > List Grid Search Results" (since we are using Grid Search to tune hyperparameters).

hdi-image6

Then you can access the 4 grid search results:

hdi-image7

And you can view the details of each model. For example, you can visualize the ROC curve as below:

hdi-image8

In Jupyter Notebooks, you can also view the performance in text format:

hdi-image9

Summary

In this blog, we have walked you through the detailed steps to create your first H2O application on HDInsight for your machine learning workloads. For more information on H2O, please visit the H2O site; for more information on HDInsight, please visit the HDInsight site.

This blog post is co-authored by Pablo Marin (@pablomarin), Solution Architect at Microsoft.

Sparkling Water on the Spark-Notebook

This is a guest post from our friends at Kensu.

In the space of data science development in enterprises, two outstanding scalable technologies are Spark and H2O. Spark is a generic distributed computing framework and H2O is a very performant, scalable platform for AI.
Their complementarity is best exploited with the use of Sparkling Water. Sparkling Water is the solution to get the best of Spark (its elegant APIs, RDDs and multi-tenant context) together with the best of H2O (its speed, columnar compression and fully-featured machine learning and deep learning algorithms) in an enterprise-ready fashion.

Examples of Sparkling Water pipelines are readily available in the H2O GitHub repository; we have revisited these examples using the Spark-Notebook.

The Spark-Notebook is an open source notebook (a web-based environment for code editing, execution, and data visualization), focused on Scala and Spark. The Spark-Notebook is part of the Adalog suite of Kensu.io, which addresses agility, maintainability and productivity for data science teams. Adalog offers data scientists a short work cycle to deploy their work to the business reality, and offers managers a set of data governance tools giving a consistent view of the impact of data activities on the market.

This new material allows diving into Sparkling Water in an interactive and dynamic way.

Working with Sparkling Water in the Spark-Notebook scaffolds an ideal platform for big data / data science agile development. Most notably, this gives the data scientist the power to:

  • Write rich documentation of their work alongside the code, thus improving the capacity to index knowledge
  • Experiment quickly through interactive execution of individual code cells and share the results of these experiments with colleagues.
  • Visualize the data they are feeding H2O through an extensive list of widgets and automatic rendering of computation results.

Most of the H2O/Sparkling Water examples have been ported to the Spark-Notebook and are available in a GitHub repository.

We are focussing here on the Chicago crime dataset example and looking at:

  • How to take advantage of both H2O and Spark-Notebook technologies,
  • How to install the Spark-Notebook,
  • How to use it to deploy H2O jobs on a spark cluster,
  • How to read, transform and join data with Spark,
  • How to render data on a geospatial map,
  • How to apply deep learning or Gradient Boosted Machine (GBM) models using Sparkling Water

Installing the Spark-Notebook:

Installation is very straightforward on a local machine. Follow the steps described in the Spark-Notebook documentation and in a few minutes you will have it working. Please note that Sparkling Water currently works only with Scala 2.11 and Spark 2.0.2 and above.
For larger projects, you may also be interested to read the documentation on how to connect the notebook to an on-premise or cloud computing cluster.

The Sparkling Water notebooks repo should be cloned in the “notebooks” directory of your Spark-Notebook installation.

Integrating H2O with the Spark-Notebook:

In order to integrate Sparkling Water with the Spark-Notebook, we need to tell the notebook to load the Sparkling Water package and, if required, specify custom Spark configuration. Spark then automatically distributes the H2O libraries to each of your Spark executors. Declaring the Sparkling Water dependencies pulls in some libraries transitively, so take care to avoid duplication or multiple versions of some dependencies.
The notebook metadata defines the custom dependencies (ai.h2o) and the dependencies to exclude (because they are already available, i.e. Spark, Scala and Jetty). The custom local repo lets us define where dependencies are stored locally and thus avoid downloading them each time a notebook is started.

"customLocalRepo": "/tmp/spark-notebook",
"customDeps": [
  "ai.h2o % sparkling-water-core_2.11 % 2.0.2",
  "ai.h2o % sparkling-water-examples_2.11 % 2.0.2",
  "- org.apache.hadoop % hadoop-client %   _",
  "- org.apache.spark  % spark-core_2.11    %   _",
  "- org.apache.spark % spark-mllib_2.11 % _",
  "- org.apache.spark % spark-repl_2.11 % _",
  "- org.scala-lang    %     _         %   _",
  "- org.scoverage     %     _         %   _",
  "- org.eclipse.jetty.aggregate % jetty-servlet % _"
],
"customSparkConf": {
  "spark.ext.h2o.repl.enabled": "false"
},

With these dependencies set, we can start using Sparkling Water and initiate an H2O context from within the notebook.

Benchmark example – Chicago Crime Scenes:

As an example, we can revisit the Chicago Crime Sparkling Water demo. The Spark-Notebook we used for this benchmark can be seen in a read-only mode here.

Step 1: The three datasets are loaded as Spark data frames:

  • Chicago weather data : Min, Max and Mean temperature per day
  • Chicago Census data : Average poverty, unemployment, education level and gross income per Chicago Community Area
  • Chicago historical crime data : Crime description, date, location, community area, etc. Also contains a flag telling whether the criminal has been arrested or not.

The three tables are joined using Spark into one big table with location and date as keys. A view of the first entries of the table is generated by the notebook's automatic rendering of tables (see a sample in the table below).

spark_tables

Geospatial charts widgets are also available in the Spark-Notebook, for example, the 100 first crimes in the table:

geospatial

Step 2: We can transform the Spark data frame into an H2O Frame and randomly split the H2O Frame into training and validation frames containing 80% and 20% of the rows, respectively. This is a memory-to-memory transformation, effectively copying and formatting the data in the Spark data frame into an equivalent representation in the H2O nodes (spawned by Sparkling Water inside the Spark executors).
We can verify that the frames are loaded into H2O by looking at the H2O Flow UI (available on port 54321 of your Spark-Notebook installation). We can access it by calling "openFlow" in a notebook cell.

h2oflow

Step 3: From the Spark-Notebook, we train two H2O machine learning models on the training H2O frame. For comparison, we construct a Deep Learning MLP model and a Gradient Boosting Machine (GBM) model. Both models use all the data frame columns as features: time, weather, location, and neighborhood census data. The models live in the H2O context and are thus visible in the H2O Flow UI. Sparkling Water functions allow us to access them from the SparkContext.

We compare the classification performance of the two models by looking at the area under the curve (AUC) on the validation dataset. The AUC measures the discrimination power of the model, that is the ability of the model to correctly classify crimes that lead to an arrest or not. The higher, the better.

The Deep Learning model leads to a 0.89 AUC while the GBM gets to 0.90 AUC. The two models are therefore quite comparable in terms of discrimination power.

Flow2

Step 4: Finally, the trained model is used to measure the probability of arrest for two specific crimes:

  • A “narcotics” related crime on 02/08/2015 11:43:58 PM in a street of community area “46” in district 4 with FBI code 18.

    The probability of being arrested predicted by the deep learning model is 99.9% and by the GBM is 75.2%.

  • A “deceptive practice” related crime on 02/08/2015 11:00:39 PM in a residence of community area “14” in district 9 with FBI code 11.

    The probability of being arrested predicted by the deep learning model is 1.4% and by the GBM is 12%.

The Spark-Notebook allows for a quick computation and visualization of the results:

spark_notebook

Summary

Combining Spark and H2O within the Spark-Notebook is a very nice set-up for scalable data science. More examples are available in the online viewer. If you are interested in running them, install the Spark-Notebook and look in this repository. From that point, you are on track for enterprise-ready interactive scalable data science.

Loic Quertenmont,
Data Scientist @ Kensu.io

What is new in Sparkling Water 2.0.3 Release?

This release has H2O core – 3.10.1.2

Important Feature:

This architectural change allows Sparkling Water to connect to an existing H2O cluster. The benefit is that we are no longer affected by Spark killing its executors, so the solution is more stable in environments with a lot of H2O/Spark nodes. We are working on an article on how to use this very important feature of Sparkling Water 2.0.3.
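
Until that article is published, the configuration roughly takes the following shape when submitting a Sparkling Water application. The property names come from the Sparkling Water backend documentation; the values (cloud name, address) and the exact set of options your setup needs are assumptions, so please double-check against the docs.

# Attach to an already running H2O cluster instead of starting H2O inside the Spark executors
--conf spark.ext.h2o.backend.cluster.mode=external
--conf spark.ext.h2o.cloud.name=my-h2o-cloud               # name of the existing H2O cloud (placeholder)
--conf spark.ext.h2o.cloud.representative=10.0.0.1:54321   # ip:port of one node in that cloud (placeholder)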

Release notes: https://0xdata.atlassian.net/secure/ReleaseNote.jspa?projectId=12000&version=16601

2.0.3 (2017-01-04)

  • Bug
    • SW-152 – ClassNotFound with spark-submit
    • SW-266 – H2OContext shouldn’t be Serializable
    • SW-276 – ClassLoading issue when running code using SparkSubmit
    • SW-281 – Update sparkling water tests so they use correct frame locking
    • SW-283 – Set spark.sql.warehouse.dir explicitly in tests because of SPARK-17810
    • SW-284 – Fix CraigsListJobTitlesApp to use local file instead of trying to get one from hdfs
    • SW-285 – Disable timeline service also in python integration tests
    • SW-286 – Add missing test in pysparkling for conversion RDD[Double] -> H2OFrame
    • SW-287 – Fix bug in SparkDataFrame converter where key wasn’t random if not specified
    • SW-288 – Improve performance of Dataset tests and call super.afterAll
    • SW-289 – Fix PySparkling numeric handling during conversions
    • SW-290 – Fixes and improvements of task used to extended h2o jars by sparkling-water classes
    • SW-292 – Fix ScalaCodeHandlerTestSuite
  • New Feature
    • SW-178 – Allow external h2o cluster to act as h2o backend in Sparkling Water
  • Improvement
    • SW-282 – Integrate SW with H2O 3.10.1.2 ( Support for external cluster )
    • SW-291 – Use absolute value for random number in sparkling-water in internal backend
    • SW-295 – H2OConf should be parameterized by SparkConf and not by SparkContext

Please visit https://community.h2o.ai to learn more about it, provide feedback and ask for assistance as needed.

@avkashchauhan | @h2oai

sparklyr: R interface for Apache Spark

This post is reposted from Rstudio’s announcement on sparklyr – Rstudio’s extension for Spark

sparklyr-illustration

  • Connect to Spark from R. The sparklyr package provides a complete dplyr backend.
  • Filter and aggregate Spark datasets then bring them into R for analysis and visualization.
  • Use Spark’s distributed machine learning library from R.
  • Create extensions that call the full Spark API and provide interfaces to Spark packages.

Installation

You can install the sparklyr package from CRAN as follows:

install.packages("sparklyr")

You should also install a local version of Spark for development purposes:

library(sparklyr)
spark_install(version = "1.6.2")

To upgrade to the latest version of sparklyr, run the following command and restart your R session:

devtools::install_github("rstudio/sparklyr")

If you use the RStudio IDE, you should also download the latest preview release of the IDE which includes several enhancements for interacting with Spark (see the RStudio IDE section below for more details).

Connecting to Spark

You can connect to both local instances of Spark as well as remote Spark clusters. Here we’ll connect to a local instance of Spark via the spark_connect function:

library(sparklyr)
sc <- spark_connect(master = "local")

The returned Spark connection (sc) provides a remote dplyr data source to the Spark cluster.

For more information on connecting to remote Spark clusters see the Deployment section of the sparklyr website.

Using dplyr

We can now use all of the available dplyr verbs against the tables within the cluster.

We’ll start by copying some datasets from R into the Spark cluster (note that you may need to install the nycflights13 and Lahman packages in order to execute this code):

install.packages(c("nycflights13", "Lahman"))
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)
## [1] "batting" "flights" "iris"

To start with here’s a simple filtering example:

# filter by departure delay and print the first few records
flights_tbl %>% filter(dep_delay == 2)
## Source:   query [?? x 19]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## 
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
## 1   2013     1     1      517            515         2      830
## 2   2013     1     1      542            540         2      923
## 3   2013     1     1      702            700         2     1058
## 4   2013     1     1      715            713         2      911
## 5   2013     1     1      752            750         2     1025
## 6   2013     1     1      917            915         2     1206
## 7   2013     1     1      932            930         2     1219
## 8   2013     1     1     1028           1026         2     1350
## 9   2013     1     1     1042           1040         2     1325
## 10  2013     1     1     1231           1229         2     1523
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>

Introduction to dplyr provides additional dplyr examples you can try. For example, consider the last example from the tutorial which plots data on flight delays:

delay <- flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

ggplot2-flights

Window Functions

dplyr window functions are also supported, for example:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
## Source:   query [?? x 7]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## Groups: playerID
## 
##     playerID yearID teamID     G    AB     R     H
##        <chr>  <int>  <chr> <int> <int> <int> <int>
## 1  abbotpa01   2000    SEA    35     5     1     2
## 2  abbotpa01   2004    PHI    10    11     1     2
## 3  abnersh01   1992    CHA    97   208    21    58
## 4  abnersh01   1990    SDN    91   184    17    45
## 5  abreujo02   2014    CHA   145   556    80   176
## 6  acevejo01   2001    CIN    18    34     1     4
## 7  acevejo01   2004    CIN    39    43     0     2
## 8  adamsbe01   1919    PHI    78   232    14    54
## 9  adamsbe01   1918    PHI    84   227    10    40
## 10 adamsbu01   1945    SLN   140   578    98   169
## # ... with more rows

For additional documentation on using dplyr with Spark see the dplyr section of the sparklyr website.

Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:

library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1           5.1         3.5          1.4         0.2  setosa
## 2           4.9         3.0          1.4         0.2  setosa
## 3           4.7         3.2          1.3         0.2  setosa
## 4           4.6         3.1          1.5         0.2  setosa
## 5           5.0         3.6          1.4         0.2  setosa
## 6           5.4         3.9          1.7         0.4  setosa
## 7           4.6         3.4          1.4         0.3  setosa
## 8           5.0         3.4          1.5         0.2  setosa
## 9           4.4         2.9          1.4         0.2  setosa
## 10          4.9         3.1          1.5         0.1  setosa

Machine Learning

You can orchestrate machine learning algorithms in a Spark cluster via the machine learning functions within sparklyr. These functions connect to a set of high-level APIs built on top of DataFrames that help you create and tune machine learning workflows.

Here’s an example where we use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset, and see if we can predict a car’s fuel consumption (mpg) based on its weight (wt), and the number of cylinders the engine contains (cyl). We’ll assume in each case that the relationship between mpg and each of our features is linear.

# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Coefficients:
## (Intercept)          wt         cyl 
##   37.066699   -2.309504   -1.639546

For linear regression models produced by Spark, we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

summary(fit)
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Deviance Residuals::
##     Min      1Q  Median      3Q     Max 
## -2.6881 -1.0507 -0.4420  0.4757  3.3858 
## 
## Coefficients:
##             Estimate Std. Error t value  Pr(>|t|)    
## (Intercept) 37.06670    2.76494 13.4059 2.981e-07 ***
## wt          -2.30950    0.84748 -2.7252   0.02341 *  
## cyl         -1.63955    0.58635 -2.7962   0.02084 *  
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8665
## Root Mean Squared Error: 1.799

Spark machine learning supports a wide array of algorithms and feature transformations and as illustrated above it’s easy to chain these functions together with dplyr pipelines. To learn more see the machine learning section.

Reading and Writing Data

You can read and write data in CSV, JSON, and Parquet formats. Data can be stored in HDFS, S3, or on the local filesystem of the cluster nodes.

temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_json(iris_tbl, temp_json)
iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)

src_tbls(sc)
## [1] "batting"      "flights"      "iris"         "iris_csv"    
## [5] "iris_json"    "iris_parquet" "mtcars"

Extensions

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. Since Spark is a general purpose cluster computing system there are many potential applications for extensions (e.g. interfaces to custom machine learning pipelines, interfaces to 3rd party Spark packages, etc.).

Here’s a simple example that wraps a Spark text file line counting function with an R function:

# write a CSV 
tempfile <- tempfile(fileext = ".csv")
write.csv(nycflights13::flights, tempfile, row.names = FALSE, na = "")

# define an R interface to Spark line counting
count_lines <- function(sc, path) {
  spark_context(sc) %>% 
    invoke("textFile", path, 1L) %>% 
      invoke("count")
}

# call spark to count the lines of the CSV
count_lines(sc, tempfile)
## [1] 336777

To learn more about creating extensions see the Extensions section of the sparklyr website.

dplyr Utilities

You can cache a table into memory with:

tbl_cache(sc, "batting")

and unload from memory using:

tbl_uncache(sc, "batting")

Connection Utilities

You can view the Spark web console using the spark_web function:

spark_web(sc)

You can show the log using the spark_log function:

spark_log(sc, n = 10)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 224
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 223
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 222
## 16/09/24 07:50:59 INFO BlockManagerInfo: Removed broadcast_64_piece0 on localhost:56324 in memory (size: 20.6 KB, free: 483.0 MB)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 220
## 16/09/24 07:50:59 INFO Executor: Finished task 0.0 in stage 67.0 (TID 117). 2082 bytes result sent to driver
## 16/09/24 07:50:59 INFO TaskSetManager: Finished task 0.0 in stage 67.0 (TID 117) in 122 ms on localhost (1/1)
## 16/09/24 07:50:59 INFO DAGScheduler: ResultStage 67 (count at NativeMethodAccessorImpl.java:-2) finished in 0.122 s
## 16/09/24 07:50:59 INFO TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool 
## 16/09/24 07:50:59 INFO DAGScheduler: Job 47 finished: count at NativeMethodAccessorImpl.java:-2, took 0.125238 s

Finally, we disconnect from Spark:

spark_disconnect(sc)

RStudio IDE

The latest RStudio Preview Release of the RStudio IDE includes integrated support for Spark and the sparklyr package, including tools for:

  • Creating and managing Spark connections
  • Browsing the tables and columns of Spark DataFrames
  • Previewing the first 1,000 rows of Spark DataFrames

Once you’ve installed the sparklyr package, you should find a new Spark pane within the IDE. This pane includes a New Connection dialog which can be used to make connections to local or remote Spark instances:

spark-connect

Once you’ve connected to Spark you’ll be able to browse the tables contained within the Spark cluster:

spark-tab

The Spark DataFrame preview uses the standard RStudio data viewer:

spark-connect

The RStudio IDE features for sparklyr are available now as part of the RStudio Preview Release.

Spam Detection with Sparkling Water and Spark Machine Learning Pipelines

This short post presents the "ham or spam" demo, which has already been posted earlier by Michal Malohlava, using our new API in the latest Sparkling Water for Spark 1.6 and earlier versions, which unifies Spark and H2O machine learning pipelines. It shows how to create a simple Spark machine learning pipeline and a model based on the fitted pipeline, which can later be used to predict whether a particular message is spam or not.

Before diving into the demo steps, we would like to provide some details about the new features in the upcoming Sparkling Water 2.0:

  • Support for Apache Spark 2.0 and backwards compatibility with all previous versions.
  • The ability to run Apache Spark and Scala through H2O’s Flow UI.
  • H2O feature improvements and visualizations for MLlib algorithms, including the ability to score feature importance.
  • Visual intelligence for Apache Spark.
  • The ability to build Ensembles using H2O plus MLlib algorithms.
  • The power to export MLlib models as POJOs (Plain Old Java Objects), which can be easily run on commodity hardware.
  • A toolchain for ML pipelines.
  • Debugging support for Spark pipelines.
  • Model and data governance through Steam.
  • Bringing H2O’s powerful data munging capabilities to Apache Spark.

    In order to run the code below, start your Spark shell with the Sparkling Water JAR attached, or use the sparkling-shell script, which already does this for you.

    You can start the Spark shell with Sparkling Water as follows:

    $SPARK_HOME/bin/spark-submit \
    --class water.SparklingWaterDriver \
    --packages ai.h2o:sparkling-water-examples_2.10:1.6.5 \
    --executor-memory=6g \
    --driver-memory=6g /dev/null
    

    The preferred setup is Spark 1.6 with Sparkling Water 1.6.x.

    Prepare the coding environment

    Here we just import all required libraries.

    import org.apache.spark.SparkFiles
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.h2o.H2OPipeline
    import org.apache.spark.ml.h2o.features.{ColRemover, DatasetSplitter}
    import org.apache.spark.ml.h2o.models.H2ODeepLearning
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import water.support.SparkContextSupport
    import water.fvec.H2OFrame
    

    Add our dataset to the Spark environment. The dataset consists of 2 columns, where the first one is the label (ham or spam) and the second one is the message itself. We don't have to explicitly ask for a Spark context since it's already available via the sc variable.

    val smsDataFileName = "smsData.txt"
    val smsDataFilePath = "examples/smalldata/" + smsDataFileName
    SparkContextSupport.addFiles(sc, smsDataFilePath)
    

    Create SQL support.

    implicit val sqlContext = SQLContext.getOrCreate(sc)
    

    Start H2O services.

    import org.apache.spark.h2o._
    implicit val h2oContext = H2OContext.getOrCreate(sc)
    

    Create a helper method which loads the dataset, performs some basic filtering and finally creates a Spark DataFrame with 2 columns: label and text.

    def load(dataFile: String)(implicit sqlContext: SQLContext): DataFrame = {
      val smsSchema = StructType(Array(
        StructField("label", StringType, nullable = false),
        StructField("text", StringType, nullable = false)))
      val rowRDD = sc.textFile(SparkFiles.get(dataFile)).map(_.split("\t")).filter(r => !r(0).isEmpty).map(p => Row(p(0), p(1)))
      sqlContext.createDataFrame(rowRDD, smsSchema)
    }
    

    Define the pipeline stages

    In Spark, a pipeline is formed of two basic elements: transformers and estimators. Estimators usually encapsulate an algorithm for model generation, and their output are transformers. When the pipeline is fitted, all transformers and estimators are executed, and the estimators are converted to transformers. The model generated by the pipeline contains only transformers. More about Spark pipelines can be found in Spark's pipeline overview.

    In H2O we created a new type of pipeline stage, called a OneTimeTransformer. This transformer works similarly to Spark's estimator in that it is only executed when the pipeline is being fitted. However, it does not produce a transformer during fitting, and the model generated by the pipeline does not contain this OneTimeTransformer.
    An example of a one-time transformer is splitting the input data into a validation and a training dataset using H2O Frames. We don't need this one-time transformer to be executed every time we make a prediction with the model. We just need this code to be executed when we are fitting the pipeline to the data.

    This pipeline stage is using Spark’s RegexTokenizer to tokenize the messages. We just specify input column and output column for tokenized messages.

    val tokenizer = new RegexTokenizer().
        setInputCol("text").
        setOutputCol("words").
        setMinTokenLength(3).
        setGaps(false).
        setPattern("[a-zA-Z]+")
    

    Remove unnecessary words using Spark’s StopWordsRemover.

    val stopWordsRemover = new StopWordsRemover().
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("filtered").
        setStopWords(Array("the", "a", "", "in", "on", "at", "as", "not", "for")).
        setCaseSensitive(false)
    

    Vectorize the words using Spark’s HashingTF.

    val hashingTF = new HashingTF().
        setNumFeatures(1 << 10).
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("wordToIndex")
    

    Create inverse document frequencies based on hashed words. It creates a numerical representation of how much information a
    given word provides in the whole message.

    val idf = new IDF().
        setMinDocFreq(4).
        setInputCol(hashingTF.getOutputCol).
        setOutputCol("tf_idf")
    

    This pipeline stage is a one-time transformer. If setKeep(true) is called on it, it preserves the specified columns instead of deleting them.

    val colRemover = new ColRemover().
        setKeep(true).
        setColumns(Array[String]("label", "tf_idf"))
    

    Split the dataset and store the splits under the specified keys in H2O's distributed storage, called DKV. This is a one-time transformer which is executed only during the fitting stage. It determines which frame is passed on as its output, in the following order:

    1. If the train key is specified using the setTrainKey method and that key is also present in the list of keys, then the frame with this key is passed on as the output
    2. Otherwise, if the default key "train.hex" is present in the list of keys, then the frame with this key is passed on as the output
    3. Otherwise, the first frame specified in the list of keys is passed on as the output

    val splitter = new DatasetSplitter().
      setKeys(Array[String]("train.hex", "valid.hex")).
      setRatios(Array[Double](0.8)).
      setTrainKey("train.hex")
    

    Create H2O's Deep Learning model.
    If the key specifying the training set is set using setTrainKey, then the frame with this key is used as the training frame; otherwise the frame from the previous stage is used as the training frame.

    val dl = new H2ODeepLearning().
      setEpochs(10).
      setL1(0.001).
      setL2(0.0).
      setHidden(Array[Int](200, 200)).
      setValidKey(splitter.getKeys(1)).
      setResponseColumn("label")
    

    Create and fit the pipeline

    Create the pipeline using the stages we defined earlier. Like a normal Spark pipeline, it can be formed of Spark's transformers and estimators, but it may also contain H2O's one-time transformers.

    val pipeline = new H2OPipeline().
      setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, colRemover, splitter, dl))
    

    Train the pipeline model by fitting it to a Spark DataFrame.

    val data = load("smsData.txt")
    val model = pipeline.fit(data)
    

    Now we can optionally save the model to disk and load it again.

    model.write.overwrite().save("/tmp/hamOrSpamPipeline")
    val loadedModel = PipelineModel.load("/tmp/hamOrSpamPipeline")
    

    We can also save this unfitted pipeline to disk and load it again.

    pipeline.write.overwrite().save("/tmp/unfit-hamOrSpamPipeline")
    val loadedPipeline = H2OPipeline.load("/tmp/unfit-hamOrSpamPipeline")
    

    Train the pipeline model again on the loaded pipeline, just to show that the deserialized pipeline works as it should.

    val modelOfLoadedPipeline = loadedPipeline.fit(data)
    

    Create a helper function for predictions on unlabeled data. This method uses the model generated by the pipeline. To make a prediction, we call the transform method on the generated model with a Spark DataFrame as an argument. This call executes each transformer specified in the pipeline one after another, producing a Spark DataFrame with predictions.

    def isSpam(smsText: String,
               model: PipelineModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5):Boolean = {
      import h2oContext.implicits._
      val smsTextDF = sc.parallelize(Seq(smsText)).toDF("text") // convert to dataframe with one column named "text"
      val prediction: H2OFrame = model.transform(smsTextDF)
      prediction.vecs()(1).at(0) < hamThreshold
    }
    

    Try it!

    println(isSpam("Michal, h2oworld party tonight in MV?", modelOfLoadedPipeline, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel, h2oContext))
    

    In this article we showed how Spark pipelines and H2O algorithms work together seamlessly in the Spark environment. We strive to be consistent with the Spark API in H2O.ai and to make the life of developers and data scientists easier by hiding H2O internals and exposing APIs that are natural for Spark users.

    Databricks and H2O Make it Rain with Sparkling Water

    This blog post was first posted on the Databricks blog here.

    Databricks provides a cloud-based integrated workspace on top of Apache Spark for developers and data scientists. H2O.ai has been an early adopter of Apache Spark and has developed Sparkling Water to seamlessly integrate H2O.ai’s machine learning library on top of Spark.

    In this blog, we will demonstrate an integration between the Databricks platform and H2O.ai's Sparkling Water that provides Databricks users with an additional set of machine learning libraries. The integration allows data scientists to utilize Sparkling Water with Spark in a notebook environment more easily, allowing them to seamlessly combine Spark with H2O and get the best of both worlds.

    Let’s begin by preparing a Databricks environment to develop our spam predictor:

    The first step is to log into your Databricks account and create a new library containing Sparkling Water. You can use the Maven coordinates of the Sparkling Water package, for example: ai.h2o:sparkling-water-examples_2.10:1.5.6 (this version works with Spark 1.5).

    1

    The next step is to create a new cluster to run the example:

    2

    For this version of the Sparkling Water library we will use Spark 1.5. The name of the created cluster is “HamOrSpamCluster” – keep it handy as we will need it later.

    The next step is to upload the data; you can use the table import feature and upload the smsData.txt file.

    3

    Now the environment is ready and you can create a Databricks notebook; connect it to “HamOrSpamCluster” and start building a predictive model!

    The goal of the application is to write a spam detector that uses a trained model to categorize incoming messages.

    First look at the data. It contains raw text messages that are labeled as either spam or ham.
    For example:

    spam +123 Congratulations – in this week’s competition draw u have won the ?1450 prize to claim just call 09050002311 b4280703. T&Cs/stop SMS 08718727868. Over 18 only 150
    ham Yun ah.the ubi one say if ? wan call by tomorrow.call 67441233 look for irene.ere only got bus8,22,65,6

    We need to transform these messages into vectors of numbers and then train a binomial model to predict whether a text message is SPAM or HAM. For the transformation of a message into a vector of numbers we will use Spark MLlib string tokenization and word-to-vector transformers. We are going to split messages into tokens and use the TF-IDF (term frequency–inverse document frequency) technique to represent the words of importance inside the training data set:

    // Representation of a training message
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vector

    case class SMS(target: String, fv: Vector)

    // Split messages into lower-cased tokens, dropping ignored words and characters
    def tokenize(data: RDD[String]): RDD[Seq[String]] = {
      val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
      val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'', '!', '0', '1')

      val texts = data.map( r => {
        var smsText = r.toLowerCase
        for( c <- ignoredChars) {
          smsText = smsText.replace(c, ' ')
        }

        val words = smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length > 2).distinct

        words.toSeq
      })
      texts
    }

    import org.apache.spark.mllib.feature._

    // Build a hashing TF model and an IDF model, and return the TF-IDF vectors
    def buildIDFModel(tokens: RDD[Seq[String]],
                      minDocFreq: Int = 4,
                      hashSpaceSize: Int = 1 << 10): (HashingTF, IDFModel, RDD[Vector]) = {
      // Hash strings into the given space
      val hashingTF = new HashingTF(hashSpaceSize)
      val tf = hashingTF.transform(tokens)
      // Build term frequency-inverse document frequency
      val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
      val expandedText = idfModel.transform(tf)
      (hashingTF, idfModel, expandedText)
    }
    

    The resulting table will contain the following lines:

    spam 0, 0, 0.31, 0.12, ….
    ham 0.67, 0, 0, 0, 0, 0.003, 0, 0.1

    After this we are free to experiment with different binary classification algorithms in H2O.

    To start using H2O, we need to initialize the H2O service by creating an H2OContext:

    // Create SQL support
    import org.apache.spark.sql._
    implicit val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    
    // Start H2O services
    import org.apache.spark.h2o._
    @transient val h2oContext = new H2OContext(sc).start()
    

    H2OContext represents H2O running on top of a Spark cluster. You should see the following output:

    4

    For this demonstration, we will leverage the H2O Deep Learning method:

    // Define a function which builds a DL model
    import org.apache.spark.h2o._
    import water.Key
    import _root_.hex.deeplearning.DeepLearning
    import _root_.hex.deeplearning.DeepLearningParameters
    import _root_.hex.deeplearning.DeepLearningModel

    def buildDLModel(train: Frame, valid: Frame,
                     epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
                     hidden: Array[Int] = Array[Int](200, 200))
                    (implicit h2oContext: H2OContext): DeepLearningModel = {
      import h2oContext._
      // Build a model
      val dlParams = new DeepLearningParameters()
      dlParams._model_id = Key.make("dlModel.hex")
      dlParams._train = train
      dlParams._valid = valid
      dlParams._response_column = "target"
      dlParams._epochs = epochs
      dlParams._l1 = l1
      dlParams._l2 = l2
      dlParams._hidden = hidden

      // Create a job
      val dl = new DeepLearning(dlParams)
      val dlModel = dl.trainModel.get

      // Compute metrics on both datasets
      dlModel.score(train).delete()
      dlModel.score(valid).delete()

      dlModel
    }
    

    Here is the final application:

    // Build the application
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.examples.h2o.DemoUtils._
    import scala.io.Source
    
    // load both columns from the table
    val data = sqlContext.sql("SELECT * FROM smsData")
    // Extract response spam or ham
    val hamSpam = data.map( r => r(0).toString)
    val message = data.map( r => r(1).toString)
    // Tokenize message content
    val tokens = tokenize(message)
    // Build IDF model
    var (hashingTF, idfModel, tfidf) = buildIDFModel(tokens)
    
    // Merge response with extracted vectors
    val resultRDD: DataFrame = hamSpam.zip(tfidf).map(v => SMS(v._1, v._2)).toDF
    
    // Publish Spark DataFrame as H2OFrame
    // This H2OFrame has to be transient because we do not want it to be serialized. When calling for example sc.parallelize(..) the object which we are trying to parallelize takes with itself all variables in its surroundings scope - apart from those marked as serialized.
    // 
    @transient val table = h2oContext.asH2OFrame(resultRDD)
    println(sc.parallelize(Array(1,2)))
    // Transform target column into categorical
    table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
    table.update(null)
    
    // Split table
    val keys = Array[String]("train.hex", "valid.hex")
    val ratios = Array[Double](0.8)
    @transient val frs = split(table, keys, ratios)
    @transient val train = frs(0)
    @transient val valid = frs(1)
    table.delete()
    
    // Build a model
    @transient val dlModel = buildDLModel(train, valid)(h2oContext)
    

    And voilà, we have a Deep Learning model ready to detect spam.

    At this point you can explore the quality of the model:

    // Collect model metrics and evaluate model quality
    import water.app.ModelMetricsSupport
    val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
    println(validMetrics.auc._auc)


    You can also use the H2O Flow UI by clicking on the URL provided when you instantiated the H2O Context.

    5

    At this point we have everything ready to create a spam detector:

    // Create a spam detector - a method which will return SPAM or HAM for given text message
    import water.DKV._
    // Spam detector
    def isSpam(msg: String,
               modelId: String,
               hashingTF: HashingTF,
               idfModel: IDFModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5): String = {
      val dlModel: DeepLearningModel = water.DKV.getGet(modelId)
      val msgRdd = sc.parallelize(Seq(msg))
      val msgVector: DataFrame = idfModel.transform(
        hashingTF.transform(
          tokenize(msgRdd))).map(v => SMS("?", v)).toDF
      val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
      msgTable.remove(0) // remove the first (placeholder target) column
      val prediction = dlModel.score(msgTable)
      // println(prediction)
      if (prediction.vecs()(1).at(0) < hamThreshold) "SPAM DETECTED!" else "HAM"
    }

    The method uses the built models to transform an incoming text message and provide a prediction: SPAM or HAM. For example:

    6

    We’ve shown a fast and easy way to build a spam detector with Databricks and Sparkling Water. To try this out for yourself, register for a free 14-day trial of Databricks and check out the Sparkling Water example in the Databricks Guide.