PySpark DataFrames - façon d'énumérer sans convertir en Pandas?
J'ai un très gros pyspark.sql.dataframe.DataFrame nommé df.
J'ai besoin d'une certaine manière de l'énumération des enregistrements, ainsi, être en mesure d'accéder à l'enregistrement avec certains index. (ou sélectionner un groupe d'enregistrements avec des indices de gamme)
Dans les pandas, j'ai pu faire juste
indexes=[2,3,6,7]
df[indexes]
Ici, je veux quelque chose de similaire, (et sans les convertir dataframe de pandas)
Le plus proche que je peux obtenir à l'est:
- L'énumération de tous les objets dans l'original dataframe par:
indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes)
- À la recherche pour les valeurs que j'ai besoin d'aide, où() fonction.
QUESTIONS:
- Pourquoi il ne fonctionne pas et comment le faire fonctionner? Comment ajouter une ligne à un dataframe?
- Serait-il travailler plus tard pour faire quelque chose comme:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
- Importe quel plus rapide et la plus simple façon de traiter avec elle?
source d'informationauteur Maria Koroliuk
Vous devez vous connecter pour publier un commentaire.
Cela ne fonctionne pas car:
withColumn
devrait être unColumn
n'est pas une collection.np.array
ne marchera pas ici"index in indexes"
comme une expression SQL àwhere
indexes
est hors de portée et il n'est pas résolu comme un identificateur validePySpark >= 1.4.0
Vous pouvez ajouter des numéros de ligne à l'aide de fenêtre correspondante de la fonction et de la requête à l'aide deColumn.isin
méthode ou un formatage de chaîne de requête:On dirait les fonctions de la fenêtre appelée sans
PARTITION BY
clause de déplacer toutes les données de la partition unique, ce qui est en haut est peut-être pas la meilleure solution, après tout.Pas vraiment. Spark DataFrames ne prennent pas en charge aléatoire de l'accès aux lignes.
PairedRDD
peut être consulté à l'aide delookup
méthode qui est relativement rapide si les données sont partitionnées en utilisantHashPartitioner
. Il est également indexées ca projet qui contribue à l'efficacité des recherches.Modifier:
Indépendant de PySpark version, vous pouvez essayer quelque chose comme cela:
Si vous voulez un numéro de série, c'est la garantie de ne pas entrer en collision, mais ne nécessite pas un
.over(partitionBy())
ensuite, vous pouvez utilisermonotonicallyIncreasingId()
.Noter cependant que les valeurs ne sont pas particulièrement "propres". Chaque partition est donné une fourchette de valeur et la sortie ne sera pas contigus. E. g.
0, 1, 2, 8589934592, 8589934593, 8589934594
.Cela a été ajouté à Étincelle Avr 28, 2015 ici: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
Vous pouvez certainement ajouter un tableau pour l'indexation, un tableau de votre choix en effet:
En Scala, nous devons d'abord créer une indexation de Tableau:
Vous pouvez maintenant ajouter cet article à votre DF. Tout d'abord, Pour cela, vous devez ouvrir nos DF et l'obtenir comme un tableau, puis zip avec votre index_array et puis nous convertir le nouveau tableau et RDD. La dernière étape consiste à obtenir que d'un DF:
L'indexation serait plus clair après.
monotonicallyIncreasingId()
- ce qui va affecter les numéros de ligne dans incresing l'ordre, mais non pas dans l'ordre.exemple de sortie avec 2 colonnes:
|--------------------- | ------------------|
| RowNo | Heading 2 |
|--------------------- | ------------------|
| 1 | xy |
|--------------------- | ------------------|
| 12 | xz |
|--------------------- | ------------------|
Si vous voulez attribuer des numéros de ligne utiliser l'astuce suivante.
Testé spark-2.0.1 et versions supérieures.
df.createOrReplaceTempView("df")
dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")
exemple de sortie avec 2 colonnes:
|--------------------- | ------------------|
| RowNo | Heading 2 |
|--------------------- | ------------------|
| 1 | xy |
|--------------------- | ------------------|
| 2 | xz |
|--------------------- | ------------------|
Espère que cette aide.