Paralléliser / éviter la boucle foreach spark
J'ai écrit une classe qui obtient un DataFrame, effectue quelques calculs sur elle et peut exporter les résultats. Le Dataframes sont générés par une Liste de Clés. Je sais que je fais cela dans un très unefficient chemin à droite maintenant:
var l = List(34, 32, 132, 352) //Scala List
l.foreach{i =>
val data:DataFrame = DataContainer.getDataFrame(i) //get DataFrame
val x = new MyClass(data) //initialize MyClass with new Object
x.setSettings(...)
x.calcSomething()
x.saveResults() //writes the Results into another Dataframe that is saved to HDFS
}
Je pense que le foreach sur la Scala liste n'est pas parallèle, alors comment puis-je éviter d'utiliser foreach ici? Le calcul de la DataFrames pourrait se produire en parallèle, que les résultats des calculs ne sont PAS d'entrée pour la prochaine DataFrame - comment puis-je mettre en œuvre cette?
Merci beaucoup!!
__edit:
ce que j'ai essayé de faire:
val l = List(34, 32, 132, 352) //Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
DataContainer.getDataFrame(i)::l //append DataFrame to List of Dataframes
}
val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
val x = new MyClass(data)
)
mais donne
Invalid tree; null:
null
edit 2:
Bon, je n'obtiens pas comment evrything fonctionne sous le capot....
1) Tout fonctionne bien lorsque j'exécute cette spark-shell
spark-shell –driver-memory 10g
//...
var l = List(34, 32, 132, 352) //Scala List
l.foreach{i =>
val data:DataFrame = AllData.where($"a" === i) //get DataFrame
val x = new MyClass(data) //initialize MyClass with new Object
x.calcSomething()
}
2) Erreur, quand je lance le même avec
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
3) lorsque j'essaie de paralléliser, je reçois une erreur, trop
spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
//same code as above, just parallelized before calling foreach
//i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)
Il y a en fait plus de 10 exécuteurs, mais 4 nœuds. Je n'ai jamais configurer l'étincelle contexte. Ses déjà donné au démarrage.
- Veuillez fournir l'erreur complète stacktrace. Aussi la ligne
DataContainer.getDataFrame(i)::l
ne pas regarder à droite.
Vous devez vous connecter pour publier un commentaire.
Vous pouvez utiliser scala en parallèle des collections pour atteindre
foreach
parallélisme sur le côté du conducteur.*Cependant, un mot de prudence: votre cluster capable de travaux en cours d'exécution parallèle? Vous pouvez soumettre les travaux à votre étincelle de cluster, parallèlement, mais ils vont peut-être arriver en file d'attente sur le cluster et sont exécutées de manière séquentielle.
Vous pouvez utiliser scala de l'Avenir et de la bougie Juste de Planification, par exemple
Avec FairScheduler et les piscines différentes, chacune de travail simultanés aura une juste part de l'étincelle ressources de cluster.
Certains de référence concernant la scala avenir ici. Vous pourriez avoir besoin d'ajouter des rappels nécessaires sur la fin, de la réussite et/ou d'échecs.
Je l'ai fait en utilisant quelque chose comme
using List.par.foreach{object => print(object)}
.Je suis à l'aide de Zeppelin sur la Spark 2.3. J'ai une utilisation similaire de cas où j'ai besoin d'obtenir les données de jour en jour, et de les traiter séparément. Cela ne peut être fait à l'aide d'un ensemble de mois de données en raison de certaines conditions de jointure sur les tables que j'utilise. Voici un exemple de mon code:
Cela permet d'obtenir les données pour les deux jours, de la traiter, puis de copier uniquement la sortie souhaitée. L'exécution de ce sans
par
prendra environ 15 minutes par jour, mais avecpar
il a fallu 1 heure pour le mois entier. Cela dépendra aussi de ce que votre étincelle cluster peut prendre en charge et de la taille de vos données.