Comment puis-je passer des paramètres supplémentaires pour Udf Spark SQL?

Je veux analyser les colonnes de date dans un DataFrame, et pour chaque colonne de la date de la résolution pour la date peut changer (c'est à dire 2011/01/10 => 2011 /01 si la résolution est réglée sur "Mois").

J'ai écrit le code suivant:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}

Toutefois, il ne fonctionne pas. Il semble que je ne peux passer Columns à l'Udf. Et je me demande si il sera très lente si je convertir le DataFrame à RDD et d'appliquer la fonction sur chaque ligne.

Quelqu'un sait-il la bonne solution? Merci!!!!

InformationsquelleAutor DarkZero | 2016-02-22