Itérer les lignes et les colonnes dans Spark dataframe
J'ai le texte suivant Étincelle dataframe qui est créé dynamiquement:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Maintenant, j'ai besoin de parcourir chaque ligne et colonne dans sqlDF
pour imprimer chaque colonne, c'est ma tentative:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
est de type Row
, mais n'est pas itératif c'est pourquoi ce code génère une erreur de compilation dans row.foreach
. Comment parcourir chaque colonne dans Row
?
- Pour l'impression dataframe pourquoi n'utilisez-vous pas sqlDF.spectacle?
- println est juste pour la question, j'ai besoin d'accéder à des données de colonne dans le programme
- Double Possible de Spark extraction des valeurs à partir d'une Ligne
- Cette question est différente, il vous demande comment faire pour extraire des valeurs de colonnes, l'une de vous référer à l'est d'extraire les valeurs de Lignes
Vous devez vous connecter pour publier un commentaire.
Vous pouvez convertir
Row
àSeq
avectoSeq
. Une fois tourné versSeq
vous pouvez effectuer une itération sur elle comme d'habitude avecforeach
,map
ou tout ce que vous avez besoinDe sortie:
Considérer que vous avez un
Dataframe
comme ci-dessousÀ la boucle de votre Dataframe et d'en extraire les éléments de la Dataframe, vous pouvez soit choisir l'un des ci-dessous approches.
Approche 1 - Boucle en utilisant foreach
Lecture en boucle d'un dataframe directement à l'aide de
foreach
boucle n'est pas possible. Pour ce faire, vous devez d'abord définir le schéma de dataframe à l'aide decase class
et ensuite, vous devez spécifier ce schéma pour le dataframe.Veuillez voir le résultat ci-dessous :
Approche 2 - Boucle à l'aide de rdd
Utilisation
rdd.collect
sur le dessus de votre Dataframe. Lerow
variable contiendra chaque ligne de Dataframe derdd
type de ligne. Pour obtenir chaque élément d'une ligne, utilisezrow.mkString(",")
qui contiendra la valeur de chaque ligne de valeurs séparées par des virgules. À l'aide desplit
fonction (intégré la fonction), vous pouvez accéder à chaque valeur de la colonne derdd
ligne avec l'indice.Remarque qu'il y a deux inconvénients de cette approche.
1. Si il y a un
,
dans la colonne valeur, les données seront, à tort, de scission à la colonne adjacente.2.
rdd.collect
est unaction
qui renvoie toutes les données pour le pilote de la mémoire où le conducteur de la mémoire peut ne pas être que beaucoup énorme pour stocker les données, qui s'achève avec l'obtention de l'application a échoué.Je vous recommande d'utiliser Approche 1.
Approche 3 - à l'Aide de où puis sélectionnez
Vous pouvez utiliser directement
where
etselect
qui va à l'interne de la boucle et trouve les données. Depuis, il ne devrait pas en jette Indice lié exception, une condition if est utiliséApproche 4 - Utilisation de tables temporaires
Vous pouvez vous inscrire dataframe comme temptable qui seront stockées dans l'étincelle de la mémoire. Ensuite, vous pouvez utiliser une requête select comme d'autres base de données à interroger les données, puis de les recueillir et de les enregistrer dans une variable
Vous devez utiliser
mkString
sur votreRow
:Mais notez que cela sera imprimé à l'intérieur, les exécuteurs testamentaires de la JVM, donc norally vous ne verrez pas la sortie (à moins de travailler avec des maîtres = local)
sqlDF.foreach
ne fonctionne pas pour moi, mais s'Approche de 1 de @Sarath Avanavu réponse fonctionne, mais il était aussi en train de jouer avec l'ordre des enregistrements de temps en temps.J'ai trouvé une façon de plus pour ce qui est de travail
simple recueillir résultat et ensuite appliquer foreach
df.collect().foreach(println)