PySpark Comment lire CSV dans Dataframe, et les manipuler

Je suis assez nouveau à pyspark et suis en train de l'utiliser pour traiter un grand jeu de données qui est enregistré en tant que fichier csv.
J'aimerais lire fichier CSV dans spark dataframe, supprimer des colonnes, et d'ajouter de nouvelles colonnes.
Comment dois-je faire?

J'ai de la difficulté à obtenir ces données dans un dataframe. C'est une version allégée de ce que j'ai à ce jour:

def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
                .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

Cela produit une erreur TypeError: 'JavaPackage' object is not callable à la réduire étape.

Est-il possible de faire cela? L'idée de réduire à un dataframe est d'être capable d'écrire les données à une base de données (Redshift, à l'aide de la spark-décalage vers le rouge).

J'ai aussi essayé d'utiliser unionAll(), et map() avec partial(), mais ne peut pas le faire fonctionner.

Je suis en cours d'exécution ce sur Amazon EMR, avec spark-redshift_2.10:2.0.0, et Amazon pilote JDBC RedshiftJDBC41-1.1.17.1017.jar.

Qu'est-ce que l'entrée (champs CSV), et ce qui devrait être la sortie? Vous devrez peut-être revoir votre code. Il me semble que vous pouvez éviter la nécessité de créer des bases de données-cadres et de l'union....
Le csv est juste des numéros qui peuvent être organisées dans le schéma de la dataframe. Je sais que je peux facilement l'enregistrer comme un EDR agréable et efficace, mais si je fais ça je ne peux pas écrire à un redshift de la base de données (autant que je sache), ce qui est le but ultime.
ce que j'ai essayé de faire allusion - je suppose que vous pouvez le résoudre en utilisant une étincelle de données-image, sans avoir besoin de plusieurs données-images + de l'union. encore une fois - ce que l'algorithme sont que vous essayez d'utiliser? quel est le résultat attendu?
J'ai ajouté l'étincelle redshift la fonction d'écriture, je l'utiliserais avec le dataframe. Cela permettrait d'ajouter les données à une table existante. Vous avez peut-être raison, je ne sais pas si il y a une meilleure façon de renvoyer les données à partir de la fonction map (make_dataframe) pour me permettre de créer un cadre?

OriginalL'auteur Tim B | 2016-10-30