Entrenar con sparklyr, predecir con spark

Vivimos en la era del big data según dicen por esos lares, también tengo algún amigo que dice que el big data son los padres, no le falta razón. La mayoría de las cosas que hacemos se pueden hacer sin big data, porque en primer lugar ni en el 95 % de las veces se requiere y porque en el 5% restante podríamos hacer muestreo. Si no hay señal en 20.000 datos díficilmente va a haberla en 2 millones.

Dicho esto, esta entrada va sobre lo fácil que es entrenar un modelo con spark usando sparklyr. Sparklyr es una librería de la gente de rstudio que permite utilizar spark desde R, permitiendo por ejemplo utilizar dplyr como interfaz mientras que es spark quien realiza el trabajo duro. Sparklyr tiene además wrappers de las funciones de MLlib por lo que entrenar un modelo en spark si eres usuario de R está “tirao”.

Reproducimos el mismo ejemplo de juguete que aquí

Modelo de juguete

library(sparklyr)
library(tidyverse)
sc <- spark_connect(master = "local")
df <- starwars %>% select(mass, height, hair_color, birth_year)

Subimos los datos a spark

# Convertimos a sparkdataframe
starwars_tbl <- sc %>%  sdf_copy_to(df, names = "starwars_spark")
starwars_tbl
## # Source: spark<df> [?? x 4]
##     mass height hair_color    birth_year
##    <dbl>  <int> <chr>              <dbl>
##  1    77    172 blond               19  
##  2    75    167 <NA>               112  
##  3    32     96 <NA>                33  
##  4   136    202 none                41.9
##  5    49    150 brown               19  
##  6   120    178 brown, grey         52  
##  7    75    165 brown               47  
##  8    32     97 <NA>               NaN  
##  9    84    183 black               24  
## 10    77    182 auburn, white       57  
## # … with more rows

Hacemos algo con los valores perdidos. Podemos usar sintaxis de dplyr sobre un sparkdataframe, básicamente lo que hace es ejecutar spark.sql

# Se aconseja separar los mutate al hacerlos sobre un spark dataframe

starwars_tbl <- starwars_tbl %>%
  mutate(height = if_else(is.na(height), mean(height), height)) %>%
  mutate(mass = if_else(is.na(mass), mean(mass), mass)) %>% 
  mutate(birth_year = if_else(is.na(birth_year), mean(birth_year), birth_year))

Train, test y guardamos los datos de test en csv para luego importarlos desde spark con scala

partition <-starwars_tbl %>%  sdf_partition(training = 0.7, test = 0.3, seed = 155)
## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning
train <- partition$training
test <- partition$test

write_csv(collect(test), path = "test_spark.csv")

Modelo

Vamos a utilizar un modelo gbm de spark, huelga decir que los modelos gbm en spark están implementados de manera subóptima por ser amable, para este tipo de modelos es mucho mejor usar H2O.

m_gbm <- train %>% ml_gbt_regressor(height ~ mass + birth_year)

Y ya podemos utilizar el modelo para predecir

prediccion <- test %>% sdf_predict(m_gbm)
## Warning: 'sdf_predict' is deprecated.
## Use 'ml_predict' instead.
## See help("Deprecated")
prediccion
## # Source: spark<?> [?? x 5]
##     mass height hair_color    birth_year prediction
##    <dbl>  <dbl> <chr>              <dbl>      <dbl>
##  1    17     66 white              896         78.9
##  2    45    165 brown               46         83.7
##  3    48    178 none                87.6      190. 
##  4    49    150 brown               19        170. 
##  5    65    163 none                87.6      183. 
##  6    68    160 none                87.6      183. 
##  7    74    173 <NA>                44        169. 
##  8    75    167 <NA>               112        199. 
##  9    77    182 auburn, white       57        170. 
## 10    79    188 brown               87.6      182. 
## # … with more rows

Calculamos la importancia de las variables y el rmse

ml_feature_importances(m_gbm)
##      feature importance
## 1       mass  0.6613639
## 2 birth_year  0.3386361
prediccion %>% ml_regression_evaluator(label_col = "height", metric = "rmse")
## [1] 31.84751

Productivizar modelos de spark

Spark tiene funciones para guardar y salvar modelos (y también el Pipeline entero). Esto permite que un científico de datos pueda entrenar sus modelos en spark usando scala, pyspark, sparkR o sparklyr, guardarlo en una ruta del hdfs por ejemplo y que luego se pueda cargar desde scala, lo cual facilita enormemente el proceso de ponerlos en producción.

En primer lugar salvamos nuestro modelito de juguete.

# guarda el modelo en la carpeta m_gbm
ml_save(m_gbm, path = "m_gbm", overwrite = TRUE)
## Model successfully saved.

Nos vamos a un spark-shell

# lo lanzo en mi spark instalado en local pero también sirve en un cluster con yarn
~/spark/spark-2.4.0-bin-hadoop2.7/bin/spark-shell --conf spark.driver.memory="2g" --conf spark.executor.memory="2g" --conf spark.executor.instances=2 --conf spark.executor.cores=2
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.types._

// Import data
val dataPath = "test_spark.csv"

// hay que crear un esquema para que lea bien los tipos, al leer
// de csv local

val mi_esquema = StructType(Array(
        StructField("mass", DoubleType, true),
        StructField("height", DoubleType, true),
        StructField("hair_color", StringType, true),
        StructField("birth_year", DoubleType, true)))
        
val dfStarWars = spark.read.option("header", "true").
  schema(mi_esquema).
  csv(dataPath)

Hacemos un load del modelo guardado que se entrenó con sparklyr y hacemos la predicción que guardamos en un spark dataframe que podemos luego guardar en una tabla hive o como queramos.

val modelo_gbt =  PipelineModel.load("m_gbm")

val prediccion = modelo_gbt.transform(dfStarWars)

y ya está.

scala> prediccion.select("label","prediction").show
+------------------+------------------+
|             label|        prediction|
+------------------+------------------+
|              66.0| 78.85959836683551|
|             165.0| 83.72420090209631|
|             178.0| 190.3813215401054|
|             150.0| 170.1962486173942|
|             163.0|182.58605991654028|
|             160.0|182.58605991654028|
|             173.0| 168.5011882907288|
|             167.0|199.06341481661963|
|             182.0| 169.5428405224873|
|             188.0| 182.3089130362614|
|             193.0|203.05341183966138|
|             224.0|187.77496086646852|
|             188.0|201.95281487952235|
|             229.0|193.11401242888724|
|             193.0|201.82085181936893|
|             122.0|172.14250890464118|
|             150.0| 176.7744868608689|
|             163.0| 189.3173296140381|
|             171.0|172.14250890464118|
|174.35802469135803|172.14250890464118|
+------------------+------------------+
only showing top 20 rows

Especial agradecimiento a José Carlos, uno de mis “ingenazis” preferidos.

 
comments powered by Disqus