Écriture d'un seul fichier CSV à l'aide de l'étincelle-csv
Je suis en utilisant https://github.com/databricks/spark-csv , je suis en train d'écrire un seul CSV, mais pas en mesure de le faire, c'est de faire un dossier.
Besoin d'un Scala fonction qui prendra en paramètre comme chemin d'accès et nom de fichier et écrire que fichier CSV.
Vous devez vous connecter pour publier un commentaire.
C'est la création d'un dossier avec plusieurs fichiers, parce que chaque partition est enregistré individuellement. Si vous avez besoin d'un fichier de sortie unique (dans un dossier), vous pouvez
repartition
(de préférence si en amont des données est grande, mais nécessite une lecture aléatoire):ou
coalesce
:bloc de données avant de les enregistrer:
Toutes les données seront écrites dans
mydata.csv/part-00000
. Avant d'utiliser cette option assurez-vous de comprendre ce qui se passe et quel est le coût de transfert de toutes les données à un seul travailleur. Si vous utilisez le système de fichiers distribués avec la réplication, les données seront transférées à de nombreuses reprises, d'abord extraites d'un seul travailleur et par la suite répartis sur les nœuds de stockage.Alternativement, vous pouvez laisser votre code comme il est et l'utilisation à des fins générales des outils comme
cat
ou HDFSgetmerge
tout simplement de fusionner toutes les pièces par la suite..coalesce(1)
il dit que certains FileNotFoundException sur _temporary répertoire. C'est encore un bug dans spark : issues.apache.org/jira/browse/SPARK-2984coalesce(1)
être très coûteux et généralement pas pratique.Si vous exécutez Étincelle avec HDFS, j'ai résolu le problème en écrivant des fichiers csv normalement et en tirant parti de HDFS pour faire la fusion. Je suis en train de faire que Spark (1.6) directement:
Ne me souviens pas où j'ai appris ce truc, mais il pourrait fonctionner pour vous.
Je suis peut-être un peu tard pour le jeu ici, mais à l'aide de
coalesce(1)
ourepartition(1)
peut travailler pour de petits ensembles de données, mais de grands ensembles de données seront tous jetés dans une partition sur un nœud. C'est probablement à jeter OOM erreurs, ou, au mieux, processus lentement.Je vous suggère fortement que vous utilisez le
FileUtil.copyMerge()
fonction de l'Hadoop API. Cela permettra de fusionner les sorties dans un fichier unique.MODIFIER - Ce effectivement regroupe les données pour le conducteur plutôt que d'un exécuteur testamentaire nœud.
Coalesce()
serait bien si un seul exécuteur testamentaire a plus de RAM pour que le pilote.EDIT 2:
copyMerge()
est retiré dans Hadoop 3.0. Voir la suite de débordement de pile dans l'article pour plus d'informations sur la façon de travailler avec la version la plus récente: Hadoop comment faire CopyMerge dans Hadoop 3.0Si vous utilisez Databricks et peut s'adapter à toutes les données dans la RAM sur un des travailleurs (et donc possible d'utiliser
.coalesce(1)
), vous pouvez utiliser dbfs pour trouver et déplacer le fichier CSV ainsi obtenu:Si votre fichier ne rentre pas dans la RAM sur le travailleur, vous pourriez envisager de chaotic3quilibrium la suggestion d'utiliser FileUtils.copyMerge(). Je n'ai pas fait, et ne savons pas encore si c'est possible ou pas, par exemple, sur S3.
Cette réponse est construit sur les précédentes réponses à cette question ainsi que mes propres tests de l'extrait de code. J'ai posté à Databricks et je suis le rééditer ici.
La meilleure documentation pour dbfs du rm est récursif option que j'ai trouvé est sur un Databricks forum.
repartitionner/fusionner pour 1 partition avant de vous enregistrer (si vous souhaitez toujours obtenir un dossier, mais il aurait une partie de fichier)
vous pouvez utiliser
rdd.coalesce(1, true).saveAsTextFile(path)
il va stocker des données comme singile fichier dans le chemin d'accès/partie-00000
Une solution qui fonctionne pour S3 modifié à partir de Minkymorgan.
Tout simplement passer le temporaire partitionné chemin d'accès au répertoire (avec un nom différent de chemin final) comme le
srcPath
et seule finale d'un fichier csv/txt commedestPath
Spécifier égalementdeleteSource
si vous souhaitez supprimer le répertoire d'origine.de la spark
df.write()
API va créer plusieurs fichiers de pièce à l'intérieur de chemin ... à force étincelle écrire qu'une seule partie, l'utilisation du fichier dedf.coalesce(1).write.csv(...)
au lieu dedf.repartition(1).write.csv(...)
que se rejoignent une étroite transformation alors que répartition est une grande transformation de voir Spark - répartition() vs fusionnent()va créer un dossier dans filepath avec un
part-0001-...-c000.csv
fichierutilisation
d'avoir une interface utilisateur conviviale pour le nom de fichier
Il y a un moyen de plus pour utiliser Java