Remplacement des valeurs nulles par 0 après la fermeture de la jointure externe

J'ai deux dataframes appelé gauche et droit.

scala> left.printSchema
root
|-- user_uid: double (nullable = true)
|-- labelVal: double (nullable = true)
|-- probability_score: double (nullable = true)

scala> right.printSchema
root
|-- user_uid: double (nullable = false)
|-- real_labelVal: double (nullable = false)

Alors, je me joins à eux pour obtenir le rejoint Dataframe. C'est un jointure externe gauche. Toute personne intéressée à la natjoin fonction pouvez le trouver ici.

https://gist.github.com/anonymous/f02bd79528ac75f57ae8

scala> val joinedData = natjoin(predictionDataFrame, labeledObservedDataFrame, "left_outer")

scala> joinedData.printSchema
|-- user_uid: double (nullable = true)
|-- labelVal: double (nullable = true)
|-- probability_score: double (nullable = true)
|-- real_labelVal: double (nullable = false)

Puisque c'est une jointure externe gauche, le real_labelVal colonne est null lorsque user_uid n'est pas présent dans la droite.

scala> val realLabelVal = joinedData.select("real_labelval").distinct.collect
realLabelVal: Array[org.apache.spark.sql.Row] = Array([0.0], [null])

Je veux remplacer les valeurs null dans les realLabelVal colonne avec la 1.0.

Actuellement je ne les suivants:

  1. J'ai trouver l'index de real_labelval colonne et l'utilisation de l'étincelle.sql.Ligne API pour définir les valeurs null à 1.0.
    (Ce qui me donne un RDD[Ligne])
  2. Puis-je appliquer le schéma de la rejoint dataframe à obtenir les nettoyer dataframe.

Le code est comme suit:

 val real_labelval_index = 3
 def replaceNull(row: Row) = {
    val rowArray = row.toSeq.toArray
     rowArray(real_labelval_index) = 1.0
     Row.fromSeq(rowArray)
 }

 val cleanRowRDD = joinedData.map(row => if (row.isNullAt(real_labelval_index)) replaceNull(row) else row)
 val cleanJoined = sqlContext.createDataFrame(cleanRowRdd, joinedData.schema)

Est-il un élégant ou une manière efficace de faire cela?

Goolging n'a pas aidé beaucoup.
Merci à l'avance.

source d'informationauteur Mihir Shinde