Pyspark: Split plusieurs colonnes de tableau en lignes

J'ai un dataframe qui a une ligne, et plusieurs colonnes. Certaines colonnes sont des valeurs individuelles, et d'autres sont des listes. Toutes les colonnes de la liste sont de la même longueur. Je veux diviser chaque colonne de la liste dans une ligne distincte, tout en conservant la non-colonne de la liste tel qu'il est.

Échantillon DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

Ce que je veux:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

Si je n'avais qu'une colonne de la liste, ce serait facile, il vous suffit de faire un explode:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

Cependant, si j'essaie aussi de explode la c colonne, je me retrouve avec un dataframe avec une longueur au carré de ce que je veux:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

Est ce que je veux - pour chaque colonne, prendre la n-ième élément du tableau dans la colonne et l'ajouter à une nouvelle ligne. J'ai essayé de cartographie d'un exploser à travers toutes les colonnes dans le dataframe, mais cela ne semble pas fonctionner soit:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
InformationsquelleAutor Steve | 2016-12-07