Utiliser Spark pour écrire un fichier parquet sur s3 sur s3a est très lent

Je suis en train d'écrire un parquet fichier à Amazon S3 à l'aide de Spark 1.6.1. La petite parquet que je suis de la génération est ~2GB une fois écrit, il n'est donc pas une quantité de données. Je suis en train d'essayer de prouver Spark comme une plate-forme que je peux utiliser.

Fondamentalement, ce que je vais faire est de configurer un star schema avec dataframesalors je vais écrire à ceux des tables de parquet. Les données à partir de fichiers csv fourni par un fournisseur et je suis en utilisant Étincelle comme un ETL plate-forme. J'ai actuellement un cluster à 3 nœuds dans ec2(r3.2xlarge) Donc 120GB de mémoire sur les exécuteurs testamentaires et 16 cœurs total.

Les fichiers d'entrée total d'environ 22GB et je suis l'extraction d'environ 2 go de données pour l'instant. Finalement, ce seront plusieurs téraoctets, lorsque je démarre le chargement de l'ensemble de données complet.

Voici mon spark/scala pseudocode:

  def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}

Le comte prend environ 2 minutes pour 465884512 lignes. L'écriture de parquet prend 38 minutes

Je comprends que le coalesce un shuffle pour le conducteur qui ne l'écrire.... mais la quantité de temps cela prend est de me faire penser que je suis en train de faire quelque chose de grave. Sans le coalescecela prend 15 minutes, ce qui OMI est toujours trop longue et me donne une tonne de petites parquet fichiers. J'aimerais avoir un seul gros fichier par jour de données que je vais avoir. J'ai un code pour effectuer le partitionnement par une valeur de champ ainsi, et il est tout aussi lent. J'ai aussi essayé de sortie de ce csv et qui prend environ 1 heure.

Aussi, je ne suis pas vraiment réglage du temps d'exécution des accessoires quand je suis à la présentation de mon travail. Ma console stats pour un emploi sont:

  • Vivant Travailleurs: 2
  • Cœurs en cours d'utilisation: un Total de 16, De 16 Utilisé
  • De la mémoire en cours d'utilisation: 117.5 GO au Total, 107.5 GO Utilisés
  • Applications: 1, 5 Terminé
  • Pilotes: 0 En Cours D'Exécution, 0 Complété
  • Statut: VIVANT

source d'informationauteur Brutus35