Comment enregistrer la dernière compenser cette Étincelle consommée pour ZK ou Kafka et peut lire après le redémarrage
Je suis en utilisant Kafka 0.8.2
pour recevoir des données d'AdExchange puis-je utiliser Spark Streaming 1.4.1
pour stocker des données à MongoDB
.
Mon problème est que lorsque je redémarre mon Spark Streaming
Travail par exemple comme la mise à jour nouvelle version corrige le bug, ajouter de nouvelles fonctionnalités. Il continuera de lire le dernier offset
de kafka
à l'époque alors j'ai perdu des données AdX pousser à kafka pendant le redémarrage de l'emploi.
J'essaie quelque chose comme auto.offset.reset -> smallest
mais il recevra de 0 -> dernier de données est énorme et en double en db.
J'ai aussi essayer de définir un ensemble de group.id
et consumer.id
à Spark
mais il la même.
Comment enregistrer la dernière offset
étincelle consommée pour zookeeper
ou kafka
peut alors relire les dernières offset
?
OriginalL'auteur giaosudau | 2015-08-06
Vous devez vous connecter pour publier un commentaire.
L'un des constructeurs de createDirectStream fonction peut obtenir une carte qui contiendra l'id de partition que la clé et le décalage à partir de laquelle vous commencez à consommer en tant que valeur.
Il suffit de regarder l'api ici: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html
La carte que je parlais généralement appelée: fromOffsets
Vous pouvez insérer les données de la carte:
Et l'utiliser lorsque vous créez le direct stream:
Après chaque itération, vous pouvez obtenir le traité des décalages de l'aide:
Vous serez en mesure d'utiliser ces données pour construire le fromOffsets carte dans la prochaine itération.
Vous pouvez voir l'intégralité du code et d'utilisation ici: https://spark.apache.org/docs/latest/streaming-kafka-integration.html à la fin de la page
kafkaParams ++= Map[String, String]("auto.commit.interval.ms" -> "1000") kafkaParams ++= Map[String, String]("zookeeper.sync.time.ms" -> "200") kafkaParams ++= Map[String, String]("zookeeper.session.timeout.ms" -> "400")
mais il ne fonctionne pasUne des options est comme je l'ai dit, vous pouvez utiliser le .offsetRanges structure de données. Après avoir traité votre flux dans une itération donnée que vous pouvez faire:
dStream.foreachRDD { rdd => val x = rdd.asInstanceOf[HasOffsetRanges].offsetRanges; // Do something with X (save it external FS for example) }
x tiendra le dernier traitement de décalage pour chaque sujet de la partition de la combinaison de la RDD. Si vous avez besoin d'avoir exactement une fois sémantique, vous devez appuyer manuellement, mais il est possible.Mon idée que je ne veux pas enregistrer de stockage externe parce que ZK et Kafka pouvez gérer cela.
Je crois qu'ils ne le peuvent pas. Spark 1.3.1 modifier son approche sur la façon d'utiliser Kafka comme source de données d'Écrire à l'Avance les Journaux de diriger les flux. Direct stream utilise Kafka SimpleConsumer de recevoir des messages de Kafka. Et vous pouvez lire ici: cwiki.apache.org/confluence/display/KAFKA/... que l'un des bas côtés de l'utilisation de SimpleConsumer est que vous devez garder une trace de vous-même pour les compensations que vous avez déjà consommé. Aussi longtemps que la Spark streaming utilise de simples consommateurs, vous ne trouverez pas une solution de Kafka / ZK point de vue. Mais l'Étincelle, peuvent ajouter leurs propres manipulation sur le haut de Kafka.
Toute la fiabilité de stockage devrait faire le travail. Je suis généralement à l'enregistrement de données dans HDFS parce que je pense que c'est la solution la plus simple. Je ne peux pas penser à une raison pourquoi Redis ne serez pas en mesure de faire le travail aussi bien.
OriginalL'auteur Michael Kopaniov
À ajouter à Michael Kopaniov réponse, si vous voulez vraiment utiliser ZK comme l'endroit où vous stockez et chargez votre carte de décalages à partir, vous pouvez.
Cependant, parce que vos résultats ne sont pas en cours de sortie ZK, vous n'obtiendrez pas fiable sémantique, à moins que votre opération de sortie est idempotent (on dirait qu'il n'est pas).
Si il est possible de stocker vos résultats dans le même document dans mongo côtés les décalages en un seul atomique, qui pourrait être mieux pour vous.
Pour plus de détails, voir https://www.youtube.com/watch?v=fXnNEq1v3VA
OriginalL'auteur Cody Koeninger
Voici un code que vous pouvez utiliser pour stocker des décalages dans ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
Et voici un code que vous pouvez utiliser pour utiliser l'offset lorsque vous appelez KafkaUtils.createDirectStream:
http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
OriginalL'auteur Felipe Oliveira
Je n'ai pas compris à 100%, mais votre meilleur pari est probablement pour configurer JavaStreamingContext.point de contrôle().
Voir https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing pour un exemple.
Selon certains entrées de blog https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md il y a quelques mises en garde, mais il se sent presque comme il implique une certaine frange des cas qui ne sont évoquées et pas vraiment expliqué.
est-il possible de somme de contrôle le contexte de la fonction et de l'invalider le contexte précédent si la fonction a changé? Auquel cas, il devrait revenir à la lecture des décalages à partir d'un magasin (fs, base de données)
Quelques jours se sont écoulés depuis que j'ai traité ce problème donc je me trompe peut-être mais comme je me souviens bien dans l'ancien Spark streaming (<2.0) Vous pouvez soit créer un nouveau StreamingContext ou de la lecture d'un StreamingContext qui a été précédemment défini du point de contrôle de répertoire. Vous ne créez pas un nouveau StreamingContext pour chaque itération et il suffit de comparer avec le contexte à partir du point de contrôle répertoire, Donc si j'ai bien compris votre question, vous ne pouvez pas invalider précédemment enregistré contexte.
mais ce que vous pouvez faire, est d'avoir quelques configurable par paramètre qui indique si vous souhaitez utiliser le streaming de contexte à partir du poste de contrôle de répertoire ou vous souhaitez en créer une nouvelle de votre propre. Si ce paramètre spécifie que vous souhaitez créer un nouveau contexte, alors vous allez créer si, à partir de (fs, base de données) et d'ignorer le contexte précédent lors de la vérification des données vers le point de contrôle de répertoire.
Pas le même - de la doc: "Si vous activez l'Étincelle de la vérification, les compensations seront stockées dans le point de contrôle. C'est facile à activer, mais il ya des inconvénients. Votre opération de sortie doit être idempotent, puisque vous aurez répété sorties; les opérations ne sont pas une option. En outre, vous ne pouvez pas récupérer à partir d'un point de contrôle si votre demande de code a changé. Pour des mises à niveau prévues, vous pouvez réduire ce en exécutant le nouveau code en même temps que l'ancien code (depuis les sorties doivent être idempotent de toute façon, ils ne devraient pas clash). Mais pour les défaillances imprévues qui nécessitent des modifications de code, vous risquez de perdre des données.."
OriginalL'auteur PatE