PySpark Comment lire CSV dans Dataframe, et les manipuler
Je suis assez nouveau à pyspark et suis en train de l'utiliser pour traiter un grand jeu de données qui est enregistré en tant que fichier csv.
J'aimerais lire fichier CSV dans spark dataframe, supprimer des colonnes, et d'ajouter de nouvelles colonnes.
Comment dois-je faire?
J'ai de la difficulté à obtenir ces données dans un dataframe. C'est une version allégée de ce que j'ai à ce jour:
def make_dataframe(data_portion, schema, sql):
fields = data_portion.split(",")
return sql.createDateFrame([(fields[0], fields[1])], schema=schema)
if __name__ == "__main__":
sc = SparkContext(appName="Test")
sql = SQLContext(sc)
...
big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
.reduce(lambda a, b: a.union(b))
big_frame.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<...>") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("append") \
.save()
sc.stop()
Cela produit une erreur TypeError: 'JavaPackage' object is not callable
à la réduire étape.
Est-il possible de faire cela? L'idée de réduire à un dataframe est d'être capable d'écrire les données à une base de données (Redshift, à l'aide de la spark-décalage vers le rouge).
J'ai aussi essayé d'utiliser unionAll()
, et map()
avec partial()
, mais ne peut pas le faire fonctionner.
Je suis en cours d'exécution ce sur Amazon EMR, avec spark-redshift_2.10:2.0.0
, et Amazon pilote JDBC RedshiftJDBC41-1.1.17.1017.jar
.
Le csv est juste des numéros qui peuvent être organisées dans le schéma de la dataframe. Je sais que je peux facilement l'enregistrer comme un EDR agréable et efficace, mais si je fais ça je ne peux pas écrire à un redshift de la base de données (autant que je sache), ce qui est le but ultime.
ce que j'ai essayé de faire allusion - je suppose que vous pouvez le résoudre en utilisant une étincelle de données-image, sans avoir besoin de plusieurs données-images + de l'union. encore une fois - ce que l'algorithme sont que vous essayez d'utiliser? quel est le résultat attendu?
J'ai ajouté l'étincelle redshift la fonction d'écriture, je l'utiliserais avec le dataframe. Cela permettrait d'ajouter les données à une table existante. Vous avez peut-être raison, je ne sais pas si il y a une meilleure façon de renvoyer les données à partir de la fonction map (make_dataframe) pour me permettre de créer un cadre?
OriginalL'auteur Tim B | 2016-10-30
Vous devez vous connecter pour publier un commentaire.
Mise à jour - répondre aussi à votre question dans les commentaires:
Lire des données à partir d'un fichier CSV à dataframe:
Il semble que vous essayez seulement de lire fichier CSV dans une étincelle dataframe.
Si soi - ma réponse ici: https://stackoverflow.com/a/37640154/5088142.
Le code suivant devrait lire CSV dans une étincelle de données-cadre
chute de la colonne
vous pouvez déposer colonne à l'aide de "drop(col)"
https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
drop(col)
ajouter une colonne
Vous pouvez utiliser "withColumn"
https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
withColumn(colName, col)
Note: spark a beaucoup d'autres fonctions qui peuvent être utilisées (par exemple, vous pouvez utiliser le bouton "select" au lieu de "drop")
J'ai mis à jour ma réponse , pour répondre à vous questions dans votre commentaire
si j'ai répondu à votre question, merci de l'accepter.
C'est super, merci. Dois-je appeler en pcv()? Je voudrais éviter de soumettre toutes les données vers le conducteur si possible. Je me demandais si je pouvais ajouter la fonction write() de commande, par exemple, la withColumn commande?
mise à jour de ma réponse
OriginalL'auteur Yaron