Les requêtes avec les sources de streaming doit être exécutée avec writeStream.start();
Je suis en train de lire les messages de kafka (version 10) spark et d'essayer de l'imprimer.
import spark.implicits._
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate()
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA") .load()
ds1.collect.foreach(println)
ds1.writeStream
.format("console")
.start()
ds1.printSchema()
d'avoir une erreur d'Exception dans le thread "main"
org.apache.spark.sql.AnalysisException: Requêtes avec les sources de streaming
doit être exécutée avec writeStream.start();;
OriginalL'auteur shivali | 2016-11-15
Vous devez vous connecter pour publier un commentaire.
Vous êtes à la ramification, le plan de requête: à partir de la même ds1 vous essayez d':
ds1.collect.foreach(...)
ds1.writeStream.format(...){...}
Mais vous êtes seulement l'appel
.start()
sur la seconde branche, en laissant les autres en balançant sans arrêt, qui à son tour lance l'exception que vous obtenez de retour.La solution est de démarrer les deux branches et d'attendre la terminaison.
.start()
les deux branches? C'est que le downvote?Je seconde le commentaire ici. Pouvons-nous obtenir une bonne solution ici? Peut-être un exemple de code? Merci!
Voir @Rajeev réponse,
awaitTermination
doit être appelée aprèsstart
OriginalL'auteur ssice
je fixe le problème en utilisant le code suivant.
OriginalL'auteur Rajeev Rathor
J'ai eu beaucoup de mal avec cette question. J'ai essayé de chaque solution suggérée à partir de divers blog.
Mais je mon cas il y a peu d'instruction entre l'appel de la méthode start() de la requête et, enfin, à la dernière, j'ai appeler awaitTerminate() fonction que la cause de cela.
S'il vous plaît essayer de cette façon, Il est parfaitement travailler pour moi.
Exemple:
Si vous écrivez dans cette façon à cause de l'exception/erreur:
va jeter exception donnée et fermera votre streaming pilote.
//some statement
s à tout juste sauvé la StreamingQuery à une variable et a immédiatement appelésQueryVar.start()
et rencontré le même problème. Cette résolu - merci!OriginalL'auteur Rajeev Rathor
De bien vouloir retirer
ds1.collect.foreach(println)
etds1.printSchema()
, utilisezoutputMode
etawaitAnyTermination
pour les processus d'arrière-plan en Attente jusqu'à ce que les requêtes sur lesspark.streams
a mis fin àOriginalL'auteur SLU