Spark manque de mémoire lors du regroupement par clé
Je suis tenter d'effectuer une simple transformation de la commune d'analyse des données à l'aide de l'Étincelle de l'hôte sur un EC2 avec ce guidemon code ressemble à ceci:
package ccminer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ccminer {
val english = "english|en|eng"
val spanish = "es|esp|spa|spanish|espanol"
val turkish = "turkish|tr|tur|turc"
val greek = "greek|el|ell"
val italian = "italian|it|ita|italien"
val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")
def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")
def main(args: Array[String]): Unit = {
if (args.length != 3) {
System.err.println("Bad command line")
System.exit(-1)
}
val cluster = "spark://???"
val sc = new SparkContext(cluster, "Common Crawl Miner",
System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))
sc.sequenceFile[String, String](args(0)).map {
case (k, v) => (langIndep(k), v)
}
.groupByKey(args(2).toInt)
.filter {
case (_, vs) => vs.size > 1
}
.saveAsTextFile(args(1))
}
}
Et je suis en cours d'exécution avec la commande comme suit:
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/2000"
Mais très vite, il échoue avec des erreurs comme suit
java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Donc ma question est, ce qui est nécessaire pour écrire une Étincelle tâche qui peut groupe par clé avec un nombre quasiment infini d'entrée sans être à court de mémoire?
source d'informationauteur John McCrae
Vous devez vous connecter pour publier un commentaire.
La cause la plus fréquente de java.lang.OutOfMemoryError exceptions en shuffle tâches (telles que groupByKey, reduceByKey, etc.) faible niveau de le parallélisme.
Vous pouvez augmenter la valeur par défaut en définissant étincelle.par défaut.le parallélisme propriété dans configuration.
Donc ça nous dit que vous n'avez plus de allouées espace de tas de la JVM. Vous pouvez augmenter la taille du segment de mémoire mais c'est toujours limitée par les capacités du système (Ne peut pas dépasser la quantité de RAM physique).
D'autre part, comme expliqué par homutov ce qui se passe dans les grandes opérations de collecte. Par exemple groupByKey, reduceByKey, cartisien + mapToPair . Ces opérations permettra de recueillir les RDD données dans un lieu qui JVM pour exécuter de espace de tas.
Que pouvez-vous faire?
Avec mon expérience, quand un système en cluster/ont des ressources limitées, vous pouvez utiliser Spark guide de réglage de. étincelle.defualt.le parallélisme peut être augmentée jusqu'à ce que vous pouvez accompagner de la tâche dans votre système en cluster/[une fois, j'ai couru un KNN mise en œuvre de 14000 exemple, 1024 jeu de données d'entité sur mon ordinateur portable de l'ordinateur virtuel en jouant sur le parallélisme ].
Rappelez-vous, vous devez TUNE ces caractéristiques à la plus efficace et ne parviennent pas évitement (à cours de mémoire) pour obtenir des meilleurs résultats de l'Étincelle.
En outre
N'oubliez pas d'utiliser d'utiliser primitive types de données au lieu de wrappers . Et l'utilisation Tableaux au lieu de collections.
Étincelle dans les tableaux peuvent économiser beaucoup d'espace précieux et d'améliorer les performances.
Également utiliser Diffusion des variables au lieu de produit Cartésien ou de toute grande combinaison de la tâche.