H2O’s AutoML in Spark

By Jakub Hava | July 23, 2018

This blog post demonstrates how H2O’s powerful automatic machine learning can be used together with Spark in Sparkling Water.

We show the benefits of the Spark and H2O integration: we use Spark for data munging tasks and H2O for the modelling phase, with all of these steps wrapped inside a Spark Pipeline. The integration between Spark and H2O can be seen in the figure below. The technical details behind this integration are explained in our documentation.

Sparkling Water Architecture

At the end of this blog post, we also show how the generated model can be taken into production using a Spark Streaming application. We use Python and PySparkling for the model training phase and Scala for the deployment example.

For the purpose of this blog, we use the Combined Cycle Power Plant dataset. The goal is to predict the energy output (in megawatts) given the temperature, ambient pressure, relative humidity and exhaust vacuum values. We alter the dataset slightly for this post and use only rows where the temperature is higher than 10 degrees Celsius, as if we were interested only in the plant’s performance on warmer days.

Obtain Sparkling Water

The first step is to download Sparkling Water. It can be downloaded from our official download page. Please make sure to use the latest Sparkling Water, as H2OAutoML in Sparkling Water is a fairly new feature available in the latest versions. Once you have downloaded Sparkling Water, follow the instructions on the PySparkling tab to start the PySparkling interpreter.

Download Sparkling Water

Start Sparkling Water

To use Spark and H2O side by side, we need to make H2O available inside the Spark cluster. This can be achieved by running the code below.

from pysparkling import * # Import PySparkling
hc = H2OContext.getOrCreate(spark) # Start the H2OContext

This code starts an H2O node on each Spark executor and a special H2O node, called the client node, inside the Spark driver.

Load Data into Sparkling Water

We use Spark to load the data into memory. For that, we can use the line below:

powerplant_df = spark.read.option("inferSchema", "true").csv("powerplant_output.csv", header=True)

This code imports the file from the specified location into the Spark cluster and creates a Spark DataFrame from it. The original data file can be downloaded from here. It is important to set the inferSchema option to true; otherwise, Spark does not try to infer the data types and every column ends up with type String.
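To make the effect of inferSchema more concrete, here is a minimal pure-Python sketch of the idea. It is not Spark's inference algorithm, only an illustration of the difference between keeping every value as a string and picking a type per column. The sample values are made up for illustration, and only the two column names come from this post.

```python
import csv
import io

# Illustrative sample rows; only the two column names are taken from this post.
SAMPLE_CSV = """TemperatureCelcius,HourlyEnergyOutputMW
14.96,463.26
5.11,488.56
"""

def infer_type(values):
    """Return the narrowest of int, float or str that fits every value."""
    for candidate in (int, float):
        try:
            for v in values:
                candidate(v)
            return candidate
        except ValueError:
            continue
    return str

def read_csv(text, infer_schema):
    """Parse CSV text; optionally convert each column to its inferred type."""
    rows = list(csv.DictReader(io.StringIO(text)))
    if not infer_schema:
        return rows  # every value stays a string
    types = {name: infer_type([r[name] for r in rows]) for name in rows[0]}
    return [{name: types[name](value) for name, value in r.items()} for r in rows]

as_strings = read_csv(SAMPLE_CSV, infer_schema=False)
typed = read_csv(SAMPLE_CSV, infer_schema=True)
print(type(as_strings[0]["TemperatureCelcius"]).__name__)  # str
print(type(typed[0]["TemperatureCelcius"]).__name__)       # float
```

Without inference, a comparison such as "temperature > 10" would be made against text rather than numbers, which is why the option matters for the filtering step later in this post.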

We will use one portion of this data for training and another for demonstrating the predictions. We can use the randomSplit method available on the Spark DataFrame to split the data:

splits = powerplant_df.randomSplit([0.8, 0.2], 1)
train = splits[0]
for_predictions = splits[1]

We split the dataset, giving 80% to one split and 20% to the other. The last argument specifies the seed so that the method behaves deterministically. We use the 80% part for training and the 20% part for scoring later.
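The role of the seed can be illustrated without Spark. The sketch below is a simplified stand-in, not Spark's implementation: the function random_split is hypothetical and assigns each row to a split at random, in proportion to the weights. As with Spark's randomSplit, the resulting sizes are only approximately 80% and 20%.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to weights."""
    rng = random.Random(seed)  # private generator, so the seed fully determines the result
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                break
        # If rounding prevented a break, index is the last split, so no row is lost.
        splits[index].append(row)
    return splits

rows = list(range(1000))
first = random_split(rows, [0.8, 0.2], seed=1)
second = random_split(rows, [0.8, 0.2], seed=1)
print(first == second)                # True: same seed, same split
print(len(first[0]) + len(first[1]))  # 1000: every row lands in exactly one split
```

Fixing the seed is what makes a later run of the same notebook train and score on the same rows.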

Define the Pipeline with H2O AutoML

Now, we can define the Spark pipeline containing the H2O AutoML. Before we do that, we need to do a few imports so all classes and methods we require are available:

from pysparkling.ml import H2OAutoML
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

And finally, we can start building the pipeline stages. The first pipeline stage is the SQLTransformer, used for selecting the rows where the temperature is higher than 10 degrees Celsius. The SQLTransformer is a powerful Spark transformer in which we can specify a Spark SQL statement that we want to execute on the DataFrame passed to it from the pipeline.

temperatureTransformer = SQLTransformer(statement="SELECT * FROM __THIS__ WHERE TemperatureCelcius > 10")

It is important to understand that no code is executed at this stage as we are just defining the stages. We will show how to execute the whole pipeline later in the blog post.
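To see what the statement above selects, it can be tried outside Spark. The sketch below uses SQLite purely as a stand-in and substitutes the __THIS__ placeholder with a table name by hand; in Spark, the placeholder represents the table of the input dataset. The table name powerplant and the sample rows are assumptions made for illustration.

```python
import sqlite3

STATEMENT = "SELECT * FROM __THIS__ WHERE TemperatureCelcius > 10"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE powerplant (TemperatureCelcius REAL, HourlyEnergyOutputMW REAL)")
conn.executemany(
    "INSERT INTO powerplant VALUES (?, ?)",
    [(14.96, 463.26), (5.11, 488.56), (25.18, 444.37)],  # illustrative sample rows
)

# Substitute the placeholder with the concrete table name, then run the query.
rows = conn.execute(STATEMENT.replace("__THIS__", "powerplant")).fetchall()
print(rows)  # [(14.96, 463.26), (25.18, 444.37)]
conn.close()
```

The row with a temperature of 5.11 is dropped, which is the filtering we want the first pipeline stage to perform.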

The next pipeline stage is not a transformer but an estimator, and it is used for creating the H2O model using the H2O AutoML algorithm. This estimator is provided by the Sparkling Water library, but its API is unified with the other Spark pipeline stages.

automlEstimator = H2OAutoML(maxRuntimeSecs=60, predictionCol="HourlyEnergyOutputMW", ratio=0.9)

We defined the H2OAutoML estimator. The maxRuntimeSecs argument specifies how long we want to run the AutoML algorithm. The predictionCol argument specifies the response column, and the ratio argument specifies how large a part of the dataset is used for training. Here, 90% of the data is used for training and 10% for validation.

As we have defined both stages we need, we can define the whole pipeline:

pipeline = Pipeline(stages=[temperatureTransformer, automlEstimator])

And finally, train it on the training dataset we prepared above:

model = pipeline.fit(train)

This call goes through all the pipeline stages and, for each estimator, creates a model. As part of this call, we run the H2O AutoML algorithm and find the best model given the search criteria we specified in the arguments. The model variable contains the whole Spark pipeline model, which internally also contains the model found by AutoML. The H2O model inside is stored in the H2O MOJO format. This means that it is independent of the H2O runtime and can therefore be run anywhere without initializing an H2O cluster. For more information about MOJO, please visit the MOJO documentation.
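The difference between transformers and estimators during fitting can be sketched in a few lines of plain Python. This is a toy model of the pipeline idea, not Spark's or H2O's code: all class and function names below are hypothetical, and the "model" simply predicts the mean of the training target.

```python
class FilterTransformer:
    """Transformer stage: keeps rows whose temperature is above a threshold."""
    def __init__(self, threshold):
        self.threshold = threshold

    def transform(self, rows):
        return [r for r in rows if r["temperature"] > self.threshold]

class MeanModel:
    """Fitted model: a transformer that adds a prediction to every row."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [dict(r, prediction=self.mean) for r in rows]

class MeanEstimator:
    """Estimator stage: 'training' just computes the mean of the target."""
    def fit(self, rows):
        return MeanModel(sum(r["output"] for r in rows) / len(rows))

def fit_pipeline(stages, rows):
    """Fit stages in order; each estimator is replaced by the model it produces."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)
        fitted.append(stage)
        rows = stage.transform(rows)  # the next stage sees the transformed data
    return fitted

def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order (scoring)."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows

training_rows = [
    {"temperature": 5.0, "output": 490.0},
    {"temperature": 15.0, "output": 460.0},
    {"temperature": 25.0, "output": 440.0},
]
fitted_stages = fit_pipeline([FilterTransformer(10), MeanEstimator()], training_rows)
scored = transform_pipeline(fitted_stages, [{"temperature": 20.0, "output": 450.0}])
print(scored)  # [{'temperature': 20.0, 'output': 450.0, 'prediction': 450.0}]
```

Note that the estimator is trained only on the rows that survived the filter, which mirrors how the AutoML stage in our pipeline sees only the rows selected by the SQLTransformer.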

Prediction

We can generate predictions with the returned model simply as:

predicted = model.transform(for_predictions)

This call again goes through all the pipeline stages and in case it hits a stage with a model, it performs a scoring operation.

We can also look at the first few results:

predicted.take(2)

Export Model for Deployment

In the following part of the blog post, we show how to put this model into production. For that, we need to export the model, which can be done simply as:

model.write().overwrite().save("pipeline.model")

This call stores the model under the pipeline.model path. It is also a helpful trick to export the schema of the data. This is especially useful for streaming applications, where it’s hard to determine the data types from a single row of input.

We can export the schema as:

with open('schema.json', 'w') as f:
    f.write(powerplant_df.schema.json())
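The exported file is a plain JSON document, so it can be inspected without Spark. The sketch below shows the general shape of such a file and how to read the column types back. The content of SCHEMA_JSON is an assumption for illustration: it lists only two of the columns, and the double types are what we would expect for decimal values, not something taken from the real file.

```python
import json

# Assumed content of schema.json for two of the columns; the real file lists every column.
SCHEMA_JSON = """
{"type": "struct", "fields": [
  {"name": "TemperatureCelcius", "type": "double", "nullable": true, "metadata": {}},
  {"name": "HourlyEnergyOutputMW", "type": "double", "nullable": true, "metadata": {}}
]}
"""

schema = json.loads(SCHEMA_JSON)
columns = {field["name"]: field["type"] for field in schema["fields"]}
print(columns)  # {'TemperatureCelcius': 'double', 'HourlyEnergyOutputMW': 'double'}
```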

Deploy the Model

Now we would like to demonstrate how the Spark pipeline, with the model found by AutoML, can be put into production in a Spark Streaming application. For the deployment, we can start a new Spark application, in either Scala or Python, and load the trained pipeline model. The pipeline model contains the H2O AutoML model packaged as a MOJO and is therefore independent of the H2O runtime. We use Scala to demonstrate the language independence of the exported pipeline.

The deployment consists of several steps:

  • Load the schema from the schema file.
  • Create the input data stream and pass it the schema. The input data stream points to a directory where new CSV files arrive from different streaming sources. It can also be an online source of streaming data.
  • Load the pipeline from the pipeline file.
  • Create the output data stream. For our purposes, we store the data in memory and also in a Spark SQL table so we can see results immediately.

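// Imports needed by this example
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructField, StructType}


// Start Spark
val spark = SparkSession.builder().master("local").getOrCreate()


// Load exported pipeline
val pipelineModel = PipelineModel.read.load("pipeline.model")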


// Load exported schema of input data
val schema = StructType(DataType.fromJson(scala.io.Source.fromFile("schema.json").mkString).asInstanceOf[StructType].map {
 case StructField(name, dtype, nullable, metadata) => StructField(name, dtype, true, metadata)
 case rec => rec
})
println(schema)


// Define input stream
val inputDataStream = spark.readStream.schema(schema).csv("/path/to/folder/where/input/data/are/being/generated")


// Apply loaded model
val outputDataStream = pipelineModel.transform(inputDataStream)


// Forward output stream into memory-sink
outputDataStream.writeStream.format("memory").queryName("predictions").start()


// Query results
while(true){
 spark.sql("select * from predictions").show()
 Thread.sleep(5000)
}
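Since the deployment application can also be written in Python, the same steps can be sketched with PySpark. This is a rough sketch under stated assumptions, not a tested deployment: the function name start_scoring_stream and its parameters are hypothetical, and loading the pipeline assumes that the Sparkling Water and PySparkling dependencies are available so that the H2O stage can be resolved. Unlike the Scala code above, it does not explicitly mark every field as nullable.

```python
import json

def start_scoring_stream(input_dir, model_path="pipeline.model", schema_path="schema.json"):
    """Score CSV files arriving in input_dir and expose results as the 'predictions' table."""
    # Imported here so this sketch can be loaded on a machine without Spark installed.
    from pyspark.ml import PipelineModel
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType

    # Start Spark
    spark = SparkSession.builder.master("local").getOrCreate()

    # Load exported schema of input data
    with open(schema_path) as f:
        schema = StructType.fromJson(json.load(f))

    # Load exported pipeline (assumes Sparkling Water dependencies are available)
    pipeline_model = PipelineModel.load(model_path)

    # Define input stream, apply the loaded model, forward output into the memory sink
    input_stream = spark.readStream.schema(schema).csv(input_dir)
    output_stream = pipeline_model.transform(input_stream)
    query = output_stream.writeStream.format("memory").queryName("predictions").start()
    return spark, query
```

After calling the function with the directory where input files are generated, the results could be queried in the same way as in the Scala example, for instance with spark.sql("select * from predictions").show().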

Conclusion

This code demonstrates that we can put a pipeline model into production relatively easily. We used Python for the model creation and a JVM-based language for the deployment. The resulting pipeline model contains the model found by the H2O AutoML algorithm, exported as a MOJO. This means that we don’t need to start H2O or Sparkling Water in the place where we deploy the model (but we do need to ensure that the Sparkling Water dependencies are on the classpath).

Jakub Hava

Jakub (or “Kuba” as we call him) completed his Bachelor’s Degree in Computer Science and Master’s Degree in Software Systems at Charles University in Prague. For his bachelor’s thesis, Kuba wrote a small platform for distributed computing of any type of task. During his master’s degree studies, he developed a cluster monitoring tool for JVM-based languages which makes debugging and reasoning about the performance of distributed systems easier using a concept called distributed stack traces. Kuba enjoys dealing with problems and learning new programming languages. At H2O.ai, Kuba works on Sparkling Water. Aside from programming, Kuba enjoys exploring new cultures and bouldering. He’s also a big fan of tea preparation and the associated ceremony.