Apache Spark Rejoint exemple avec Java
Je suis très nouveau à Apache Spark. Je voudrais concentrer sur base de l'Étincelle spécification de l'API et que vous voulez comprendre et écrire des programmes en utilisant l'Étincelle de l'API.
J'ai écrit un programme en java à l'aide d'Apache Spark pour mettre en œuvre les Jointures concept.
Lorsque j'utilise une Jointure Externe Gauche -- leftOuterJoin() ou Jointure Externe Droite -- rightOuterJoin(), les deux méthodes retournent un JavaPairRDD qui contient un type spécial de Google Options. Mais je ne sais pas comment faire pour extraire les valeurs d'origine de type Facultatif.
Toute façon, je voudrais savoir puis-je utiliser les mêmes méthodes de jointure qui renvoient les données dans mon propre format. Je n'ai pas trouvé moyen de le faire. Sens, c'est quand je suis à l'aide d'Apache Spark, je ne suis pas en mesure de personnaliser le code dans mon propre style puisqu'ils ont déjà donné tous les pré-défini choses.
Vous trouverez le code ci-dessous
my 2 sample input datasets
customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter
and
trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit
Voici mon code Java
**SparkJoins.java:**
public class SparkJoins {
@SuppressWarnings("serial")
public static void main(String[] args) throws FileNotFoundException {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] customerSplit = s.split(",");
return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
}
}).distinct();
JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] transactionSplit = s.split(",");
return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
}
});
//Default Join operation (Inner join)
JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
System.out.println("Joins function Output: "+joinsOutput.collect());
//Left Outer join operation
JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());
//Right Outer join operation
JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());
sc.close();
}
}
Et d'ici la sortie, ce qui j'en suis
Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]
LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]
RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]
Je suis en cours d'exécution de ce programme sur la plateforme Windows
Veuillez observer au-dessus de la sortie et m'aider à extraire les valeurs de type Facultatif
Merci d'avance
Salut @maasg, je suis fondamentalement un développeur java.. je ne sais vraiment pas Scala.. Mais je pense que Apache Spark est le plus approprié pour la Scala de programmation Java.
pouvez vous s'il vous plaît mettre à jour votre code avec comment avez-vous supprimé cette Option .. qui sera utile pour les autres.
OriginalL'auteur Shekar Patel | 2015-02-05
Vous devez vous connecter pour publier un commentaire.
Lorsque vous faites une jointure externe gauche et à droite de la jointure externe, vous pourriez avoir des valeurs null. droite!
Donc étincelle retourne objet Facultatif. après l'obtention de ce résultat, vous pouvez mapper ce résultat à votre propre format.
votre pouvez utiliser isPresent() la méthode de l'Option à la carte de vos données.
Voici l'exemple :
comment la carte que résultat de notre propre format? je suis confrontée au même problème.
J'ai ajouté l'exemple dans la réponse ci-dessus. mappedRDD est votre propre format.
OriginalL'auteur sms_1190
En Java, nous pouvons également mettre en œuvre des Jointures à l'aide DataFrames comme suit:
1) créer l'étincelle de la session de:
2) j'ai pris l'Employé d'entrée:
101,Alan,Franklyn Street,Melbourne,QLD
104,Stuart,Lonsdale Street,Sydney,NSW
créer DataFrame:
où l'Employé est la classe POJO contenant setter, de lecture le long avec le constructeur.
3) de la même manière créer un autre DF pour la deuxième tableau (disons salaire)
4) s'Appliquent INNER join sur des éléments distincts de deux points de vue:
5) de même, la jointure externe gauche:
OriginalL'auteur RPaul