Appliquer une fonction à grouper des données avec pyspark

Je vais essayer d'obtenir le nombre de mots à partir d'un csv lors d'un regroupement sur une autre colonne. Mon csv a trois colonnes: id, message et user_id. J'ai lu cela et ensuite de diviser le message et le stockage d'une liste de unigrams:

+-----------------+--------------------+--------------------+
|               id|             message|             user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+

Prochaine, compte tenu de mon dataframe df, je veux du groupe par user_id et puis obtenir les chiffres pour chacun des unigrams. Comme un simple premier passage, j'ai essayé de regroupement par user_id et obtenir la longueur de la groupés champ du message:

from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf

df = self.session.read.csv(self.corptable, header=True,
        mode="DROPMALFORMED",)

# split my messages ....
# message is now ArrayType(StringType())

grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())

J'obtiens l'erreur suivante:

pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"

Pas sûr de savoir comment contourner cette erreur. En général, comment fait-on appliquer une fonction à une seule colonne lors d'un regroupement d'une autre? Dois-je toujours avoir à créer une Fonction Définie par l'Utilisateur? Très peu d'Étincelle.

Modifier: Voici comment j'ai résolu ce problème, étant donné un générateur de jetons dans un autre fichier Python:

group_field = "user_id"
message_field = "message"

context = SparkContext()
session = SparkSession\
        .builder\
        .appName("dlastk")\
        .getOrCreate()

# add tokenizer
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))

df = session.read.csv("myFile.csv", header=True,)
df = df[group_field, message_field]

# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))

# create ngrams from tokenized messages
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)

# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency
flat = grouped.flatMap(lambda row: [[row[0], x,y, y/sum(row[1].values())] for x,y in row[1].items()])

# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")

Apparence des données:

# after read
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...|   T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+

# after tokenize
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+

# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...

# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....

# after flat RDD to DF
+--------------------+---------+---+--------------------+
|                  _1|       _2| _3|                  _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...|     face|  3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...|      was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...|      how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
  • Cette réponse dit: "Vous devez appliquer une variante de l'apa.." ce que je fais.
InformationsquelleAutor Sal | 2016-12-05