Charger un fichier CSV avec Spark
Je suis nouveau à l'Étincelle et je suis en train de lire des données au format CSV à partir d'un fichier avec Spark.
Voici ce que j'ai fais :
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Je m'attends à cet appel pour me donner une liste des deux premières colonnes de mon fichier mais j'obtiens cette erreur :
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
bien que mon fichier CSV que plus d'une colonne.
Vous devez vous connecter pour publier un commentaire.
Êtes-vous sûr que tous les lignes ont au moins 2 colonnes? Pouvez-vous essayer quelque chose comme, juste pour vérifier?:
Alternativement, vous pouvez imprimer le coupable (le cas échéant):
csv
bibliothèque pour gérer tous les échapper, parce que tout simplement le fractionnement par des virgules ne fonctionnera pas si, par exemple, il y a des virgules dans les valeurs.","
.Étincelle 2.0.0+
Vous pouvez utiliser les haut-csv source de données directement:
ou
sans dépendances externes.
Spark < 2.0.0:
Au lieu de manuel d'analyse, ce qui est loin d'être négligeable dans le cas général, je recommande
spark-csv
:Assurez-vous que l'Étincelle CSV est inclus dans le chemin d'accès (
--packages
,--jars
,--driver-class-path
)Et de charger vos données comme suit:
Il peut gérer le chargement, le schéma d'inférence, en laissant tomber la malformation de lignes et ne nécessite pas le passage des données à partir de Python à la JVM.
Note:
Si vous connaissez le schéma, il est préférable d'éviter de schéma d'inférence et de le passer à
DataFrameReader
. En supposant que vous avez trois colonnes - integer, double et string:pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(assurez-vous de changer la databricks/spark versions à ceux que vous avez installé).Et encore une autre option qui consiste à lire le fichier CSV à l'aide de Pandas, puis de l'importer les Pandas DataFrame dans Spark.
Par exemple:
Simplement fractionnement par la virgule sera également diviser les virgules qui sont dans les champs (par exemple,
a,b,"1,2,3",c
), de sorte qu'il n'est pas recommandé. zero323 réponse est bon si vous voulez utiliser le DataFrames API, mais si vous voulez coller à la base de l'Étincelle, vous pouvez analyser ces volumes dans la base de Python, avec la csv module:EDIT: Comme @muon mentionné dans les commentaires, cela permettra de traiter l'en-tête comme n'importe quel autre ligne de sorte que vous aurez besoin de l'extraire manuellement. Par exemple,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(assurez-vous de ne pas modifierheader
avant le filtre évalue). Mais à ce stade, vous êtes probablement mieux d'utiliser un haut-csv analyseur.StringIO
.csv
pouvez utiliser n'importe quel itérable b)__next__
ne devrait pas être utilisé directement et échoue sur la ligne vide. Jetez un oeil à flatMap c) Il serait beaucoup plus efficace d'utilisermapPartitions
au lieu d'initialisation du lecteur sur chaque ligne 🙂rdd.mapPartitions(lambda x: csv.reader(x))
travail tout enrdd.map(lambda x: csv.reader(x))
renvoie une erreur? Je m'attendais à la fois de jeter le mêmeTypeError: can't pickle _csv.reader objects
. Il semble aussi quemapPartitions
appelle automatiquement l'équivalent de "readlines" sur lecsv.reader
l'objet, le cas avecmap
, j'ai besoin d'appeler__next__
explicitement pour obtenir la liste descsv.reader
. 2) d'Où vientflatMap
venir? Juste appelermapPartitions
seul a fonctionné pour moi.rdd.mapPartitions(lambda x: csv.reader(x))
fonctionne parce quemapPartitions
s'attend à uneIterable
objet. Si vous voulez être explicite, vous pourriez vous de compréhension ou de générateur d'expression.map
seul ne fonctionne pas car il n'a pas d'itérer sur l'objet. D'où ma suggestion d'utiliserflatMap(lambda x: csv.reader([x]))
qui va se répéter sur le lecteur. MaismapPartitions
est beaucoup mieux ici.Maintenant, il y a également une autre option pour tout fichier csv: https://github.com/seahboonsiew/pyspark-csv comme suit:
Supposer que nous avons le contexte suivant
D'abord, distribuer pyspark-csv.py pour les liquidateurs de l'aide SparkContext
Lire des données au format csv via SparkContext et la convertir en DataFrame
Ceci est en ligne avec ce que JP Mercier initialement suggéré sur l'utilisation des Pandas, mais avec une modification majeure: Si vous lisez des données dans les Pandas en morceaux, il devrait être plus malléable. Ce qui signifie que vous pouvez analyser un fichier assez volumineux que les Pandas peuvent effectivement gérer en une seule pièce, et le transmettre à l'Étincelle dans les petites tailles. (Cela répond également le commentaire à propos de pourquoi vouloir utiliser Étincelle si on peut charger le tout dans les Pandas de toute façon.)
Si vous souhaitez charger csv comme un dataframe ensuite, vous pouvez effectuer les opérations suivantes:
Il a bien fonctionné pour moi.
Si vos données csv arrive pas contenir des sauts de ligne dans l'un des champs, vous pouvez charger vos données avec
textFile()
et l'analyseSi vous rencontrez un ou plusieurs ligne(s) avec plus ou moins de nombre de colonnes que 2 dans le jeu de données puis cette erreur peut se produire.
Je suis aussi nouvelle pour Pyspark et en essayant de lire un fichier CSV. Suivant le code a fonctionné pour moi:
Dans ce code, je suis en utilisant des données de kaggle le lien est: https://www.kaggle.com/carrie1/ecommerce-data
1. Sans mentionner le schéma:
Maintenant vérifier les colonnes:
sdfData.les colonnes
Sortie sera:
Vérifier le type de données pour chaque colonne:
Ce que va donner la trame de données avec toutes les colonnes de type de données que Chaînetapez
2. Avec le schéma:
Si vous connaissez le schéma ou vous voulez changer le type de données de chaque colonne dans le tableau ci-dessus puis utiliser cette (disons que je vais avoir des colonnes suivantes et en un type de données particulier pour chacun d'eux)
Maintenant vérifier le schéma pour le type de données de chaque colonne:
Édité: Nous pouvons utiliser la ligne de code suivante ainsi sans mentionner schéma explicite:
La sortie est:
La sortie ressemble à ceci: