Coder et de les assembler de multiples fonctions dans PySpark
J'ai un Python de la classe que j'utilise pour charger et traiter des données dans Spark. Parmi les différentes choses que je dois faire, je suis en générant une liste de variables muettes provenant de différentes colonnes dans une Étincelle dataframe. Mon problème est que je ne suis pas sûr de savoir comment bien définir une Fonction Définie par l'Utilisateur pour accomplir ce dont j'ai besoin.
Je ne ont actuellement une méthode qui, lorsqu'elles sont mappées sur la dataframe RDD, résout une partie du problème (n'oubliez pas que c'est une méthode dans un plus grand data_processor
classe):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for col in iter_over:
column_value = str(row[col]).lower()
cats = categories[col]
result += [1 if column_value and cat==column_value else 0 for cat in cats]
return result
return _build_feature_arr
Essentiellement ce que ce n'est, pour le dataframe, prend la variable catégorique valeurs pour les colonnes spécifiées, et renvoie une liste de valeurs de ces nouvelles variables muettes. Que signifie le code suivant:
data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))
retourne quelque chose comme:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
C'est exactement ce que je veux en terme de génération de la liste des variables muettes que je veux, mais voici ma question: Comment puis-je soit (un) de faire un fichier UDF avec les mêmes fonctionnalités que je peux utiliser dans une Étincelle de requête SQL (ou d'une autre façon, je suppose), ou (b) de prendre les RDD résultant de la carte décrite ci-dessus et ajoutez une nouvelle colonne à la user_data dataframe?
De toute façon, ce que je dois faire est de générer un nouveau dataframe contenant les colonnes de user_data, avec une nouvelle colonne (appelons feature_array
) contenant le résultat de la fonction ci-dessus (ou quelque chose d'équivalent sur le plan fonctionnel).
OriginalL'auteur moustachio | 2015-10-07
Vous devez vous connecter pour publier un commentaire.
Spark >= 2.3
Depuis Étincelle 2.3
OneHotEncoder
est dépréciée en faveur deOneHotEncoderEstimator
. Si vous utilisez une version récente veuillez modifierencoder
codeSpark < 2.3
Bien, vous pouvez écrire un UDF mais pourquoi le feriez-vous? Il y a déjà un certain nombre d'outils conçus pour gérer cette catégorie de tâches:
Tout d'abord
StringIndexer
.Prochaine
OneHotEncoder
:VectorAssembler
:Si
bar
contenus variables catégorielles vous pouvez utiliserVectorIndexer
pour définir les métadonnées requises:mais il n'est pas le cas ici.
Enfin, vous pouvez envelopper tout cela à l'aide de pipelines:
Sans doute, il est beaucoup plus robuste et propre approche de l'écriture tout à partir de zéro. Il ya quelques mises en garde en particulier quand vous en avez besoin uniforme de codage entre les différents ensembles de données. Vous pouvez en lire plus dans la documentation officielle pour
StringIndexer
etVectorIndexer
.Une autre façon d'obtenir une production comparable est
RFormula
qui:Comme vous pouvez le voir, il est beaucoup plus concis, mais plus difficile à composer ne permet pas beaucoup de personnalisation. Néanmoins, le résultat d'un simple pipeline comme celui-ci sera identique:
Concernant vos questions:
C'est juste une UDF comme les autres. Assurez-vous que vous utilisez des types pris en charge, et au-delà que tout devrait fonctionner parfaitement.
Note:
Pour Spark 1.x remplacez
pyspark.ml.linalg
avecpyspark.mllib.linalg
.model.matrix
dans R). Plus susceptibles de former un certain type de modèle linéaire. Rish explication - la chaîne de l'indexeur genre de crée comme facteur de colonne à partir de chaînes, l'une chaude appelsmodel.matrix
🙂merci @zero323! Juste une remarque: à partir de Spark 2.0+
from pyspark.mllib.linalg import DenseVector
doit être remplacé parfrom pyspark.ml.linalg import DenseVector
, sinon, vous aurez des erreurs de types dans leVectorIndexer
stadePrécisé, merci.
J'ai une question....si je lance un randomforest_Classifier ce sont des données que je voudrais obtenir la forêt aléatoire feuilles en termes de nombre de personnes(en raison de l'indexation). Comment puis-je attacher en arrière à l'original, les descriptions (j'.e texte en anglais) dans une belle façon. Par exemple, la forêt aléatoire classificateur n'a pas meatadata et il devient une tâche difficile. J'ai une vague idée que je dois utiliser quelque chose comme IndexToString() mais je ne suis pas sûr de la façon de l'utiliser
pouvez-vous expliquer comment lire les gender_vector après oneHotEncoding dans votre exemple?
OriginalL'auteur