En un post anterior ya vimos como entrenar un modelo de h2o y cómo sería la lógica para hacer predicciones en un entorno que tenga Spark pero no h2o.
Un lector del blog comentó que porque no usar directamente H20 con sparkling water y leer directamente los datos a partir de un sparkdataframe y predecir también usando sparkling water. Aquí varios motivos.
Por mi escasa experiencia con sparkling water existe un cuello de botella al pasar de sparkdataframe a h2oframe.
Puede que estemos en un entorno dónde no te dejen usar H20 en el cluster. Esta circunstancia aumenta su probabilidad si el cluster es gestionado por terceros, ejem, ejem.
Lo que si que es posible que tengamos es la posibilidad de instalar h2o en nuestro nodo frontera, ya sea porque lo instalamos usando R o Python o directamente el jar. O a las malas tenemos h2o en local y entrenamos los datos con una muestra.
El caso es que nos mola h2o porque sabemos que los modelos están bastante bien implementados (mejor que en Spark), que se integra bien con R y Python y que como vimos en el post que enlazo al principio, es muy sencillo predecir usando spark sin necesidad de tener h2o.
Pero vamos a ir un poco más allá, adentrándonos un pelín más en el mundo de los ingenieros. Nuestro objetivo es crear una aplicación de Spark que tenga embedida la librería de h2o para hacer predicciones de modelos binarios.
La idea es tener un .jar
que podamos lanzar con spark-submit
y que le pasemos como argumentosla ruta dónde está el mojo del modelo entrenado en un fichero zip, el nombre de la tabla hive sobre la que vamos a hacer predicciones y el nombre de la tabla dónde queremos guardar las predicciones. Esta aplicación valdrá para todos los modelos de clasificación binaria de h2o, de forma que si quisiéramos subir un nuevo modelo a producción lo único que cambiaría sería el fichero zip (el modelo entrenado en formato MOJO model) y el nombre de las tabla sobre la que se aplica.
Al final, tendremos algo como esto (obviando parámetros de configuración de spark, como número de ejecutores y tal)
spark-submit --class com.h2ospark.BinaryModels binary_h2o-1.0.1-SNAPSHOT.jar mi_modelo.zip nombre_esquema.tablon_to_predict nombre_esquema.tablon_predicho
¿Fácil verdad?
Creando el proyecto
Para crear la aplicación de spark lo que se necesita básicamente es empaquetar la lógica y las dependencias necesarias utilizando cosas como sbt o gradle, yo voy a usar gradle. Una cosa que ayuda bastante es utilizar un IDE adecuado para este tipo de tareas, uno muy conocido es Intellij Idea.
Instalar gradle
En el Intellij Idea podemos decir que use gradle como gestor de dependencias y para empaquetar y el ya se encarga de bajar una versión de gradle que usará en el proyecto, aunque también es fácil hacer sudo apt install gradle
en ubuntu por ejemplo.
Estructuras de carpetas y ficheros
En mi carpeta de proyecto que he llamado predict_binary_h2o
tengo los siguientes archivos
- build.gradle
- settings.gradle
- gradle.properties
Y mi fichero principal con la clase se llama BinaryModels.scala
y está en la ruta src/main/scala/com/h2ospark/
Ficheros
En settings.gradle
sólo tengo el nombre del proyecto.
rootProject.name = 'binary_h2o'
En gradle.properties
tengo guardadas el número de versión de las librerías que luego especificaremos en build.gradle
# Artifact origins
# Dependency versions
scala_major = 2.11
scala_minor = 8
spark_version = 2.3.0
hadoop_version = 2.6.0
rules_version = 1.4.0
joda_version = 2.9.9
holden_version = 2.3.0_0.10.0
scalatest_version = 3.0.4
typesafe_version = 1.3.1
scopt_version = 3.5.0
h2oProjectVersion = 3.22.1.1
En el fichero build.gradle
es dónde indicamos qué librerías vamos a usar en nuestro proyecto y cuales queremos que se compilen de forma que cuando hagamos un gradle build
nos las incluya en el jar
final
/*
* This file was generated by the Gradle 'init' task.
*
* This is a general purpose Gradle build.
* Learn how to create Gradle builds at https://guides.gradle.org/creating-new-gradle-builds/
*/
group 'com.h2ospark'
version '1.0.1-SNAPSHOT'
buildscript {
repositories {
mavenCentral()
}
}
//apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'maven'
repositories {
mavenCentral()
maven {
url "http://h2o-release.s3.amazonaws.com/h2o/rel-xu/4/maven/repo/"
}
}
configurations {
master
all*.exclude group: 'javax.servlet', module: 'servlet-api'
}
dependencies {
compileOnly "org.scala-lang:scala-library:$scala_major.$scala_minor"
compileOnly "org.scala-lang:scala-reflect:$scala_major.$scala_minor"
compileOnly "org.apache.spark:spark-sql_$scala_major:$spark_version"
compileOnly "org.apache.spark:spark-core_$scala_major:$spark_version"
compileOnly "org.apache.spark:spark-hive_$scala_major:$spark_version"
compile "joda-time:joda-time:$joda_version"
compile "org.joda:joda-convert:1.8"
compile "com.typesafe:config:$typesafe_version"
compile "com.github.scopt:scopt_$scala_major:$scopt_version"
compile "com.databricks:spark-csv_$scala_major:1.5.0"
compile "org.scala-lang:scala-compiler:$scala_major.$scala_minor"
compile "ai.h2o:h2o-genmodel:$h2oProjectVersion"
compileOnly 'com.esotericsoftware.kryo:kryo:2.21'
testCompile "com.holdenkarau:spark-testing-base_$scala_major:$holden_version"
testCompile "org.scalatest:scalatest_$scala_major:$scalatest_version"
testCompile "org.apache.spark:spark-hive_$scala_major:$spark_version"
testCompile "com.holdenkarau:spark-testing-base_$scala_major:$holden_version"
testCompile "org.apache.spark:spark-sql_$scala_major:$spark_version"
testCompile "org.apache.spark:spark-core_$scala_major:$spark_version"
testCompile "org.apache.hadoop:hadoop-client:$hadoop_version"
}
jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
//run scala tests. These are not automatically picked up by gradle,
task testScala(dependsOn: ['testClasses'], type: JavaExec) {
main = 'org.scalatest.tools.Runner'
args = ['-R', 'build/classes/test', '-o']
classpath = sourceSets.test.runtimeClasspath
}
task testScalaUnit(dependsOn: ['testClasses'], type: JavaExec) {
main = 'org.scalatest.tools.Runner'
args = ['-R', 'build/classes/test', '-o', '-n', 'UTest']
classpath = sourceSets.test.runtimeClasspath
}
task testScalaIntegration(dependsOn: ['testClasses'], type: JavaExec) {
main = 'org.scalatest.tools.Runner'
args = ['-R', 'build/classes/test', '-o', '-n', 'ITest']
classpath = sourceSets.test.runtimeClasspath
}
Como partes importantes del contenido de este fichero, la de añadir el repo de h2o en maven y la de que compile la librería de h2o para poder predecir compile "ai.h2o:h2o-genmodel:$h2oProjectVersion"
, y la parte de construir el jar dónde incluya esas librerías
jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
Lógica en spark
En el fichero src/main/scala/com/h2ospark/BinaryModels.scala
es dónde está implementada la clase principal que incorpora la lógica de predicción de con spark, básicamente la misma que vimos en el post de jugando con h2o
Una cosa importante, el nombre de la clase debe coincidir con el nombre del fichero.
Contenido fichero
package com.h2ospark
import org.apache.spark.sql.SparkSession
import hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import hex.genmodel.MojoModel
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
object BinaryModels {
def main(args: Array[String])
{
// args(0) : path ruta modelo h2o de regresion
// args(1) : path completo tablon a predecir
// args(2) : path completo tablon_predict
val modelPath = args(0)
val tableIn = args(1)
val tableOut = args(2)
val spark = SparkSession.builder
// .master("local[*]")
.appName("binomial_predict")
.getOrCreate()
import spark.implicits._
val tabla_origin = spark.table(tableIn)
// cast all columns to string for h2o compatibility
val tabla = tabla_origin.select(tabla_origin.columns.map(c => col(c).cast(StringType)) : _*)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(
new EasyPredictModelWrapper.Config().
setModel(mojo).
setConvertUnknownCategoricalLevelsToNa(true).
setConvertInvalidNumbersToNa(true))
val header = tabla.columns
// Predict and save as dataframe
val dfScore = tabla.map {
x =>
val r = new RowData
header.indices.foreach(idx => r.put(header(idx), x.getAs[String](idx) ))
val score = easyModel.predictBinomial(r).classProbabilities
(x.getAs[String](0), score(1))
}.toDF("columna1","predict")
// Print Schema and show datos
dfScore.printSchema
dfScore.show()
// Save in a specific table
dfScore.write.mode(SaveMode.Overwrite).saveAsTable(tableOut)
}
}
Cómo veis, la lógica es bastante sencilla, se le pasan mediante argumentos el path del modelo, el nombre de la tabla hive dónde está el tablón a predecir y el nombre de la tabla hive dónde queremos que salve las predicciones. (Primera columna del tablón y la probabilidad estimada.)
Pues si todo va bien, construimos el jar definitivo con la orden ./gradlew build
y se habrá creado nuestro jar, en mi caso en /home/jose/IdeaProjects/predict_binary_h2o_local/build/libs/binary_h2o-1.0.1-SNAPSHOT.jar
Y listo, ya podemos entrenar un modelo de clasificación binaria con h2o, ya sea un glm o un gbm por ejemplo, guardar el modelo generado tal y como contaba en el post anterior y si tenemos una tabla a predecir en hive con las mismos nombres de columnas y tipo del tablón con el que hemos entrenado podemos predecir en un cluster de Spark que no tenga h2o utilizando este jar. La gracia de esto, es que en principio, ya no tendríamos que hacer de nuevo el proceso de crear el jar en gradle, tan sólo necesitaríamos el modelo entrenado en h2o en formato zip.
Un ejemplo de ejecución en un nodo frontera con acceso a spark y dónde tenemos todos los ficheros (el jar y el zip ) en el directorio dónde lo lanzamos sería este.
spark-submit --class com.h2ospark.BinaryModels --master yarn --num-executors 20 --executor-cores 5 --driver-memory 2G --executor-memory 10G binary_h2o-1.0.0-SNAPSHOT.jar mi_modelo_h2o_entrenado.zip default.tablon_to_predict resultados.predicciones_modelo
Por si os sirve os dejo el jar aquí para que hagáis pruebas, y en este repo están los archivos mínimos para generar el jar con gradle.
Evidentemente esto es sólo un ejemplo de cómo hacer una aplicación mínima de Spark, le faltan muchas cosas como escribir los test unitarios y alguna cosa más, pero el jar está probado ya en un cluster con spark y con diferentes modelos sobre distintos datos y funciona.