Comment filtrer un RDD selon une fonction de base d'un autre RDD dans Spark?

Je suis un débutant de Apache Spark. Je veux filtrer tous les groupes dont la somme des poids est supérieure à une valeur constante dans un RDD. Le "poids" de la carte est aussi un RDD. Voici une petite taille de la démo, les groupes à être filtrée est stocké dans des "groupes", la valeur de la constante est de 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

Lorsque les données d'entrée est très grande, > 10 GO par exemple, j'ai toujours la rencontre d'une "java tas de message d'erreur" mémoire. Je doute que si elle est causée par le "poids".toArray.toMap", parce qu'il convertir un distribué des RDD à un objet Java dans la JVM. J'ai donc essayé de filtre avec RDD directement:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

Quand j'ai couru result.collect après le chargement de ce script dans spark shell, j'ai eu un "java.lang.NullPointerException" erreur. Quelqu'un m'a dit quand un RDD est manipulé dans un autre RDD, il y aura un nullpointer exception, et de me suggérer de mettre le poids dans le Redis.

Alors, comment puis-je obtenir le "résultat" sans convertir "poids" de la Carte, ou de le mettre dans le Redis? Si il y a une solution pour filtrer un RDD basée sur une autre carte comme CA sans l'aide extérieure de la banque de données de service?
Merci!

OriginalL'auteur Chad | 2014-09-25