Use H2O.ai on Azure HDInsight

This is a repost from this article on MSDN

We’re hosting an upcoming webinar to present you how to use H2O on HDInsight and to answer your questions. Sign up for our upcoming webinar on combining H2O and Azure HDInsight.

We recently announced that H2O and Microsoft Azure HDInsight have integrated to provide Data Scientists with a Leading Combination of Engines for Machine Learning and Deep Learning. Through H2O’s AI platform and its Sparkling Water solution, users can combine the fast, scalable machine learning algorithms of H2O with the capabilities of Spark, as well as drive computation from Scala/R/Python and utilize the H2O Flow UI, providing an ideal machine learning platform for application developers.

In this blog, we will provide a detailed step-by-step guide to help you set up the first H2O on HDInsight solution.

Step 1: setting up the environment

The first step is to create an HDInsight cluster with H2O installed. You can either create an HDInsight cluster and install H2O during provision time, or you can also install H2O on an existing cluster. Please note that H2O on HDInsight only works for Spark 2.0 on HDInsight 3.5 as of today, which is the default version of HDInsight.

For more information on how to create a cluster in HDInsight, please refer to the documentation here (https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-provision-linux-clusters). For more information on how to install an application on an existing cluster, please refer to the documentation here (https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apps-install-applications)

Please be noted that we’ve recently updated our UI with less clicks, so you need to click “custom” button to install applications on HDInsight.

hdi-image1

Step 2: Setting up the environment

