Pourquoi ne Spark application échoue avec “ClassNotFoundException: impossible de trouver la source de données: kafka” comme uber-pot avec sbt assemblée?
Je suis en train de lancer un échantillon comme https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. J'ai commencé avec l'Étincelle Structuré Streaming guide de Programmation à http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.
Mon code est
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics) = args
val spark = SparkSession
.builder
.appName("boontadata-spark-job1")
.getOrCreate()
import spark.implicits._
//Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
//Generate running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
//Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
J'ai ajouté ce qui suit sbt fichiers:
construire.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
//META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
J'ai aussi ajouté du projet/de l'assemblée.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Cela crée un Uber pot avec le non provided
pots.
Je me soumets avec la ligne suivante:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
mais j'obtiens cette erreur d'exécution:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
Est-il un moyen de savoir quelle est la classe pas trouvé de sorte que je peux rechercher le maven.org repo pour la classe.
La lookupDataSource
code source semble être à la ligne 543 à https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala mais je ne pouvais pas trouver un lien direct avec Kafka de la source de données...
Code source complet est ici: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
Salut @jithinpt, veuillez voir les commentaires dans la réponse marqué comme réponse.
OriginalL'auteur benjguin | 2016-12-23
Vous devez vous connecter pour publier un commentaire.
J'ai essayé comme ça, c'est de travailler pour moi. Présenter comme ça, et laissez-moi savoir une fois que vous avez des questions
J'appellerais cela une solution de contournement et ne permet pas d'expliquer pourquoi l'uber-jar ne fonctionne pas. La cause est
assemblyMergeStrategy
dansbuild.sbt
supprime toutes lesMETA-INF
fichiers incl. les inscriptions. Voir ma réponse ci-dessous.OriginalL'auteur Sree Eedupuganti
La question est la section suivante dans
build.sbt
:Il est dit que tous les
META-INF
entrées devrait être rejeté, y compris le "code" qui en fait une source de données d'alias (par exemplekafka
).Mais la
META-INF
fichiers sont très importants pourkafka
(et d'autres alias en streaming de sources de données) pour travailler.Pour
kafka
alias de travail Spark SQL utilise META-INF/services/org.apache.spark.sql.des sources.DataSourceRegister avec l'entrée suivante:KafkaSourceProvider
est chargée de l'enregistrement deskafka
alias avec le bon flux de données source, c'est à dire KafkaSource.Juste pour vérifier que le code réel est en effet disponible, mais le "code" qui rend l'alias enregistré n'est pas, vous pouvez utiliser le
kafka
de la source de données par le nom pleinement qualifié (pas l'alias) comme suit:Vous verrez d'autres problèmes en raison du manque d'options comme
kafka.bootstrap.servers
, mais...nous sommes s'égarer.Une solution est de
MergeStrategy.concat
tousMETA-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(qui permettrait de créer un uber-pot avec toutes les sources de données, incl. lekafka
source de données).OriginalL'auteur Jacek Laskowski
Dans mon cas, j'ai aussi eu cette erreur lors de la compilation avec sbt, et la cause était que
sbt assembly
n'a pas compris laspark-sql-kafka-0-10_2.11
artefact dans le cadre de la graisse pot.(Je serait très bienvenue dans les commentaires de cet article. La dépendance n'a pas été spécifié une étendue, de sorte qu'il ne devrait pas être supposé être "offert").
Donc j'ai changé pour le déploiement d'une normale (slim) bocal et y compris les dépendances avec le
--jars
paramètres d'étincelle soumettre.Afin de recueillir toutes les dépendances dans un endroit, vous pouvez ajouter
retrieveManaged := true
à votre sbt paramètres du projet, ou vous pouvez, dans le tribunal de la console, question:Qui devrait apporter toutes les dépendances de la
lib_managed
dossier.Ensuite, vous pouvez copier tous les fichiers (avec une commande bash, vous pouvez par exemple utiliser quelque chose comme cela
OriginalL'auteur ssice
Je suis en utilisant étincelle 2.1 et face au même problème
ma solution est
1)
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2)
cd ~/.ivy2/jars
vous êtes ici ,toutes les bocaux sont dans ce dossier, maintenant3) copiez tous les bocaux dans ce dossier à tous les nœuds(pouvez créer un dossier spécifique qui les détient)
4) ajouter le nom du dossier à
spark.driver.extraClassPath
etspark.driver.extraClassPath
,par exemplespark.driver.extraClassPath=/opt/jars/*:your_other_jars
5
spark-submit --class ClassNm --Other-Options YourJar.jar
fonctionne très bien maintenantOriginalL'auteur dalin qin
Je l'ai résolu en téléchargeant le fichier jar du pilote de système. À partir de là, j'ai fourni le pot d'étincelle soumettre --pot option.
Également être noté, c'est que j'étais l'emballage de l'ensemble de l'étincelle 2.1 environnement dans mon uber pot (depuis mon cluster est toujours sur 1.6.1) Pour une raison quelconque, il n'est pas repris dans le cadre d'uber jar.
OriginalL'auteur Gyan
C'est en vue de Jacek Laskowski de réponse.
Ceux d'entre vous la construction de votre projet maven peut essayer.
Ajouter la ligne mentionnée ci-dessous à votre maven-ombre-plugin.
META-INF/services/org.apache.spark.sql.des sources.DataSourceRegister
J'ai posé le code du plugin pour le pom fichier comme un exemple pour montrer l'endroit où ajouter la ligne.
Veuillez excuser mon formatage des compétences.
OriginalL'auteur Algomeister