Utiliser Spark pour écrire un fichier parquet sur s3 sur s3a est très lent
Je suis en train d'écrire un parquet
fichier à Amazon S3
à l'aide de Spark 1.6.1
. La petite parquet
que je suis de la génération est ~2GB
une fois écrit, il n'est donc pas une quantité de données. Je suis en train d'essayer de prouver Spark
comme une plate-forme que je peux utiliser.
Fondamentalement, ce que je vais faire est de configurer un star schema
avec dataframes
alors je vais écrire à ceux des tables de parquet. Les données à partir de fichiers csv fourni par un fournisseur et je suis en utilisant Étincelle comme un ETL
plate-forme. J'ai actuellement un cluster à 3 nœuds dans ec2(r3.2xlarge)
Donc 120GB
de mémoire sur les exécuteurs testamentaires et 16 cœurs total.
Les fichiers d'entrée total d'environ 22GB et je suis l'extraction d'environ 2 go de données pour l'instant. Finalement, ce seront plusieurs téraoctets, lorsque je démarre le chargement de l'ensemble de données complet.
Voici mon spark/scala pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
Le comte prend environ 2 minutes pour 465884512 lignes. L'écriture de parquet prend 38 minutes
Je comprends que le coalesce
un shuffle pour le conducteur qui ne l'écrire.... mais la quantité de temps cela prend est de me faire penser que je suis en train de faire quelque chose de grave. Sans le coalesce
cela prend 15 minutes, ce qui OMI est toujours trop longue et me donne une tonne de petites parquet
fichiers. J'aimerais avoir un seul gros fichier par jour de données que je vais avoir. J'ai un code pour effectuer le partitionnement par une valeur de champ ainsi, et il est tout aussi lent. J'ai aussi essayé de sortie de ce csv
et qui prend environ 1 heure.
Aussi, je ne suis pas vraiment réglage du temps d'exécution des accessoires quand je suis à la présentation de mon travail. Ma console stats pour un emploi sont:
- Vivant Travailleurs: 2
- Cœurs en cours d'utilisation: un Total de 16, De 16 Utilisé
- De la mémoire en cours d'utilisation: 117.5 GO au Total, 107.5 GO Utilisés
- Applications: 1, 5 Terminé
- Pilotes: 0 En Cours D'Exécution, 0 Complété
- Statut: VIVANT
source d'informationauteur Brutus35
Vous devez vous connecter pour publier un commentaire.
Étincelle par défaut cause d'un grand nombre de (probablement) une surcharge inutile pendant les opérations d'e/S, en particulier lors de l'écriture du S3. Cet article en discute plus en profondeur, mais il y a 2 paramètres que vous aurez envie d'envisager de changer.
à l'Aide de la DirectParquetOutputCommitter. Par défaut, l'Étincelle va enregistrer toutes les données dans un dossier temporaire puis déplacez ces fichiers par la suite. À l'aide de la DirectParquetOutputCommitter permettra d'économiser du temps de l'écriture pour le S3 chemin de sortieDésactiver le Schéma de la Fusion. Si le schéma de fusion est sur, le pilote nœud va scanner tous les fichiers afin d'assurer une cohérence du schéma. Cela est particulièrement coûteuse, car elle n'est pas distribuée de fonctionnement. Assurez-vous que cette option est désactivée par faire
val file = sqx.read.option("mergeSchema", "false").parquet(path)
La sortie directe livreur est passé de l'étincelle de la base de code; vous êtes à l'écriture de votre propre/ressusciter le supprimé de code dans votre propre POT. SI vous le faites, tourner la spéculation dans votre travail, et de savoir que d'autres pannes peuvent causer des problèmes, où le problème est "données non valides".
Sur une meilleure note, Hadoop 2.8 va ajouter un peu de S3A accélérations spécifiquement optimisé pour la lecture de formats binaires (ORC, Parquet) off S3; voir HADOOP-11694 pour plus de détails. Et certaines personnes sont de travail sur l'utilisation d'Amazon de la Dynamo pour la magasin de métadonnées qui doivent être en mesure de faire une robuste O(1) s'engager à la fin des travaux.