Comment créer corriger le bloc de données pour la classification Spark ML

Je suis en train de lancer aléatoire de classification des forêts en utilisant Spark ML api mais je suis d'avoir des problèmes avec la création de droit de la trame de données d'entrée dans le pipeline.

Voici un exemple de données:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

âge et hours_per_week sont des entiers alors que d'autres fonctionnalités, y compris l'étiquette salaryRange sont catégoriques (String)

Le chargement de ce fichier csv (appelons cela de l'échantillon.csv) peut être fait par Spark csv bibliothèque comme ceci:

val data = sqlContext.csvFile("/home/dusan/sample.csv")

Par défaut, toutes les colonnes sont importés en tant que chaîne de caractères, donc nous avons besoin de changer "âge" et "hours_per_week" Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

Juste pour vérifier comment schéma ressemble maintenant:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

Puis vous permet de définir la croix-programme de validation et de pipeline:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

D'erreur s'affiche lors de l'exécution de cette ligne:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException: Champ "caractéristiques" n'existe pas.

Il est possible de définir l'étiquette de la colonne et de la fonctionnalité de la colonne dans RandomForestClassifier ,cependant j'ai 4 colonnes comme prédicteurs (caractéristiques) pas un seul.

Comment je dois organiser mon bloc de données de sorte qu'il a l'étiquette et les caractéristiques des colonnes organisé correctement?

Pour votre confort, voici le code complet :

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object SampleClassification {
def main(args: Array[String]): Unit = {
//set spark context
val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import com.databricks.spark.csv._
//load data by using databricks "Spark CSV Library" 
val data = sqlContext.csvFile("/home/dusan/sample.csv")
//by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
//this fails with error
//java.lang.IllegalArgumentException: Field "features" does not exist.
val cmModel = cv.fit(dataFixed) 
}
}

Merci pour l'aide!