Comment obtenir les données à partir de vieux décalage de point de Kafka?
Je suis en utilisant zookeeper pour obtenir des données de kafka. Et ici, j'ai toujours obtenir les données pour le dernier décalage de point. Est-il possible de spécifier le temps de décalage pour récupérer les anciennes données?
Il y a une option autooffset.réinitialiser. Il accepte plus petit ou plus grand. Quelqu'un peut-il expliquer ce qu'est le plus petit et le plus grand. Peut autooffset.réinitialiser les aide à obtenir des données à partir de vieux décalage de point au lieu de la dernière décalage de point?
Vous devez vous connecter pour publier un commentaire.
Les consommateurs appartiennent toujours à un groupe et, pour chaque partition, la Gardienne d'animaux permet de suivre les progrès de ce groupe de consommateurs dans la partition.
Pour aller chercher depuis le début, vous pouvez supprimer toutes les données associées avec les progrès que Hussain référencé
Vous pouvez également spécifier le décalage de la partition que vous voulez, comme spécifié dans la base/src/main/scala/kafka/outils/UpdateOffsetsInZK.scala
Cependant, le décalage n'est pas le temps indexées, mais vous savez pour chaque partition est une séquence.
Si votre message contient un timestamp (et méfiez-vous que cet horodatage n'a rien à voir avec le moment Kafka reçu votre message), vous pouvez essayer de faire un indexeur qui tente de récupérer une entrée dans les étapes en incrémentant le décalage par N, et de stocker le n-uplet (rubrique X, partie 2, offset 100, timestamp) quelque part.
Lorsque vous souhaitez récupérer les entrées à partir d'un certain moment dans le temps, vous pouvez demander une recherche binaire pour l'ébauche de votre index jusqu'à ce que vous trouviez l'entrée que vous voulez et de chercher à partir de là.
De la Kafka la documentation ils disent
"kafka.l'api.OffsetRequest.EarliestTime() détecte le début des données dans les journaux et commence la diffusion en continu à partir de là, kafka.l'api.OffsetRequest.LatestTime() n'flux de nouveaux messages. Ne présumez pas que l'offset 0 est le début de décalage, puisque les messages d'âge du journal au fil du temps. "
Utiliser le SimpleConsumerExample ici: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Exemple
Question similaire: Kafka Niveau Élevé de Consommation Récupérer Tous les Messages De Rubrique à l'Aide de l'API Java (Équivalent à --à partir de début)
Cela peut aider
Consulter la doc à propos de kafka config : http://kafka.apache.org/08/configuration.html pour votre requête sur les plus petits et les plus grandes valeurs de paramètre d'offset.
BTW, Tout en explorant de kafka, je me demandais comment faire pour relire tous les messages d'un consommateur. Je veux dire, si un groupe de consommateurs a interrogé tous les messages et il veut ré-obtenir celles-ci.
La façon dont il peut être atteint est de supprimer les données de la gardienne. Utilisez la kafka.utils.ZkUtils classe pour supprimer un nœud sur la gardienne. Ci-dessous est son utilisation :
Pour L'Instant
Kafka FAQ donner une réponse à ce problème.
Futur Plan
Kafka va ajouter un horodatage au format de message. Reportez-vous à
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enrichi+Message+Métadonnées
Kafka Protocole Doc est une grande source de jouer avec de requête/réponse/Offsets/Messages:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+De+La+Kafka+Protocole de
vous utilisez Simple Consommateur exemple où le code suivant démontrer l'état:
ensemble readOffset pour démarrer le décalage initial de. mais vous devez vérifier que le max offset ainsi que ci-dessus donnera limitée des décalages de compter comme par FetchSize dans le dernier param de addFetch méthode.
À l'aide de la KafkaConsumer vous pouvez utiliser le Chercher, SeekToBeginning et SeekToEnd pour vous déplacer dans le flux.
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)
Aussi, Si aucune partition n'est fourni, il va chercher à la première compenser pour toutes les partitions attribuées.
avez-vous essayé?
bin/kafka-console-consumer.sh --bootstrap-serveur localhost:9092 --rubrique test, à partir de début
Il serait d'imprimer tous les messages de la rubrique "test" dans cet exemple.
Plus de détails à partir de ce lien https://kafka.apache.org/quickstart