H2O’s AutoML in Spark

This blog post demonstrates how H2O’s powerful automatic machine learning can be used together with Spark in Sparkling Water.

We show the benefits of the Spark and H2O integration, using Spark for the data munging tasks and H2O for the modelling phase, with all of these steps wrapped inside a Spark Pipeline. The integration between Spark and H2O can be seen in the figure below. All technical details behind this integration are explained in our documentation, which you can access from here.

Sparkling Water Architecture

At the end of this blog post, we also show how the generated model can be taken into production using a Spark Streaming application. We use Python and PySparkling for the model training phase and Scala for the deployment example.

For the purpose of this blog, we use the Combined Cycle Power Plant dataset. The goal is to predict the energy output (in megawatts) given the temperature, ambient pressure, relative humidity and exhaust vacuum values. For the purposes of this blog post, we alter the dataset a little and use only rows where the temperature is higher than 10 degrees Celsius; you can think of this as being interested only in the plant’s performance on warmer days.

Obtain Sparkling Water

The first step is to download Sparkling Water. It can be downloaded from our official download page. Please make sure to use the latest Sparkling Water, as H2OAutoML in Sparkling Water is a fairly new feature available only in the latest versions. Once you have downloaded Sparkling Water, please follow the instructions on the PySparkling tab on how to start the PySparkling interpreter.

Download Sparkling Water

Start Sparkling Water

In order to use Spark and H2O alongside each other, we need to make H2O available inside the Spark cluster. This can be achieved by running the code below.

from pysparkling import * # Import PySparkling
hc = H2OContext.getOrCreate(spark) # Start the H2OContext

This code starts an H2O node on each Spark executor and a special H2O node, called the client node, inside the Spark driver.
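
To quickly confirm that the H2O cluster is up, you can print the H2OContext. This is an optional check, not part of the original walkthrough:

print(hc)  # Shows the H2O cluster connection details (nodes and the H2O Flow UI address)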

Load Data into Sparkling Water

We use Spark to load the data into memory. For that, we can use the line below:

powerplant_df = spark.read.option("inferSchema", "true").csv("powerplant_output.csv", header=True)

This code imports the file from the specified location into the Spark cluster and creates a Spark DataFrame from it. The original data file can be downloaded from here. It is important to set the inferSchema option to true; otherwise, Spark won’t try to automatically infer the data types and all columns will end up with the type String. With this option enabled, the types are inferred correctly.
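
As a quick check (not part of the original steps), you can print the schema of the loaded DataFrame and confirm that the numeric columns were inferred as numeric types rather than String:

powerplant_df.printSchema()  # Each column should be listed with a numeric type such as double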

We will use a portion of this data for training and a portion for demonstrating predictions. We can use the randomSplit method available on the Spark DataFrame to split the data:

splits = powerplant_df.randomSplit([0.8, 0.2], 1)
train = splits[0]
for_predictions = splits[1]

We split the dataset, giving 80% to one split and 20% to the other. The last argument specifies the seed so that the method behaves deterministically. We use the 80% part for training and the remaining part for scoring later.
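
If you want to verify the split, a simple row count over both parts should show roughly an 80/20 ratio (the exact numbers depend on the data and the seed):

print(train.count(), for_predictions.count())  # Roughly 80% and 20% of the original rows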

Define the Pipeline with H2O AutoML

Now, we can define the Spark pipeline containing the H2O AutoML. Before we do that, we need to do a few imports so all classes and methods we require are available:

from pysparkling.ml import H2OAutoML
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

And finally, we can start building the pipeline stages. The first pipeline stage is an SQLTransformer used for selecting the rows where the temperature is higher than 10 degrees Celsius. The SQLTransformer is a powerful Spark transformer that lets us specify arbitrary Spark SQL code to execute on the DataFrame passed to it from the pipeline.

temperatureTransformer = SQLTransformer(statement="SELECT * FROM __THIS__ WHERE TemperatureCelcius > 10")

It is important to understand that no code is executed at this stage as we are just defining the stages. We will show how to execute the whole pipeline later in the blog post.

The next pipeline stage is not a transformer but an estimator; it is used for creating an H2O model with the H2O AutoML algorithm. This estimator is provided by the Sparkling Water library, but we can see that its API is unified with the other Spark pipeline stages.

automlEstimator = H2OAutoML(maxRuntimeSecs=60, predictionCol="HourlyEnergyOutputMW", ratio=0.9)

We have now defined the H2OAutoML estimator. The maxRuntimeSecs argument specifies how long we want to run the AutoML algorithm. The predictionCol argument specifies the response column, and the ratio argument specifies what portion of the dataset is used for training. Here, we use 90% of the data for training and 10% for validation.

Having defined both stages we need, we can now define the whole pipeline:

pipeline = Pipeline(stages=[temperatureTransformer, automlEstimator])

And finally, train it on the training dataset we prepared above:

model = pipeline.fit(train)

This call goes through all the pipeline stages and, in the case of estimators, creates a model. So as part of this call, we run the H2O AutoML algorithm and find the best model given the search criteria we specified in the arguments. The model variable contains the whole Spark pipeline model, which internally also contains the model found by AutoML. The H2O model inside is stored in the H2O MOJO format. That means it is independent of the H2O runtime and can therefore be run anywhere without initializing an H2O cluster. For more information about MOJOs, please visit the MOJO documentation.

Prediction

We can generate predictions with the returned model simply as:

predicted = model.transform(for_predictions)

This call again goes through all the pipeline stages and, when it hits a stage with a model, performs the scoring operation.

We can also inspect the first few results:

predicted.take(2)

Export Model for Deployment

In the following part of the blog post, we show how to put this model into production. For that, we need to export the model, which can be done simply as:

model.write().overwrite().save("pipeline.model")

This call stores the model in the pipeline.model file. It is also a helpful trick to export the schema of the data. This is especially useful for streaming applications, where it is hard to determine the types of the data based on a single row of input.

We can export the schema as:

with open('schema.json','w') as f:
    f.write(str(powerplant_df.schema.json()))
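
As an optional sanity check before moving on to deployment, the exported schema can be loaded back in Python and compared with the original one. This is just a sketch using standard Python and PySpark APIs:

import json
from pyspark.sql.types import StructType

with open('schema.json') as f:
    loaded_schema = StructType.fromJson(json.loads(f.read()))

print(loaded_schema == powerplant_df.schema)  # Should print True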

Deploy the Model

Now, we would like to demonstrate how the Spark pipeline with the model found by AutoML can be put into production in a Spark Streaming application. For deployment, we start a new Spark application, which can be written in Scala or Python, and load the trained pipeline model. The pipeline model contains the H2O AutoML model packaged as a MOJO and is therefore independent of the H2O runtime. We will use Scala to demonstrate the language independence of the exported pipeline.

The deployment consists of several steps:

  • Load the schema from the schema file.
  • Create the input data stream and pass it the schema. The input data stream will point to a directory where new CSV files arrive from different streaming sources. It could also be an online source of streaming data.
  • Load the pipeline from the pipeline file.
  • Create the output data stream. For our purposes, we store the data in memory and expose it as a Spark SQL table so we can see immediate results.

// Start Spark
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()


// Load exported pipeline
import org.apache.spark.ml.PipelineModel
val pipelineModel = PipelineModel.read.load("pipeline.model")


// Load exported schema of input data and mark all columns as nullable
import org.apache.spark.sql.types.{DataType, StructField, StructType}
val schema = StructType(DataType.fromJson(scala.io.Source.fromFile("schema.json").mkString).asInstanceOf[StructType].map {
  case StructField(name, dtype, _, metadata) => StructField(name, dtype, nullable = true, metadata)
})
println(schema)


// Define input stream
val inputDataStream = spark.readStream.schema(schema).csv("/path/to/folder/where/input/data/are/being/generated")


// Apply loaded model
val outputDataStream = pipelineModel.transform(inputDataStream)


// Forward output stream into memory-sink
outputDataStream.writeStream.format("memory").queryName("predictions").start()


// Query results
while(true){
  spark.sql("select * from predictions").show()
  Thread.sleep(5000)
}

Conclusion

This code demonstrates that we can relatively easily put a pipeline model into production. We used Python for model creation and a JVM-based language for deployment. The resulting pipeline model contains the model found by the H2O AutoML algorithm, exported as a MOJO. This means that we don’t need to start H2O or Sparkling Water where we deploy the model (but we do need to ensure that the Sparkling Water dependencies are on the classpath).

Sparkling Water 2.3.0 is now available!

Hi Makers!

We are happy to announce that Sparkling Water now fully supports Spark 2.3 and is available from our download page.

If you are using an older version of Spark, that’s no problem. Although we suggest upgrading to the latest version where possible, we keep the Sparkling Water releases for Spark 2.2 and 2.1 up to date with the latest features wherever we are not limited by Spark itself.

The latest release of Sparkling Water contains several important bug fixes. The three major ones are:

  • Handle nulls properly in H2OMojoModel. In previous versions, running predictions on the H2OMojoModel with null values would fail. We now treat null values as missing values, and predictions no longer fail.

  • We marked the Spark dependencies in our Maven packages as provided. This means we assume that the Spark dependencies are always provided by the runtime, which should always be the case. This ensures a cleaner and more transparent Sparkling Water environment.

  • In PySparkling, the method as_h2o_frame didn’t warn when a wrong input type was passed in. This method accepts only Spark DataFrames and RDDs; however, some users tried to pass in other types and the method ended silently. Now we fail if the user passes a wrong data type to this method (see the sketch below).
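
For reference, a minimal sketch of the intended usage, assuming an H2OContext named hc and a Spark DataFrame named spark_df (both names are just placeholders):

h2o_frame = hc.as_h2o_frame(spark_df)  # Spark DataFrame (or RDD) -> H2OFrame: supported
# Passing any other type, e.g. a plain Python list, now raises an error instead of ending silently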

It is also important to mention that Spark 2.3 removed support for Scala 2.10. We’ve done the same in the release for Spark 2.3. Scala 2.10 is still supported in the older Spark versions.

The latest Sparkling Water versions also integrate H2O 3.18.0.5, which brings several important fixes. The full change log for H2O 3.18.0.5 is available here, and the full Sparkling Water change log can be viewed here.

Thank you!

Kuba

Senior Software Engineer, Sparkling Water Team

Sparkling Water 2.2.10 is now available!

Hi Makers!

There are several new features in the latest Sparkling Water. The major new addition is that we now publish Sparkling Water documentation as a website which is available here. This link is for Spark 2.2.

We have also documented and fixed a few issues with LDAP on Sparkling Water. Exact steps are provided in the documentation.

The bundled H2O was upgraded to 3.18.0.4, which brings ordinal regression for GLM as the major change.

The last major change included in this release is the availability of the H2O AutoML and H2O Grid Search transformations for Spark pipelines. They are now exposed as regular Spark Estimators and can be used within your Spark pipelines. An example PySparkling AutoML script can be found here.

The full changelog can be viewed here.

Sparkling Water is already integrated with Spark 2.3 on master, and the next release will also target this latest Spark version.

Stay tuned!

Spam Detection with Sparkling Water and Spark Machine Learning Pipelines

This short post presents the “ham or spam” demo, which has already been posted earlier by Michal Malohlava, using our new API in the latest Sparkling Water for Spark 1.6 and earlier versions, which unifies Spark and H2O machine learning pipelines. It shows how to create a simple Spark machine learning pipeline and a model based on the fitted pipeline, which can later be used to predict whether a particular message is spam or not.

Before diving into the demo steps, we would like to provide some details about the new features in the upcoming Sparkling Water 2.0:

  • Support for Apache Spark 2.0 and backwards compatibility with all previous versions.
  • The ability to run Apache Spark and Scala through H2O’s Flow UI.
  • H2O feature improvements and visualizations for MLlib algorithms, including the ability to score feature importance.
  • Visual intelligence for Apache Spark.
  • The ability to build Ensembles using H2O plus MLlib algorithms.
  • The power to export MLlib models as POJOs (Plain Old Java Objects), which can be easily run on commodity hardware.
  • A toolchain for ML pipelines.
  • Debugging support for Spark pipelines.
  • Model and data governance through Steam.
  • Bringing H2O’s powerful data munging capabilities to Apache Spark.

In order to run the code below, start your Spark shell with the attached Sparkling Water JAR, or use the sparkling-shell script, which already does this for you.

    You can start the Spark shell with Sparkling Water as follows:

    $SPARK_HOME/bin/spark-submit \
    --class water.SparklingWaterDriver \
    --packages ai.h2o:sparkling-water-examples_2.10:1.6.5 \
    --executor-memory=6g \
    --driver-memory=6g /dev/null
    

    The preferred versions are Spark 1.6 and Sparkling Water 1.6.x.

    Prepare the coding environment

    Here we just import all required libraries.

    import org.apache.spark.SparkFiles
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.h2o.H2OPipeline
    import org.apache.spark.ml.h2o.features.{ColRemover, DatasetSplitter}
    import org.apache.spark.ml.h2o.models.H2ODeepLearning
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import water.support.SparkContextSupport
    import water.fvec.H2OFrame
    

    Add our dataset to the Spark environment. The dataset consists of two columns, where the first one is the label (ham or spam) and the second one is the message itself. We don’t have to explicitly ask for a Spark context since it’s already available via the sc variable.

    val smsDataFileName = "smsData.txt"
    val smsDataFilePath = "examples/smalldata/" + smsDataFileName
    SparkContextSupport.addFiles(sc, smsDataFilePath)
    

    Create SQL support.

    implicit val sqlContext = SQLContext.getOrCreate(sc)
    

    Start H2O services.

    import org.apache.spark.h2o._
    implicit val h2oContext = H2OContext.getOrCreate(sc)
    

    Create a helper method which loads the dataset, performs some basic filtering, and finally creates a Spark DataFrame with two columns: label and text.

    def load(dataFile: String)(implicit sqlContext: SQLContext): DataFrame = {
      val smsSchema = StructType(Array(
        StructField("label", StringType, nullable = false),
        StructField("text", StringType, nullable = false)))
      val rowRDD = sc.textFile(SparkFiles.get(dataFile)).map(_.split("\t")).filter(r => !r(0).isEmpty).map(p => Row(p(0), p(1)))
      sqlContext.createDataFrame(rowRDD, smsSchema)
    }
    

    Define the pipeline stages

    In Spark, a pipeline is formed of two basic elements: transformers and estimators. Estimators usually encapsulate an algorithm for model generation, and their output is a transformer. When the pipeline is fitted, all transformers and estimators are executed, and the estimators are converted into transformers. The model generated by the pipeline contains only transformers. More about Spark pipelines can be found in Spark’s pipeline overview.

    In H2O we created a new type of pipeline stage called OneTimeTransformer. This transformer works similarly to Spark’s estimator in that it is only executed during fitting of the pipeline. It does not, however, produce a transformer during pipeline fitting, and the model generated by the pipeline does not contain this OneTimeTransformer.
    An example of a one-time transformer is splitting the input data into validation and training datasets using H2O Frames. We don’t need this one-time transformer to be executed every time we make predictions with the model; we just need this code to be executed when we fit the pipeline to the data.

    This pipeline stage uses Spark’s RegexTokenizer to tokenize the messages. We just specify the input column and the output column for the tokenized messages.

    val tokenizer = new RegexTokenizer().
        setInputCol("text").
        setOutputCol("words").
        setMinTokenLength(3).
        setGaps(false).
        setPattern("[a-zA-Z]+")
    

    Remove unnecessary words using Spark’s StopWordsRemover.

    val stopWordsRemover = new StopWordsRemover().
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("filtered").
        setStopWords(Array("the", "a", "", "in", "on", "at", "as", "not", "for")).
        setCaseSensitive(false)
    

    Vectorize the words using Spark’s HashingTF.

    val hashingTF = new HashingTF().
        setNumFeatures(1 << 10).
        setInputCol(tokenizer.getOutputCol).
        setOutputCol("wordToIndex")
    

    Create inverse document frequencies based on hashed words. It creates a numerical representation of how much information a
    given word provides in the whole message.

    val idf = new IDF().
        setMinDocFreq(4).
        setInputCol(hashingTF.getOutputCol).
        setOutputCol("tf_idf")
    

    This pipeline stage is a one-time transformer. If setKeep(true) is called on it, it preserves the specified columns instead
    of deleting them.

    val colRemover = new ColRemover().
        setKeep(true).
        setColumns(Array[String]("label", "tf_idf"))
    

    Split the dataset and store the splits with the specified keys into H2O’s distributed storage, called DKV. This is a one-time transformer which is executed only during the fitting stage. It determines the frame which is passed on to the output, in the following order:

    1. If the train key is specified using the setTrainKey method and the key is also specified in the list of keys, then the frame with this key is passed on to the output.
    2. Otherwise, if the default key "train.hex" is specified in the list of keys, then the frame with this key is passed on to the output.
    3. Otherwise, the first frame specified in the list of keys is passed on to the output.

    val splitter = new DatasetSplitter().
      setKeys(Array[String]("train.hex", "valid.hex")).
      setRatios(Array[Double](0.8)).
      setTrainKey("train.hex")
    

    Create H2O’s deep learning model.
    If the key specifying the training set is set using setTrainKey, then the frame with this key is used as the training frame; otherwise, the frame from the previous stage is used as the training frame.

    val dl = new H2ODeepLearning().
      setEpochs(10).
      setL1(0.001).
      setL2(0.0).
      setHidden(Array[Int](200, 200)).
      setValidKey(splitter.getKeys(1)).
      setResponseColumn("label")
    

    Create and fit the pipeline

    Create the pipeline using the stages we defined earlier. Like a normal Spark pipeline, it can be formed of Spark transformers and estimators, but it may also contain H2O’s one-time transformers.

    val pipeline = new H2OPipeline().
      setStages(Array(tokenizer, stopWordsRemover, hashingTF, idf, colRemover, splitter, dl))
    

    Train the pipeline model by fitting it to a Spark DataFrame.

    val data = load("smsData.txt")
    val model = pipeline.fit(data)
    

    Now we can optionally save the model to disk and load it again.

    model.write.overwrite().save("/tmp/hamOrSpamPipeline")
    val loadedModel = PipelineModel.load("/tmp/hamOrSpamPipeline")
    

    We can also save this unfitted pipeline to disk and load it again.

    pipeline.write.overwrite().save("/tmp/unfit-hamOrSpamPipeline")
    val loadedPipeline = H2OPipeline.load("/tmp/unfit-hamOrSpamPipeline")
    

    Train the pipeline model again using the loaded pipeline, just to show that the deserialized pipeline works as it should.

    val modelOfLoadedPipeline = loadedPipeline.fit(data)
    

    Create a helper function for predictions on unlabeled data. This method uses the model generated by the pipeline. To make a prediction, we call the transform method on the generated model with a Spark DataFrame as the argument. This call executes each transformer specified in the pipeline one after another, producing a Spark DataFrame with predictions.

    def isSpam(smsText: String,
               model: PipelineModel,
               h2oContext: H2OContext,
               hamThreshold: Double = 0.5):Boolean = {
      import h2oContext.implicits._
      val smsTextDF = sc.parallelize(Seq(smsText)).toDF("text") // convert to dataframe with one column named "text"
      val prediction: H2OFrame = model.transform(smsTextDF)
      prediction.vecs()(1).at(0) < hamThreshold
    }
    

    Try it!

    println(isSpam("Michal, h2oworld party tonight in MV?", modelOfLoadedPipeline, h2oContext))
    println(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", loadedModel, h2oContext))
    

    In this article, we showed how Spark pipelines and H2O algorithms work together seamlessly in the Spark environment. We strive to be consistent with the Spark API at H2O.ai and to make life easier for developers and data scientists by hiding H2O internals and exposing APIs that are natural for Spark users.