Désactivation de toutes les données dans une étincelle (py)

Je suis une étincelle application avec plusieurs points où je tiens à conserver l'état actuel. C'est généralement après une grande étape, ou la mise en cache d'un état que je voudrais l'utiliser plusieurs fois. Il semble que lorsque j'appelle le cache sur mon dataframe un deuxième temps, une nouvelle copie en cache de la mémoire. Dans mon application, ce qui conduit à des problèmes de mémoire lors de la mise à l'échelle. Même si, un dataframe est un maximum d'environ 100 MO au courant de mes tests, la taille cumulée des résultats intermédiaires pousse au-delà de la mémoire allouée à l'exécuteur testamentaire. Vous trouverez ci dessous un petit exemple qui montre ce comportement.

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

À la recherche à l'INTERFACE de l'application, il existe une copie de l'original dataframe, en plus de celui avec la nouvelle colonne. Je peux supprimer la copie originale en appelant df.unpersist() avant la withColumn ligne. Est-ce la manière recommandée pour enlever le cache résultat intermédiaire (c'est à dire appel unpersist avant chaque cache()).

Aussi, est-il possible de purger tous les objets mis en cache. Dans mon application, il y a des naturels des points d'arrêt où je peux simplement vider toute la mémoire, et de passer au fichier suivant. Je voudrais le faire sans la création d'une nouvelle étincelle de l'application pour chaque fichier d'entrée.

Vous en remercie d'avance!

source d'informationauteur bjack3