Pyspark: Split plusieurs colonnes de tableau en lignes
J'ai un dataframe qui a une ligne, et plusieurs colonnes. Certaines colonnes sont des valeurs individuelles, et d'autres sont des listes. Toutes les colonnes de la liste sont de la même longueur. Je veux diviser chaque colonne de la liste dans une ligne distincte, tout en conservant la non-colonne de la liste tel qu'il est.
Échantillon DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Ce que je veux:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
Si je n'avais qu'une colonne de la liste, ce serait facile, il vous suffit de faire un explode
:
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
Cependant, si j'essaie aussi de explode
la c
colonne, je me retrouve avec un dataframe avec une longueur au carré de ce que je veux:
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
Est ce que je veux - pour chaque colonne, prendre la n-ième élément du tableau dans la colonne et l'ajouter à une nouvelle ligne. J'ai essayé de cartographie d'un exploser à travers toutes les colonnes dans le dataframe, mais cela ne semble pas fonctionner soit:
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
Vous devez vous connecter pour publier un commentaire.
Spark >= 2.4
Vous pouvez remplacer
zip_
udf
avecarrays_zip
fonctionSpark < 2.4
Avec
DataFrames
et de l'UDF:Avec
RDDs
:Les deux solutions sont inefficaces en raison de Python communication frais généraux. Si la taille des données est fixe, vous pouvez faire quelque chose comme ceci:
ou encore:
Cela devrait être beaucoup plus rapide par rapport à l'UDF ou de RDD. Généralisée à l'appui d'un nombre quelconque de colonnes:
Vous auriez besoin d'utiliser
flatMap
, pasmap
que vous voulez faire de plusieurs lignes de sortie de chaque ligne d'entrée.