After installing H2O on HDInsight, you can simply use the built-in Jupyter notebooks to write your first H2O on HDInsight applications. You can simply go to (https://yourclustername.azurehdinsight.net/jupyter) to open the Jupyter Notebook. You will see a folder named “H2O-PySparkling-Examples”.

hdi-image2

There are a few examples in the folder, but I recommend starting with the one named “Sentiment_analysis_with_Sparkling_Water.ipynb”. Most of the details on how to use the H2O PySparkling Water APIs are already covered in the Notebook itself, so here I will give some high-level overviews.

The first thing you need to do is to configure the environment. Most of the configurations are already taken care by the system, such as the FLOW UI address, Spark jar location, the Sparkling water egg file, etc.

There are three important parameter to configure: the driver memory, executor memory, and the number of executors. The default values are optimized for the default 4 node cluster, but your cluster size might vary.

Tuning these parameters are outside of scope of this blog, as it is more of a Spark resource tuning problem. There are a few good reference articles such as this one.

Note that all spark applications deployed using a Jupyter Notebook will have “yarn-cluster” deploy-mode. This means that the spark driver node will be allocated on any worker node of the cluster, not on the head nodes.

In this example, we simply allocate 75% of an HDInsight cluster worker nodes to the driver and executors (21 GB each), and put 3 executors, since the default HDInsight cluster size is 4 worker nodes (3 executors + 1 driver)

hdi-image3

Please refer to the Jupyter Notebook tutorial for more information on how to use Jupyter Notebooks on HDInsight.

The second step here is to create an H2O context. Since one default spark context is already configured in the Jupyter Notebook (called sc), in H2O, we just need to call

h2o_context = pysparkling.H2OContext.getOrCreate(sc)

so H2O can recognize the default spark context.

After executing this line of code, H2O will print out the status, as well as the YARN application it is using.

hdi-image4

After this, you can use H2O APIs plus the Spark APIs to write your applications. To learn more about Sparkling Water APIs, refer to the H2O GitHub site here.

hdi-image5

This sentiment analysis example has a few steps to analyze the data:

  1. Load data to Spark and H2O frames
  2. Data munging using H2O API
    • Remove columns
    • Refine Time Column into Year/Month/Day/DayOfWeek/Hour columns
  3. Data munging using Spark API
    • Select columns Score, Month, Day, DayOfWeek, Summary
    • Define UDF to transform score (0..5) to binary positive/negative
    • Use TF-IDF to vectorize summary column
  4. Model building using H2O API
    • Use H2O Grid Search to tune hyper parameters
    • Select the best Deep Learning model

Please refer to the Jupyter Notebook for more details.

Step 3: use FLOW UI to monitor the progress and visualize the model

H2O Flow is an interactive web-based computational user interface where you can combine code execution, text, mathematics, plots and rich media into a single document, much like Jupyter Notebooks. With H2O Flow, you can capture, rerun, annotate, present, and share your workflow. H2O Flow allows you to use H2O interactively to import files, build models, and iteratively improve them. Based on your models, you can make predictions and add rich text to create vignettes of your work – all within Flow’s browser-based environment. In this blog, we will only focus on its visualization part.

H2O FLOW web service lives in the Spark driver and is routed through the HDInsight gateway, so it can only be accessed when the spark application/Notebook is running

You can click the available link in the Jupyter Notebook, or you can directly access this URL: https://yourclustername-h2o.apps.azurehdinsight.net/flow/index.html

In this example, we will demonstrate its visualization capabilities. Simply click “Model > List Grid Search Results” (since we are trying to use Grid Search to tune hyper parameters)

hdi-image6

Then you can access the 4 grid search results:

hdi-image7

And you can view the details of each model. For example, you can visualize the ROC curve as below:

hdi-image8

In Jupyter Notebooks, you can also view the performance in text format:

hdi-image9

Summary
In this blog, we have walked you through the detailed steps on how to create your first H2O application on HDInsight for your machine learning applications. For more information on H2O, please visit H2O site; For more information on HDInsight, please visit the HDInsight site

This blog-post is co-authored by Pablo Marin(@pablomarin), Solution Architect in Microsoft.

Sparkling Water on the Spark-Notebook

This is a guest post from our friends at Kensu.

In the space of Data Science development in enterprises, two outstanding scalable technologies are Spark and H2O. Spark is a generic distributed computing framework and H2O is a very performant scalable platform for AI.
Their complementarity is best exploited with the use of Sparkling Water. Sparkling Water is the solution to get the best of Spark – its elegant APIs, RDDs, multi-tenant Context and H2O’s speed, columnar-compression and fully-featured Machine Learning and Deep-Learning algorithms in an enterprise ready fashion.

Examples of Sparkling Water pipelines are readily available in the H2O github repository, we have revisited these examples using the Spark-Notebook.

The Spark-Notebook is an open source notebook (web-based environment for code edition, execution, and data visualization), focused on Scala and Spark. The Spark-Notebook is part of the Adalog suite of Kensu.io which addresses agility, maintainability and productivity for data science teams. Adalog offers to data scientists a short work cycle to deploy their work to the business reality and to managers a set of data governance giving a consistent view on the impact of data activities on the market.

This new material allows diving into Sparkling Water in an interactive and dynamic way.

Working with Sparking Water in the Spark-Notebook scaffolds an ideal platform for big data /data science agile development. Most notably, this gives the data scientist the power to:

  • Write rich documentation of his work alongside the code, thus improving the capacity to index knowledge
  • Experiment quickly through interactive execution of individual code cells and share the results of these experiments with his colleagues.
  • Visualize the data he/she is feeding H2O through an extensive list of widgets and automatic makeup of computation results.

Most of the H2O/Sparkling water examples have been ported to the Spark-Notebook and are available in a github repository.

We are focussing here on the Chicago crime dataset example and looking at:

  • How to take advantage of both H2O and Spark-Notebook technologies,
  • How to install the Spark-Notebook,
  • How to use it to deploy H2O jobs on a spark cluster,
  • How to read, transform and join data with Spark,
  • How to render data on a geospatial map,
  • How to apply deep learning or Gradient Boosted Machine (GBM) models using Sparkling Water

Installing the Spark-Notebook:

Installation is very straightforward on a local machine. Follow the steps described in the Spark-Notebook documentation and in a few minutes, you will have it working. Please note that Sparkling Water works only with Scala 2.11 and Spark 2.02 and above currently.
For larger projects, you may also be interested to read the documentation on how to connect the notebook to an on-premise or cloud computing cluster.

The Sparkling Water notebooks repo should be cloned in the “notebooks” directory of your Spark-Notebook installation.

Integrating H2O with the Spark-Notebook:

In order to integrate Sparkling Water with the Spark-Notebook, we need to tell the notebook to load the Sparkling Water package and specify custom spark configuration, if required. Spark then automatically distributes the H2O libraries on each of your Spark executors. Declaring Sparkling Water dependencies induces some libraries to come along by transitivity, therefore take care to ensure duplication or multiple versions of some dependencies is avoided.
The notebook metadata defines custom dependencies (ai.h2o) and dependencies to not include (because they’re already available, i.e. spark, scala and jetty). The custom local repos allow us to define where dependencies are stored locally and thus avoid downloading these each time a notebook is started.

"customLocalRepo": "/tmp/spark-notebook",
"customDeps": [
  "ai.h2o % sparkling-water-core_2.11 % 2.0.2",
  "ai.h2o % sparkling-water-examples_2.11 % 2.0.2",
  "- org.apache.hadoop % hadoop-client %   _",
  "- org.apache.spark  % spark-core_2.11    %   _",
  "- org.apache.spark % spark-mllib_2.11 % _",
  "- org.apache.spark % spark-repl_2.11 % _",
  "- org.scala-lang    %     _         %   _",
  "- org.scoverage     %     _         %   _",
  "- org.eclipse.jetty.aggregate % jetty-servlet % _"
],
"customSparkConf": {
  "spark.ext.h2o.repl.enabled": "false"
},

With these dependencies set, we can start using Sparkling Water and initiate an H2O context from within the notebook.

Benchmark example – Chicago Crime Scenes:

As an example, we can revisit the Chicago Crime Sparkling Water demo. The Spark-Notebook we used for this benchmark can be seen in a read-only mode here.

Step 1: The Three datasets are loaded as spark data frames:

  • Chicago weather data : Min, Max and Mean temperature per day
  • Chicago Census data : Average poverty, unemployment, education level and gross income per Chicago Community Area
  • Chicago historical crime data : Crime description, date, location, community area, etc. Also contains a flag telling whether the criminal has been arrested or not.

The three tables are joined using Spark into a big table with location and date as keys. A view of the first entries of the table are generated by the notebook’s automatic rendering of tables (See a sample on the table below).

spark_tables

Geospatial charts widgets are also available in the Spark-Notebook, for example, the 100 first crimes in the table:

geospatial

Step 2: We can transform the spark data frame into an H2O Frame and randomly split the H2O Frame into training and validation frames containing 80% and 20% of the rows, respectively. This is a memory to memory transformation, effectively copying and formatting data in the spark data frame into an equivalent representation in the H2O nodes (spawned by Sparkling Water into the spark executors).
We can verify that the frames are loaded into H2O by looking at the H2O Flow UI (available on port 54321 of your spark-notebook installation). We can access it by calling “openFlow” in a notebook cell.

h2oflow

Step 3: From the Spark-Notebook, we train two H2O machine learning models on the training H2O frame. For comparison, we are constructing a Deep Learning MLP model and a Gradient Boosting Machine (GBM) model. Both models are using all the data frame columns as features: time, weather, location, and neighborhood census data. Models are living in the H2O context and thus visible in the H2O flow UI. Sparkling Water functions allow us to access these from the SparkContext.

We compare the classification performance of the two models by looking at the area under the curve (AUC) on the validation dataset. The AUC measures the discrimination power of the model, that is the ability of the model to correctly classify crimes that lead to an arrest or not. The higher, the better.

The Deep Learning model leads to a 0.89 AUC while the GBM gets to 0.90 AUC. The two models are therefore quite comparable in terms of discrimination power.

Flow2

Step 4: Finally, the trained model is used to measure the probability of arrest for two specific crimes:

  • A “narcotics” related crime on 02/08/2015 11:43:58 PM in a street of community area “46” in district 4 with FBI code 18.

    The probability of being arrested predicted by the deep learning model is 99.9% and by the GBM is 75.2%.

  • A “deceptive practice” related crime on 02/08/2015 11:00:39 PM in a residence of community area “14” in district 9 with FBI code 11.

    The probability of being arrested predicted by the deep learning model is 1.4% and by the GBM is 12%.

The Spark-Notebook allows for a quick computation and visualization of the results:

spark_notebook

Summary

Combining Spark and H2O within the Spark-Notebook is a very nice set-up for scalable data science. More examples are available in the online viewer. If you are interested in running them, install the Spark-Notebook and look in this repository. From that point , you are on track for enterprise-ready interactive scalable data science.

Loic Quertenmont,
Data Scientist @ Kensu.io

What is new in Sparkling Water 2.0.3 Release?

This release has H2O core – 3.10.1.2

Important Feature:

This architectural change allows to connect to existing h2o cluster from sparkling water. This has a benefit that we are no longer affected by Spark killing it’s executors thus we should have more stable solution in environment with lots of h2o/spark node. We are working on article on how to use this very important feature in Sparkling Water 2.0.3.

Release notes: https://0xdata.atlassian.net/secure/ReleaseNote.jspa?projectId=12000&version=16601

2.0.3 (2017-01-04)

  • Bug
    • SW-152 – ClassNotFound with spark-submit
    • SW-266 – H2OContext shouldn’t be Serializable
    • SW-276 – ClassLoading issue when running code using SparkSubmit
    • SW-281 – Update sparkling water tests so they use correct frame locking
    • SW-283 – Set spark.sql.warehouse.dir explicitly in tests because of SPARK-17810
    • SW-284 – Fix CraigsListJobTitlesApp to use local file instead of trying to get one from hdfs
    • SW-285 – Disable timeline service also in python integration tests
    • SW-286 – Add missing test in pysparkling for conversion RDD[Double] -> H2OFrame
    • SW-287 – Fix bug in SparkDataFrame converter where key wasn’t random if not specified
    • SW-288 – Improve performance of Dataset tests and call super.afterAll
    • SW-289 – Fix PySparkling numeric handling during conversions
    • SW-290 – Fixes and improvements of task used to extended h2o jars by sparkling-water classes
    • SW-292 – Fix ScalaCodeHandlerTestSuite
  • New Feature
    • SW-178 – Allow external h2o cluster to act as h2o backend in Sparkling Water
  • Improvement
    • SW-282 – Integrate SW with H2O 3.10.1.2 ( Support for external cluster )
    • SW-291 – Use absolute value for random number in sparkling-water in internal backend
    • SW-295 – H2OConf should be parameterized by SparkConf and not by SparkContext

Please visit https://community.h2o.ai to learn more about it, provide feedback and ask for assistance as needed.

@avkashchauhan | @h2oai

sparklyr: R interface for Apache Spark

This post is reposted from Rstudio’s announcement on sparklyr – Rstudio’s extension for Spark

sparklyr-illustration

  • Connect to Spark from R. The sparklyr package provides a complete dplyr backend.
  • Filter and aggregate Spark datasets then bring them into R for analysis and visualization.
  • Use Spark’s distributed machine learning library from R.
  • Create extensions that call the full Spark API and provide interfaces to Spark packages.

Installation

You can install the sparklyr package from CRAN as follows:

install.packages("sparklyr")

You should also install a local version of Spark for development purposes:

library(sparklyr)
spark_install(version = "1.6.2")

To upgrade to the latest version of sparklyr, run the following command and restart your r session:

devtools::install_github("rstudio/sparklyr")

If you use the RStudio IDE, you should also download the latest preview release of the IDE which includes several enhancements for interacting with Spark (see the RStudio IDE section below for more details).

Connecting to Spark

You can connect to both local instances of Spark as well as remote Spark clusters. Here we’ll connect to a local instance of Spark via the spark_connect function:

library(sparklyr)
sc <- spark_connect(master = "local")

The returned Spark connection (sc) provides a remote dplyr data source to the Spark cluster.

For more information on connecting to remote Spark clusters see the Deployment section of the sparklyr website.

Using dplyr

We can new use all of the available dplyr verbs against the tables within the cluster.

We’ll start by copying some datasets from R into the Spark cluster (note that you may need to install the nycflights13 and Lahman packages in order to execute this code):

install.packages(c("nycflights13", "Lahman"))
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)
## [1] "batting" "flights" "iris"

To start with here’s a simple filtering example:

# filter by departure delay and print the first few records
flights_tbl %>% filter(dep_delay == 2)
## Source:   query [?? x 19]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## 
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
## 1   2013     1     1      517            515         2      830
## 2   2013     1     1      542            540         2      923
## 3   2013     1     1      702            700         2     1058
## 4   2013     1     1      715            713         2      911
## 5   2013     1     1      752            750         2     1025
## 6   2013     1     1      917            915         2     1206
## 7   2013     1     1      932            930         2     1219
## 8   2013     1     1     1028           1026         2     1350
## 9   2013     1     1     1042           1040         2     1325
## 10  2013     1     1     1231           1229         2     1523
## # ... with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dbl>

Introduction to dplyr provides additional dplyr examples you can try. For example, consider the last example from the tutorial which plots data on flight delays:

delay <- flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

ggplot2-flights

Window Functions

dplyr window functions are also supported, for example:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
## Source:   query [?? x 7]
## Database: spark connection master=local[8] app=sparklyr local=TRUE
## Groups: playerID
## 
##     playerID yearID teamID     G    AB     R     H
##        <chr>  <int>  <chr> <int> <int> <int> <int>
## 1  abbotpa01   2000    SEA    35     5     1     2
## 2  abbotpa01   2004    PHI    10    11     1     2
## 3  abnersh01   1992    CHA    97   208    21    58
## 4  abnersh01   1990    SDN    91   184    17    45
## 5  abreujo02   2014    CHA   145   556    80   176
## 6  acevejo01   2001    CIN    18    34     1     4
## 7  acevejo01   2004    CIN    39    43     0     2
## 8  adamsbe01   1919    PHI    78   232    14    54
## 9  adamsbe01   1918    PHI    84   227    10    40
## 10 adamsbu01   1945    SLN   140   578    98   169
## # ... with more rows

