"Conteneur tué par YARN pour avoir dépassé les limites de mémoire. 10,4 Go de mémoire physique 10,4 Go utilisés "sur un cluster EMR avec 75 Go de mémoire

Je suis en cours d'exécution d'un nœud 5 Étincelle de cluster sur AWS DME chacun de taille m3.xlarge (1 master 4 esclaves). J'ai réussi a couru à travers une 146Mb bzip2 compressé fichier CSV, et qui finit avec une parfaite agrégées résultat.

Maintenant je suis en train de traiter environ 5 go de bzip2 fichier CSV sur ce groupe mais je reçois cette erreur:

16/11/23 17:29:53 AVERTIR TaskSetManager: Perdu tâche à 49,2 en scène 6.0 (TID xxx, xxx.xxx.xxx.de calcul.interne): ExecutorLostFailure (exécuteur 16 quitté provoquée par l'une des tâches en cours d'exécution) Raison: Conteneur tué par le FILS pour le dépassement des limites de la mémoire. 10.4 GO de 10,4 GO de mémoire physique utilisée. Envisager de multiplier étincelle.de fil.exécuteur testamentaire.memoryOverhead.

Je suis confus quant à pourquoi je suis un ~10.5 GB limite de mémoire sur un ~75 GO de cluster (15GO par 3m.xlarge exemple)...

Voici mon DME config:

[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

De ce que j'ai lu, la définition de la maximizeResourceAllocation propriété doit dire EMR pour configurer Étincelle d'utiliser pleinement toutes les ressources disponibles sur le cluster. C'est à dire, je devrais avoir ~75 GO de mémoire disponible... Alors pourquoi je reçois un ~10.5 GB de mémoire erreur de limite?
Voici le code, je suis en cours d'exécution:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

Fondamentalement, rien de plus que de fenêtrage et un groupBy pour agréger les données.

Il commence avec quelques-unes de ces erreurs, et en direction de l'arrêt de la hausse du montant de la même erreur.

J'ai essayé de courir étincelle soumettre avec --conf étincelle.de fil.exécuteur testamentaire.memoryOverhead mais qui ne semble pas résoudre le problème.

source d'informationauteur lauri108