Vivimos en la era del big data según dicen por esos lares, también tengo algún amigo que dice que el big data son los padres, no le falta razón. La mayoría de las cosas que hacemos se pueden hacer sin big data, porque en primer lugar ni en el 95 % de las veces se requiere y porque en el 5% restante podríamos hacer muestreo. Si no hay señal en 20.000 datos díficilmente va a haberla en 2 millones.
Dicho esto, esta entrada va sobre lo fácil que es entrenar un modelo con spark usando sparklyr. Sparklyr es una librería de la gente de rstudio que permite utilizar spark desde R, permitiendo por ejemplo utilizar dplyr como interfaz mientras que es spark quien realiza el trabajo duro. Sparklyr tiene además wrappers de las funciones de MLlib por lo que entrenar un modelo en spark si eres usuario de R está “tirao”.
Reproducimos el mismo ejemplo de juguete que aquí
Modelo de juguete
library(sparklyr)
library(tidyverse)
sc <- spark_connect(master = "local")
df <- starwars %>% select(mass, height, hair_color, birth_year)
Subimos los datos a spark
# Convertimos a sparkdataframe
starwars_tbl <- sc %>% sdf_copy_to(df, names = "starwars_spark")
starwars_tbl
## # Source: spark<df> [?? x 4]
## mass height hair_color birth_year
## <dbl> <int> <chr> <dbl>
## 1 77 172 blond 19
## 2 75 167 <NA> 112
## 3 32 96 <NA> 33
## 4 136 202 none 41.9
## 5 49 150 brown 19
## 6 120 178 brown, grey 52
## 7 75 165 brown 47
## 8 32 97 <NA> NaN
## 9 84 183 black 24
## 10 77 182 auburn, white 57
## # … with more rows
Hacemos algo con los valores perdidos. Podemos usar sintaxis de dplyr sobre un sparkdataframe, básicamente lo que hace es ejecutar spark.sql
# Se aconseja separar los mutate al hacerlos sobre un spark dataframe
starwars_tbl <- starwars_tbl %>%
mutate(height = if_else(is.na(height), mean(height), height)) %>%
mutate(mass = if_else(is.na(mass), mean(mass), mass)) %>%
mutate(birth_year = if_else(is.na(birth_year), mean(birth_year), birth_year))
Train, test y guardamos los datos de test en csv para luego importarlos desde spark con scala
partition <-starwars_tbl %>% sdf_partition(training = 0.7, test = 0.3, seed = 155)
## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `avg(x, na.rm = TRUE)` to silence this warning
train <- partition$training
test <- partition$test
write_csv(collect(test), path = "test_spark.csv")
Modelo
Vamos a utilizar un modelo gbm de spark, huelga decir que los modelos gbm en spark están implementados de manera subóptima por ser amable, para este tipo de modelos es mucho mejor usar H2O.
m_gbm <- train %>% ml_gbt_regressor(height ~ mass + birth_year)
Y ya podemos utilizar el modelo para predecir
prediccion <- test %>% sdf_predict(m_gbm)
## Warning: 'sdf_predict' is deprecated.
## Use 'ml_predict' instead.
## See help("Deprecated")
prediccion
## # Source: spark<?> [?? x 5]
## mass height hair_color birth_year prediction
## <dbl> <dbl> <chr> <dbl> <dbl>
## 1 17 66 white 896 78.9
## 2 45 165 brown 46 83.7
## 3 48 178 none 87.6 190.
## 4 49 150 brown 19 170.
## 5 65 163 none 87.6 183.
## 6 68 160 none 87.6 183.
## 7 74 173 <NA> 44 169.
## 8 75 167 <NA> 112 199.
## 9 77 182 auburn, white 57 170.
## 10 79 188 brown 87.6 182.
## # … with more rows
Calculamos la importancia de las variables y el rmse
ml_feature_importances(m_gbm)
## feature importance
## 1 mass 0.6613639
## 2 birth_year 0.3386361
prediccion %>% ml_regression_evaluator(label_col = "height", metric = "rmse")
## [1] 31.84751
Productivizar modelos de spark
Spark tiene funciones para guardar y salvar modelos (y también el Pipeline entero). Esto permite que un científico de datos pueda entrenar sus modelos en spark usando scala, pyspark, sparkR o sparklyr, guardarlo en una ruta del hdfs por ejemplo y que luego se pueda cargar desde scala, lo cual facilita enormemente el proceso de ponerlos en producción.
En primer lugar salvamos nuestro modelito de juguete.
# guarda el modelo en la carpeta m_gbm
ml_save(m_gbm, path = "m_gbm", overwrite = TRUE)
## Model successfully saved.
Nos vamos a un spark-shell
# lo lanzo en mi spark instalado en local pero también sirve en un cluster con yarn
~/spark/spark-2.4.0-bin-hadoop2.7/bin/spark-shell --conf spark.driver.memory="2g" --conf spark.executor.memory="2g" --conf spark.executor.instances=2 --conf spark.executor.cores=2
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.types._
// Import data
val dataPath = "test_spark.csv"
// hay que crear un esquema para que lea bien los tipos, al leer
// de csv local
val mi_esquema = StructType(Array(
StructField("mass", DoubleType, true),
StructField("height", DoubleType, true),
StructField("hair_color", StringType, true),
StructField("birth_year", DoubleType, true)))
val dfStarWars = spark.read.option("header", "true").
schema(mi_esquema).
csv(dataPath)
Hacemos un load del modelo guardado que se entrenó con sparklyr y hacemos la predicción que guardamos en un spark dataframe que podemos luego guardar en una tabla hive o como queramos.
val modelo_gbt = PipelineModel.load("m_gbm")
val prediccion = modelo_gbt.transform(dfStarWars)
y ya está.
scala> prediccion.select("label","prediction").show
+------------------+------------------+
| label| prediction|
+------------------+------------------+
| 66.0| 78.85959836683551|
| 165.0| 83.72420090209631|
| 178.0| 190.3813215401054|
| 150.0| 170.1962486173942|
| 163.0|182.58605991654028|
| 160.0|182.58605991654028|
| 173.0| 168.5011882907288|
| 167.0|199.06341481661963|
| 182.0| 169.5428405224873|
| 188.0| 182.3089130362614|
| 193.0|203.05341183966138|
| 224.0|187.77496086646852|
| 188.0|201.95281487952235|
| 229.0|193.11401242888724|
| 193.0|201.82085181936893|
| 122.0|172.14250890464118|
| 150.0| 176.7744868608689|
| 163.0| 189.3173296140381|
| 171.0|172.14250890464118|
|174.35802469135803|172.14250890464118|
+------------------+------------------+
only showing top 20 rows
Especial agradecimiento a José Carlos, uno de mis “ingenazis” preferidos.