For additional documentation on using dplyr with Spark see the dplyr section of the sparklyr website.

Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:

library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1           5.1         3.5          1.4         0.2  setosa
## 2           4.9         3.0          1.4         0.2  setosa
## 3           4.7         3.2          1.3         0.2  setosa
## 4           4.6         3.1          1.5         0.2  setosa
## 5           5.0         3.6          1.4         0.2  setosa
## 6           5.4         3.9          1.7         0.4  setosa
## 7           4.6         3.4          1.4         0.3  setosa
## 8           5.0         3.4          1.5         0.2  setosa
## 9           4.4         2.9          1.4         0.2  setosa
## 10          4.9         3.1          1.5         0.1  setosa

Machine Learning

You can orchestrate machine learning algorithms in a Spark cluster via the machine learning functions within sparklyr. These functions connect to a set of high-level APIs built on top of DataFrames that help you create and tune machine learning workflows.

Here’s an example where we use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset, and see if we can predict a car’s fuel consumption (mpg) based on its weight (wt), and the number of cylinders the engine contains (cyl). We’ll assume in each case that the relationship between mpg and each of our features is linear.

# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
fit
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Coefficients:
## (Intercept)          wt         cyl 
##   37.066699   -2.309504   -1.639546

For linear regression models produced by Spark, we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

summary(fit)
## Call: ml_linear_regression(., response = "mpg", features = c("wt", "cyl"))
## 
## Deviance Residuals::
##     Min      1Q  Median      3Q     Max 
## -2.6881 -1.0507 -0.4420  0.4757  3.3858 
## 
## Coefficients:
##             Estimate Std. Error t value  Pr(>|t|)    
## (Intercept) 37.06670    2.76494 13.4059 2.981e-07 ***
## wt          -2.30950    0.84748 -2.7252   0.02341 *  
## cyl         -1.63955    0.58635 -2.7962   0.02084 *  
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.8665
## Root Mean Squared Error: 1.799

Spark machine learning supports a wide array of algorithms and feature transformations and as illustrated above it’s easy to chain these functions together with dplyr pipelines. To learn more see the machine learning section.

Reading and Writing Data

You can read and write data in CSV, JSON, and Parquet formats. Data can be stored in HDFS, S3, or on the lcoal filesystem of cluster nodes.

temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_csv(iris_tbl, temp_json)
iris_json_tbl <- spark_read_csv(sc, "iris_json", temp_json)

src_tbls(sc)
## [1] "batting"      "flights"      "iris"         "iris_csv"    
## [5] "iris_json"    "iris_parquet" "mtcars"

Extensions

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. Since Spark is a general purpose cluster computing system there are many potential applications for extensions (e.g. interfaces to custom machine learning pipelines, interfaces to 3rd party Spark packages, etc.).

Here’s a simple example that wraps a Spark text file line counting function with an R function:

# write a CSV 
tempfile <- tempfile(fileext = ".csv")
write.csv(nycflights13::flights, tempfile, row.names = FALSE, na = "")

# define an R interface to Spark line counting
count_lines <- function(sc, path) {
  spark_context(sc) %>% 
    invoke("textFile", path, 1L) %>% 
      invoke("count")
}

# call spark to count the lines of the CSV
count_lines(sc, tempfile)
## [1] 336777

To learn more about creating extensions see the Extensions section of the sparklyr website.

dplyr Utilities

You can cache a table into memory with:

tbl_cache(sc, "batting")

and unload from memory using:

tbl_uncache(sc, "batting")

Connection Utilities

You can view the Spark web console using the spark_web function:

spark_web(sc)

You can show the log using the spark_log function:

spark_log(sc, n = 10)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 224
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 223
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 222
## 16/09/24 07:50:59 INFO BlockManagerInfo: Removed broadcast_64_piece0 on localhost:56324 in memory (size: 20.6 KB, free: 483.0 MB)
## 16/09/24 07:50:59 INFO ContextCleaner: Cleaned accumulator 220
## 16/09/24 07:50:59 INFO Executor: Finished task 0.0 in stage 67.0 (TID 117). 2082 bytes result sent to driver
## 16/09/24 07:50:59 INFO TaskSetManager: Finished task 0.0 in stage 67.0 (TID 117) in 122 ms on localhost (1/1)
## 16/09/24 07:50:59 INFO DAGScheduler: ResultStage 67 (count at NativeMethodAccessorImpl.java:-2) finished in 0.122 s
## 16/09/24 07:50:59 INFO TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool 
## 16/09/24 07:50:59 INFO DAGScheduler: Job 47 finished: count at NativeMethodAccessorImpl.java:-2, took 0.125238 s

Finally, we disconnect from Spark:

spark_disconnect(sc)

RStudio IDE

The latest RStudio Preview Release of the RStudio IDE includes integrated support for Spark and the sparklyr package, including tools for:

  • Creating and managing Spark connections
  • Browsing the tables and columns of Spark DataFrames
  • Previewing the first 1,000 rows of Spark DataFrames

Once you’ve installed the sparklyr package, you should find a new Spark pane within the IDE. This pane includes a New Connection dialog which can be used to make connections to local or remote Spark instances:

spark-connect

Once you’ve connected to Spark you’ll be able to browse the tables contained within the Spark cluster:

spark-tab

The Spark DataFrame preview uses the standard RStudio data viewer:

spark-connect

The RStudio IDE features for sparklyr are available now as part of the RStudio Preview Release.

Spam Detection with Sparkling Water and Spark Machine Learning Pipelines

This short post presents the “ham or spam” demo, which has already been posted earlier by Michal Malohlava, using our new API in latest Sparkling Water for Spark 1.6 and earlier versions, unifying Spark and H2O Machine Learning pipelines. It shows how to create a simple Spark Machine Learning pipeline and a model based on the fitted pipeline, which can be later used for prediction whether a particular message is spam or not.

