Appliquer une fonction à grouper des données avec pyspark
Je vais essayer d'obtenir le nombre de mots à partir d'un csv lors d'un regroupement sur une autre colonne. Mon csv a trois colonnes: id, message et user_id. J'ai lu cela et ensuite de diviser le message et le stockage d'une liste de unigrams:
+-----------------+--------------------+--------------------+
| id| message| user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+
Prochaine, compte tenu de mon dataframe df
, je veux du groupe par user_id
et puis obtenir les chiffres pour chacun des unigrams. Comme un simple premier passage, j'ai essayé de regroupement par user_id
et obtenir la longueur de la groupés champ du message:
from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf
df = self.session.read.csv(self.corptable, header=True,
mode="DROPMALFORMED",)
# split my messages ....
# message is now ArrayType(StringType())
grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())
J'obtiens l'erreur suivante:
pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"
Pas sûr de savoir comment contourner cette erreur. En général, comment fait-on appliquer une fonction à une seule colonne lors d'un regroupement d'une autre? Dois-je toujours avoir à créer une Fonction Définie par l'Utilisateur? Très peu d'Étincelle.
Modifier: Voici comment j'ai résolu ce problème, étant donné un générateur de jetons dans un autre fichier Python:
group_field = "user_id"
message_field = "message"
context = SparkContext()
session = SparkSession\
.builder\
.appName("dlastk")\
.getOrCreate()
# add tokenizer
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))
df = session.read.csv("myFile.csv", header=True,)
df = df[group_field, message_field]
# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))
# create ngrams from tokenized messages
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)
# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency
flat = grouped.flatMap(lambda row: [[row[0], x,y, y/sum(row[1].values())] for x,y in row[1].items()])
# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
Apparence des données:
# after read
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...| T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+
# after tokenize
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+
# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...
# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....
# after flat RDD to DF
+--------------------+---------+---+--------------------+
| _1| _2| _3| _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...| face| 3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...| was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...| how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
- Cette réponse dit: "Vous devez appliquer une variante de l'apa.." ce que je fais.
Vous devez vous connecter pour publier un commentaire.
Une approche pourrait consister à regrouper les mots dans une liste, puis utilisez la fonction python
Counter()
pour générer des mots. Pour les deux étapes, nous allons utiliserudf
s'. Tout d'abord, celui qui va aplatir la liste imbriquée résultant decollect_list()
de plusieurs tableaux:Deuxième, celle qui génère le nombre de mots de n-uplets, ou dans notre cas
struct
's:Mettant tous ensemble:
De données:
time
emballage pour mesurer le temps d'exécution. au début de bloc de code :start_time = time.time()
, à la fin:print("Execution time --- %s seconds ---" % (time.time() - start_time))
Essayer: