Paralléliser / éviter la boucle foreach spark

J'ai écrit une classe qui obtient un DataFrame, effectue quelques calculs sur elle et peut exporter les résultats. Le Dataframes sont générés par une Liste de Clés. Je sais que je fais cela dans un très unefficient chemin à droite maintenant:

var l = List(34, 32, 132, 352)      //Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) //get DataFrame
    val x = new MyClass(data)                     //initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               //writes the Results into another Dataframe that is saved to HDFS
}

Je pense que le foreach sur la Scala liste n'est pas parallèle, alors comment puis-je éviter d'utiliser foreach ici? Le calcul de la DataFrames pourrait se produire en parallèle, que les résultats des calculs ne sont PAS d'entrée pour la prochaine DataFrame - comment puis-je mettre en œuvre cette?

Merci beaucoup!!

__edit:

ce que j'ai essayé de faire:

val l = List(34, 32, 132, 352)      //Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
    DataContainer.getDataFrame(i)::l        //append DataFrame to List of Dataframes
}

val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
    val x = new MyClass(data)
)

mais donne

Invalid tree; null:
null

edit 2:
Bon, je n'obtiens pas comment evrything fonctionne sous le capot....

1) Tout fonctionne bien lorsque j'exécute cette spark-shell

spark-shell –driver-memory 10g       
//...
var l = List(34, 32, 132, 352)      //Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) //get DataFrame
    val x = new MyClass(data)                     //initialize MyClass     with new Object
    x.calcSomething()
}

2) Erreur, quand je lance le même avec

spark-shell --master yarn-client --num-executors 10 –driver-memory 10g  
//same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) lorsque j'essaie de paralléliser, je reçois une erreur, trop

spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
//same code as above, just parallelized before calling foreach
//i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext                  org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

Il y a en fait plus de 10 exécuteurs, mais 4 nœuds. Je n'ai jamais configurer l'étincelle contexte. Ses déjà donné au démarrage.

  • Veuillez fournir l'erreur complète stacktrace. Aussi la ligne DataContainer.getDataFrame(i)::l ne pas regarder à droite.