Before diving into the demo steps, we would like to provide some details about the new features in the upcoming Sparkling Water 2.0:

  • Support for Apache Spark 2.0 and backwards compatibility with all previous versions.
  • The ability to run Apache Spark and Scala through H2O’s Flow UI.
  • H2O feature improvements and visualizations for MLlib algorithms, including the ability to score feature importance.
  • Visual intelligence for Apache Spark.
  • The ability to build Ensembles using H2O plus MLlib algorithms.
  • The power to export MLlib models as POJOs (Plain Old Java Objects), which can be easily run on commodity hardware.
  • A toolchain for ML pipelines.
  • Debugging support for Spark pipelines.
  • Model and data governance through Steam.
  • Bringing H2O’s powerful data munging capabilities to Apache Spark.
  • In order to run the code below, start your Spark shell with attached Sparkling Water JAR or use sparkling-shell script that already does this for you.

    You can start the Spark shell with Sparkling Water as follows:

    $SPARK_HOME/bin/spark-submit \
    --class water.SparklingWaterDriver \
    --packages ai.h2o:sparkling-water-examples_2.10:1.6.5 \
    --executor-memory=6g \
    --driver-memory=6g /dev/null
    

    Preferable Spark is Spark 1.6 and Sparkling Water 1.6.x.

    Prepare the coding environment

    Here we just import all required libraries.

    import org.apache.spark.SparkFiles
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.h2o.H2OPipeline
    import org.apache.spark.ml.h2o.features.{ColRemover, DatasetSplitter}
    import org.apache.spark.ml.h2o.models.H2ODeepLearning
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import water.support.SparkContextSupport
    import water.fvec.H2OFrame
    

    Add our dataset to Spark environment. The dataset consists of 2 columns where the first one is the label ( ham or spam ) and the second one is the message itself. We don’t have to explicitly ask for Spark context since it’s already available via sc variable.

    val smsDataFileName = "smsData.txt"
    val smsDataFilePath = "examples/smalldata/" + smsDataFileName
    SparkContextSupport.addFiles(sc, smsDataFilePath)
    

    Create SQL support.

    implicit val sqlContext = SQLContext.getOrCreate(sc)
    

    Start H2O services.

    import org.apache.spark.h2o._
    implicit val h2oContext = H2OContext.getOrCreate(sc)
    

    Create helper method which loads the dataset, performs some basic filtering and at last creates Spark’s DataFrame with 2 columns – label and text.

    def load(dataFile: String)(implicit sqlContext: SQLContext): DataFrame = {
    val smsSchema = StructType(Array(
    StructField("label", StringType, nullable = false),
    StructField("text", StringType, nullable = false)))
    val rowRDD = sc.textFile(SparkFiles.get(dataFile)).map(_.split("\t")).filter(r => !r(0).isEmpty).map(p => Row(p(0),p(1)))
    sqlContext.createDataFrame(rowRDD, smsSchema)
    }
    

    Define the pipeline stages

    In Spark, a pipeline is formed of two basic elements – transformers and estimators. Estimators usually encapsulate an algorithm for model generation and their output are transformers. During fitting the pipeline stage, all transformers and estimators are executed and estimators are converted to transformers. The model generated by the pipeline contains only transformers. More about Spark pipelines can be found on Spark’s pipeline overview

    In H2O we created a new type of pipeline stage, which is called OneTimeTransformer. This transformer works similarly to Spark’s estimator in a way that it is only executed during fitting the pipeline stage. It does not however produces a transformer during fitting pipeline stage and the model generated by the pipeline does not contain this OneTimeTransformer.
    An example for one-time transformer is splitting the input data into a validation and training dataset using H2O Frames. We don’t need this one-time transformer to be executed every time we do prediction on the model. We just need this code to be executed when we are fitting the pipeline to the data.

    This pipeline stage is using Spark’s RegexTokenizer to tokenize the messages. We just specify input column and output column for tokenized messages.

    val tokenizer = new RegexTokenizer().
        setInputCol("text").
        setOutputCol("words").
        setMinTokenLength(3).
        setGaps(false).
        setPattern("[a-zA-Z]+")
    

    Remove unnecessary words using Spark’s StopWordsRemover.

    val stopWordsRemover = new StopWordsRemover().
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("filtered").
        setStopWords(Array("the", "a", "", "in", "on", "at", "as", "not", "for")).
        setCaseSensitive(false)
    

    Vectorize the words using Spark’s HashingTF.

    val hashingTF = new HashingTF().
        setNumFeatures(1 << 10).
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("wordToIndex")
    

    Create inverse document frequencies based on hashed words. It creates a numerical representation of how much information a
    given word provides in the whole message.

    val idf = new IDF().
        setMinDocFreq(4).
        setInputCol(hashingTF.getOutputCol).
        setOutputCol("tf_idf")
    

    This pipeline stage is one-time transformer. If setKeep(true) is called in it, it preserves specified columns instead
    of deleting them.

    val colRemover = new ColRemover().
        setKeep(true).
        setColumns(Array[String]("label", "tf_idf"))
    

    Split the dataset and store the splits with the specified keys into H2O’s distributed storage called DKV. This is one-time transformer which is executed only during fitting stage. It determines the frame, which is passed on the output in the following order:

    1. If the train key is specified using setTrainKey method and the key is also specified in the list of keys, then frame with this key is passed on the output
    2. Otherwise, if the default key Р“train.hex” is specified in the list of keys, then frame with this key is passed on the output
    3. Otherwise the first frame specified in the list of keys is passed on the output
    val splitter = new DatasetSplitter().
      setKeys(Array[String]("train.hex", "valid.hex")).
      setRatios(Array[Double](0.8)).
      setTrainKey("train.hex")
    

    Create H2O’s deep learning model.
    If the key specifying the training set is set using setTrainKey, then frame with this key is used as the training frame, otherwise it uses the frame from the previous stage as the training frame

    val dl = new H2ODeepLearning().
      setEpochs(10).
      setL1(0.001).
      setL2(0.0).
      setHidden(Array[Int](200, 200)).
      setValidKey(splitter.getKeys(1)).
      setResponseColumn("label")
    

    Create and fit the pipeline

    Create the pipeline using the stages we defined earlier. As a normal Spark pipeline, it can be formed of Spark’s transformers and estimators, but it also may contain H2O’s one-time transformers.

    val pipeline = new H2OPipeline().
      setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, colRemover, splitter, dl))
    

    Train the pipeline model by fitting it to a Spark’s DataFrame

    val data = load("smsData.txt")
    val model = pipeline.fit(data)
    

    Now we can optionally save the model to disk and load it again.

    model.write.overwrite().save("/tmp/hamOrSpamPipeline")
    val loadedModel = PipelineModel.load("/tmp/hamOrSpamPipeline")
    

    We can also save this unfitted pipeline to disk and load it again.

    pipeline.write.overwrite().save("/tmp/unfit-hamOrSpamPipeline")
    val loadedPipeline = H2OPipeline.load("/tmp/unfit-hamOrSpamPipeline")
    

    Train the pipeline model again on loaded pipeline just to show deserialized model works as it should.

    val modelOfLoadedPipeline = loadedPipeline.fit(data)
    

    Create helper function for predictions on unlabeled data. This method is using model generated by the pipeline. To make a prediction we call transform method with Spark’s Dataframe as an argument on the generated model. This call executes each transformer specified in the pipeline one after one producing Spark’s DataFrame with predictions.

    def isSpam(smsText: String,
               model: PipelineModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5):Boolean = {
      import h2oContext.implicits._
      val smsTextDF = sc.parallelize(Seq(smsText)).toDF("text") // convert to dataframe with one column named "text"
      val prediction: H2OFrame = model.transform(smsTextDF)
      prediction.vecs()(1).at(0) < hamThreshold
    }
    

    Try it!

    println(isSpam("Michal, h2oworld party tonight in MV?", modelOfLoadedPipeline, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel, h2oContext))
    

    In this article we showed how Spark’s pipelines and H2O algorithms work together seamlessly in Spark environment. We strive to be consistent with Spark API in H2O.ai and make the life of a developer/data scientist easier by hiding H2O internals and exposing the APIs that are natural for Spark users.

    Databricks and H2O Make it Rain with Sparkling Water

    **This blog post was first posted on the Databricks blog here

    Databricks provides a cloud-based integrated workspace on top of Apache Spark for developers and data scientists. H2O.ai has been an early adopter of Apache Spark and has developed Sparkling Water to seamlessly integrate H2O.ai’s machine learning library on top of Spark.

    In this blog, we will demonstrate an integration between the Databricks platform and H2O.ai’s Sparking Water that provides Databricks users with an additional set of machine learning libraries. The integration allows data scientists to utilize Sparkling Water with Spark in a notebook environment more easily, allowing them to seamlessly combine Spark with H2O and get the best of both worlds.

    Let’s begin by preparing a Databricks environment to develop our spam predictor:

    The first step is to log into your Databricks account and create a new library containing Sparkling Water. You can use the Maven coordinates of the Sparkling Water package, for example: h2o:sparkling-water-examples_2.10:1.5.6 (this version works with Spark 1.5)

    1

    The next step is to create a new cluster to run the example:

    2

    For this version of the Sparkling Water library we will use Spark 1.5. The name of the created cluster is “HamOrSpamCluster” – keep it handy as we will need it later.

    The next step is to upload data, you can use table import and upload the smsData.txt file

    3

    Now the environment is ready and you can create a Databricks notebook; connect it to “HamOrSpamCluster” and start building a predictive model!

    The goal of the application is to write a spam detector using a trained model to categorize incoming messages

    First look at the data. It contains raw text messages that are labeled as either spam or ham.
    For example:

    spam +123 Congratulations – in this week’s competition draw u have won the ?1450 prize to claim just call 09050002311 b4280703. T&Cs/stop SMS 08718727868. Over 18 only 150
    ham Yun ah.the ubi one say if ? wan call by tomorrow.call 67441233 look for irene.ere only got bus8,22,65,6

    We need to transform these messages into vectors of numbers and then train a binomial model to predict whether the text message is either SPAM or HAM. For the transformation of a message into a vector of numbers we will use Spark MLlib string tokenization and word to vector transformers. We are going to split messages into tokens and use the TF (term frequency–inverse document frequency) technique to represent words of importance inside the training data set:

    // Representation of a training message
    import org.apache.spark.mllib.linalg.Vector
    case class SMS(target: String, fv: Vector)
    def tokenize(data: RDD[String]): RDD[Seq[String]] = {
    val ignoredWords = Seq("the", "a", "", "in", "on", "at", "as", "not", "for")
    val ignoredChars = Seq(',', ':', ';', '/', '<', '>', '"', '.', '(', ')', '?', '-', '\'','!','0', '1')
    
    val texts = data.map( r=> {
    var smsText = r.toLowerCase
    for( c <- ignoredChars) {
    smsText = smsText.replace(c, ' ')
    }
    
    val words =smsText.split(" ").filter(w => !ignoredWords.contains(w) && w.length>2).distinct
    
    words.toSeq
    })
    texts
    }
    import org.apache.spark.mllib.feature._
    
    def buildIDFModel(tokens: RDD[Seq[String]],
    minDocFreq:Int = 4,
    hashSpaceSize:Int = 1 << 10): (HashingTF, IDFModel, RDD[Vector]) = {
    // Hash strings into the given space
    val hashingTF = new HashingTF(hashSpaceSize)
    val tf = hashingTF.transform(tokens)
    // Build term frequency-inverse document frequency
    val idfModel = new IDF(minDocFreq = minDocFreq).fit(tf)
    val expandedText = idfModel.transform(tf)
    (hashingTF, idfModel, expandedText)
    }
    

    The resulting table will contain the following lines:

    spam 0, 0, 0.31, 0.12, ….
    ham 0.67, 0, 0, 0, 0, 0.003, 0, 0.1

    After this we are free to experiment with different binary classification algorithms in H2O.

    To start using H2O, we need to initialize the H2O service by creating an H2OContext:

    // Create SQL support
    import org.apache.spark.sql._
    implicit val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    
    // Start H2O services
    import org.apache.spark.h2o._
    @transient val h2oContext = new H2OContext(sc).start()
    

    H2OContext represents H2O running on top of a Spark cluster. You should see the following output:

    4

    For this demonstration, we will leverage the H2O Deep Learning method:

    // Define function which builds a DL model
    import org.apache.spark.h2o._
    import water.Key
    import <em>root</em>.hex.deeplearning.DeepLearning
    import <em>root</em>.hex.deeplearning.DeepLearningParameters
    import <em>root</em>.hex.deeplearning.DeepLearningModel
    
    def buildDLModel(train: Frame, valid: Frame,
    epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
    hidden: Array[Int] = Array[Int](200, 200))
    (implicit h2oContext: H2OContext): DeepLearningModel = {
    import h2oContext._
    // Build a model
    
    val dlParams = new DeepLearningParameters()
    dlParams._model_id = Key.make("dlModel.hex")
    dlParams._train = train
    dlParams._valid = valid
    dlParams._response_column = 'target
    dlParams._epochs = epochs
    dlParams._l1 = l1
    dlParams._hidden = hidden
    
    // Create a job
    val dl = new DeepLearning(dlParams)
    val dlModel = dl.trainModel.get
    
    // Compute metrics on both datasets
    dlModel.score(train).delete()
    dlModel.score(valid).delete()
    
    dlModel
    }
    

    Here is the final application:

    // Build the application
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.examples.h2o.DemoUtils._
    import scala.io.Source
    
    // load both columns from the table
    val data = sqlContext.sql("SELECT * FROM smsData")
    // Extract response spam or ham
    val hamSpam = data.map( r => r(0).toString)
    val message = data.map( r => r(1).toString)
    // Tokenize message content
    val tokens = tokenize(message)
    // Build IDF model
    var (hashingTF, idfModel, tfidf) = buildIDFModel(tokens)
    
    // Merge response with extracted vectors
    val resultRDD: DataFrame = hamSpam.zip(tfidf).map(v => SMS(v._1, v._2)).toDF
    
    // Publish Spark DataFrame as H2OFrame
    // This H2OFrame has to be transient because we do not want it to be serialized. When calling for example sc.parallelize(..) the object which we are trying to parallelize takes with itself all variables in its surroundings scope - apart from those marked as serialized.
    // 
    @transient val table = h2oContext.asH2OFrame(resultRDD)
    println(sc.parallelize(Array(1,2)))
    // Transform target column into categorical
    table.replace(table.find("target"), table.vec("target").toCategoricalVec()).remove()
    table.update(null)
    
    // Split table
    val keys = Array[String]("train.hex", "valid.hex")
    val ratios = Array<a href="0.8">Double</a>
    @transient val frs = split(table, keys, ratios)
    @transient val train = frs(0)
    @transient val valid = frs(1)
    table.delete()
    
    // Build a model
    @transient val dlModel = buildDLModel(train, valid)(h2oContext)
    

    And voila we have a Deep Learning Model ready to detect spam

    At this point you can explore quality of the model:

    scala
    // Collect model metrics and evaluate model quality
    import water.app.ModelMetricsSupport
    val validMetrics = ModelMetricsSupport.binomialMM(dlModel, valid)
    println(validMetrics.auc._auc)


    You can also use the H2O Flow UI by clicking on the URL provided when you instantiated the H2O Context.

    5

    At this point we have everything ready to create a spam detector:

    scala
    // Create a spam detector - a method which will return SPAM or HAM for given text message
    import water.DKV._
    // Spam detector
    def isSpam(msg: String,
    modelId: String,
    hashingTF: HashingTF,
    idfModel: IDFModel,
    h2oContext: H2OContext,
    hamThreshold: Double = 0.5):String = {
    val dlModel: DeepLearningModel = water.DKV.getGet(modelId)
    val msgRdd = sc.parallelize(Seq(msg))
    val msgVector: DataFrame = idfModel.transform(
    hashingTF.transform (
    tokenize (msgRdd))).map(v =&gt; SMS(&quot;?&quot;, v)).toDF
    val msgTable: H2OFrame = h2oContext.asH2OFrame(msgVector)
    msgTable.remove(0) // remove first column
    val prediction = dlModel.score(msgTable)
    //println(prediction)
    if (prediction.vecs()(1).at(0) &lt; hamThreshold) &quot;SPAM DETECTED!&quot; else &quot;HAM&quot;
    }

    The method uses built-in models to transform incoming text message and provide a prediction – SPAM or HAM. For example:

    6

    We’ve shown a fast and easy way to build a spam detector with Databricks and Sparkling Water. To try this out for yourself, register for a free 14-day trial of Databricks and check out the Sparkling Water example in the Databricks Guide.