Comment puis-je me connecter à une base de données postgreSQL dans Apache Spark utilisation de scala?
Je veux savoir comment puis-je faire les choses suivantes dans la scala?
- Se connecter à une base de données postgreSQL à l'aide de l'Étincelle scala.
- Écrire des requêtes SQL comme SELECT , UPDATE, etc. pour modifier une table dans
cette base de données.
Je sais faire à l'aide de la scala, mais comment faire pour importer le connecteur pot de psql scala en sbt, tandis que l'emballage il?
- Pourquoi les downvotes? Je pense que c'est une bonne question. Il est assez générique, mais alors la réponse peut également être générique et aider beaucoup d'utilisateurs.
- vous êtes-vous retrouvée à l'aide de mysql ou postgres? Si postgres est-il possible d'avoir un oeil à votre sbt et l'exemple de code?
Vous devez vous connecter pour publier un commentaire.
Notre objectif consiste à exécuter en parallèle des requêtes SQL à partir de l'Étincelle travailleurs.
Construire le programme d'installation
Ajouter le connecteur JDBC et à la
libraryDependencies
dansbuild.sbt
. J'ai seulement essayé cela avec MySQL, donc je vais utiliser que dans mes exemples, mais Postgres devrait être sensiblement le même.Code
Lorsque vous créez le
SparkContext
vous dira quels pots pour copier les exécuteurs testamentaires. Inclure le connecteur jar. Une belle façon de le faire:Maintenant Étincelle est prêt à se connecter à la base de données. Chaque exécuteur testamentaire exécuter la partie de la requête, de sorte que les résultats sont prêtes pour le calcul distribué.
Il y a deux options pour cela. L'ancienne approche est d'utiliser
org.apache.spark.rdd.JdbcRDD
:Consultez la documentation pour les paramètres. Brièvement:
SparkContext
.SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100
dans l'exemple.ResultSet
en quelque chose. Dans l'exemple que nous avons de le convertir en unString
, si vous vous retrouvez avec unRDD[String]
.Depuis Apache Spark version 1.3.0 une autre méthode est disponible à travers le DataFrame de l'API. Au lieu de la
JdbcRDD
vous devez créer unorg.apache.spark.sql.DataFrame
:Voir https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases pour la liste complète des options (la clé de la gamme et le nombre de partitions peut être définie comme avec
JdbcRDD
).Mises à jour
JdbcRDD
ne prend pas en charge les mises à jour. Mais vous pouvez simplement les faire dans unforeachPartition
.(Ce qui crée une connexion par partition. Si c'est un problème, utilisez un pool de connexion!)
DataFrame
s charge les mises à jour par le biais de lacreateJDBCTable
etinsertIntoJDBC
méthodes.ConnectionPool
objet qu'ils utilisent est un personnage fictif de l'API. En tant que tel, je préfère ne pas l'inclure dans ma réponse. Mais j'ai ajouté un paragraphe à suggérer à l'aide d'un pool de connexions.