Apache Spark Rejoint exemple avec Java

Je suis très nouveau à Apache Spark. Je voudrais concentrer sur base de l'Étincelle spécification de l'API et que vous voulez comprendre et écrire des programmes en utilisant l'Étincelle de l'API.
J'ai écrit un programme en java à l'aide d'Apache Spark pour mettre en œuvre les Jointures concept.

Lorsque j'utilise une Jointure Externe Gauche -- leftOuterJoin() ou Jointure Externe Droite -- rightOuterJoin(), les deux méthodes retournent un JavaPairRDD qui contient un type spécial de Google Options. Mais je ne sais pas comment faire pour extraire les valeurs d'origine de type Facultatif.

Toute façon, je voudrais savoir puis-je utiliser les mêmes méthodes de jointure qui renvoient les données dans mon propre format. Je n'ai pas trouvé moyen de le faire. Sens, c'est quand je suis à l'aide d'Apache Spark, je ne suis pas en mesure de personnaliser le code dans mon propre style puisqu'ils ont déjà donné tous les pré-défini choses.

Vous trouverez le code ci-dessous

my 2 sample input datasets

customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter

and

trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit

Voici mon code Java

**SparkJoins.java:**
public class SparkJoins {
@SuppressWarnings("serial")
public static void main(String[] args) throws FileNotFoundException {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] customerSplit = s.split(",");
return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
}
}).distinct();
JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] transactionSplit = s.split(",");
return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
}
});
//Default Join operation (Inner join)
JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
System.out.println("Joins function Output: "+joinsOutput.collect());
//Left Outer join operation
JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());
//Right Outer join operation
JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());
sc.close();
}
}

Et d'ici la sortie, ce qui j'en suis

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]
LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]
RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]

Je suis en cours d'exécution de ce programme sur la plateforme Windows

Veuillez observer au-dessus de la sortie et m'aider à extraire les valeurs de type Facultatif

Merci d'avance

Pourquoi ne pas utiliser Scala à la place?
Salut @maasg, je suis fondamentalement un développeur java.. je ne sais vraiment pas Scala.. Mais je pense que Apache Spark est le plus approprié pour la Scala de programmation Java.
pouvez vous s'il vous plaît mettre à jour votre code avec comment avez-vous supprimé cette Option .. qui sera utile pour les autres.

OriginalL'auteur Shekar Patel | 2015-02-05