Ajouter de nouvelles données à partitionné parquet fichiers
Je suis en train d'écrire un ETL processus, où j'ai besoin de lire les horaires des fichiers journaux, partition de données, et de l'enregistrer. Je suis à l'aide de l'Étincelle (en Databricks).
Les fichiers journaux sont CSV j'ai donc lu et de les appliquer à un schéma, puis de réaliser mes transformations.
Mon problème est, comment puis-je enregistrer chaque heure de données comme un parquet format, mais l'ajouter à l'ensemble de données? Lors de l'enregistrement, j'ai besoin de partition en 4 colonnes présentes dans le dataframe.
Voici mon enregistrer en ligne:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
Le problème est que si le dossier de destination existe le sauver renvoie une erreur.
Si la destination n'existe pas, alors je ne suis pas en ajoutant mes fichiers.
J'ai essayé d'utiliser .mode("append")
mais je trouve que l'Allumage échoue parfois à mi-chemin à travers, de sorte que je finis par perdre combien de mes données est écrit et la façon dont beaucoup j'ai encore besoin d'écrire.
Je suis en utilisant le parquet parce que le partitionnement augmente considérablement mon interrogation dans l'avenir. Ainsi, je dois écrire les données sous plusieurs formats de fichier sur le disque et ne peut pas utiliser une base de données tels que le Druide ou Cassandra.
Des suggestions sur la façon de partitionner mon dataframe et enregistrer les fichiers (colle à parquet ou un autre format) est grandement apprécié.
.mode(append)
.L'erreur que je reçois est: est-ce Causé par: java.io.IOException: Fichier déjà exists:/tracking/v4/010316/gif=a/partnerID=111/year=2016/month=1/day=3/part-r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet, je pense que cette erreur est renvoyée en raison d'une planification des tâches d'incompatibilité lors de certaines opérations d'écriture, de prendre du temps.
Comment avez-vous résolu ce problème? Je suis en train de gérer la situation semblable ici. Plus de 10 000 partitions, en fait, encore plus, depuis une partition à l'heure. Je suis en train de construire une solution autour de cela, toutes les suggestions.? Merci.
à l'aide de la dernière version de spark (je suis en utilisant Databricks de l'Environnement d'Exécution qui a de nombreuses fonctions à l'extérieur de l'open source étincelle), vous pouvez utiliser le bouton "ajouter" mode de l'écrivain. Avant de résoudre ce problème, vous devriez reconsidérer votre partition de colonne(s) qui donnent lieu à 10k+ partitions. Je ne sais pas la quantité de données que vous avez, mais est-il possible de partitionner par jour au lieu de l'heure? Essayez de repartitionnement vos données pour vous assurer d'avoir 200-500 mo de fichiers une fois que vous les écrivez. Êtes-vous essayer d'écrire tous les 10k partitions à la fois ou chaque lot d'un peu de partitions?
C'est le problème que j'essaye de résoudre. stackoverflow.com/questions/50197782/... . Je suis en train de partition par une liste d'id (ou) de l'id de produit. Alors voulez stocker listingId=612/année=2018/mois=3/jour=5 Si je roule à des partitions pour chaque inscription. Mais c'est toujours trop grand nombre de partition en raison de l'augmentation du nombre d'inscriptions. Toutes les suggestions?
OriginalL'auteur Saman | 2016-01-21
Vous devez vous connecter pour publier un commentaire.
Si vous devez ajouter les fichiers, vous devez absolument utiliser le mode ajout. Je ne sais pas combien de partitions vous attendre à générer, mais je trouve que si vous avez de nombreux partitions,
partitionBy
va entraîner un certain nombre de problèmes (de mémoire - et IO-questions à l'identique).Si vous pensez que votre problème est causé par les opérations d'écriture prend trop de temps, je vous recommande d'essayer ces deux choses:
1) Utilisation snappy par l'ajout, à la configuration:
2) Désactiver la génération des fichiers de métadonnées dans le
hadoopConfiguration
sur leSparkContext
comme ceci:Les métadonnées des fichiers sera un peu de temps à générer (voir ce blog), mais selon cette ils ne sont pas réellement important. Personnellement, j'ai toujours les désactiver et n'avons pas de problèmes.
Si vous générez beaucoup de partitions (> 500), j'ai peur que le mieux que je puisse faire est de vous suggérer que vous vous regardez dans une solution pas utiliser append-mode - je tout simplement jamais réussi à obtenir
partitionBy
de travailler avec beaucoup de partitions.j'ai des problème similaire, je suis de partitionnement sur la base de timestamp actuel, avec chaque nouvelle partition ajoutée qu'elle crée total de la tâche est égale à la partitions jusqu'à présent. j'.e si il y a 1000 partitions et 1 de nouveau à ajouter , il sera exécuté 1001 tâche et il augmente globalement de temps de travail. suis-je en train de faire quelque chose de mal ici?
OriginalL'auteur Glennie Helles Sindholt
Si vous utilisez des ménagères de partitionnement de vos données vont être répartis sur l'ensemble de vos partitions. Cela signifie que chaque tâche de produire et d'écrire des données pour chacun de vos fichiers de sortie.
Envisager de repartitionnement de vos données selon vos colonnes de partition avant d'écrire pour avoir toutes les données pour le fichier de sortie sur les mêmes partitions:
Voir: DataFrame.répartition
OriginalL'auteur MrChrisRodriguez