Coder et de les assembler de multiples fonctions dans PySpark

J'ai un Python de la classe que j'utilise pour charger et traiter des données dans Spark. Parmi les différentes choses que je dois faire, je suis en générant une liste de variables muettes provenant de différentes colonnes dans une Étincelle dataframe. Mon problème est que je ne suis pas sûr de savoir comment bien définir une Fonction Définie par l'Utilisateur pour accomplir ce dont j'ai besoin.

Je ne ont actuellement une méthode qui, lorsqu'elles sont mappées sur la dataframe RDD, résout une partie du problème (n'oubliez pas que c'est une méthode dans un plus grand data_processor classe):

def build_feature_arr(self,table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender':['1','2'], ..}

    # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activty':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat==column_value else 0 for cat in cats]
        return result
    return _build_feature_arr

Essentiellement ce que ce n'est, pour le dataframe, prend la variable catégorique valeurs pour les colonnes spécifiées, et renvoie une liste de valeurs de ces nouvelles variables muettes. Que signifie le code suivant:

data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))

retourne quelque chose comme:

In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 0],
 [1, 0, 1, 0, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [0, 1, 1, 0, 0, 0],
 [1, 0, 1, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 1]]

C'est exactement ce que je veux en terme de génération de la liste des variables muettes que je veux, mais voici ma question: Comment puis-je soit (un) de faire un fichier UDF avec les mêmes fonctionnalités que je peux utiliser dans une Étincelle de requête SQL (ou d'une autre façon, je suppose), ou (b) de prendre les RDD résultant de la carte décrite ci-dessus et ajoutez une nouvelle colonne à la user_data dataframe?

De toute façon, ce que je dois faire est de générer un nouveau dataframe contenant les colonnes de user_data, avec une nouvelle colonne (appelons feature_array) contenant le résultat de la fonction ci-dessus (ou quelque chose d'équivalent sur le plan fonctionnel).

OriginalL'auteur moustachio | 2015-10-07