Comment filtrer un RDD selon une fonction de base d'un autre RDD dans Spark?
Je suis un débutant de Apache Spark. Je veux filtrer tous les groupes dont la somme des poids est supérieure à une valeur constante dans un RDD. Le "poids" de la carte est aussi un RDD. Voici une petite taille de la démo, les groupes à être filtrée est stocké dans des "groupes", la valeur de la constante est de 12:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
Lorsque les données d'entrée est très grande, > 10 GO par exemple, j'ai toujours la rencontre d'une "java tas de message d'erreur" mémoire. Je doute que si elle est causée par le "poids".toArray.toMap", parce qu'il convertir un distribué des RDD à un objet Java dans la JVM. J'ai donc essayé de filtre avec RDD directement:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
Quand j'ai couru result.collect
après le chargement de ce script dans spark shell, j'ai eu un "java.lang.NullPointerException" erreur. Quelqu'un m'a dit quand un RDD est manipulé dans un autre RDD, il y aura un nullpointer exception, et de me suggérer de mettre le poids dans le Redis.
Alors, comment puis-je obtenir le "résultat" sans convertir "poids" de la Carte, ou de le mettre dans le Redis? Si il y a une solution pour filtrer un RDD basée sur une autre carte comme CA sans l'aide extérieure de la banque de données de service?
Merci!
OriginalL'auteur Chad | 2014-09-25
Vous devez vous connecter pour publier un commentaire.
La java "out of memory" l'erreur est venue parce que l'étincelle utilise son
spark.default.parallelism
propriété, tandis que la détermination de nombre de divisions, qui par défaut est le nombre de cœurs disponibles.Lorsque l'entrée devient grand, et vous avez peu de mémoire, vous devez augmenter le nombre de divisions.
Vous pouvez faire quelque chose comme suit:
Vous pouvez également envisager d'augmenter exécuteur de la taille de la mémoire à l'aide de
spark.executor.memory
.spark.executor.memory
fonctionne.OriginalL'auteur Shyamendra Solanki
Supposons que votre groupe est unique. Sinon, le rendre unique distincts, etc.
Si le groupe ou le poids est faible, il devrait être facile. Si à la fois le groupe et les poids sont énormes, vous pouvez essayer ce qui peut être plus évolutif, mais aussi semble compliqué.
OriginalL'auteur zhang zhan