Comment lire les données à l'aide de Kafka à la Consommation de l'API depuis le début?
Svp quelqu'un peut me dire comment lire les messages à l'aide de la Kafka à la Consommation de l'API depuis le début à chaque fois, quand je lance le consommateur jar.
Vous devez vous connecter pour publier un commentaire.
Cela fonctionne avec la 0.9.x consommateur. Fondamentalement, lorsque vous créez un consommateur, vous devez affecter un groupe de consommateurs id à cette consommation à l'aide de la propriété
ConsumerConfig.GROUP_ID_CONFIG
. Générer la consommation de l'id de groupe au hasard chaque fois que vous démarrez le consommateur de faire quelque chose comme ceproperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
(propriétés est une instance de java.util.Les propriétés que vous allez passer au constructeurnew KafkaConsumer(properties)
).Générer le client de manière aléatoire signifie que le nouveau groupe de consommateurs qui n'ont pas de décalage associées dans kafka. Donc, ce que nous avons à faire, après c'est pour définir une stratégie pour ce scénario. La documentation pour la
auto.offset.reset
bien dit:Donc parmi les options énumérées ci-dessus, nous devons choisir
earliest
politique de sorte que le nouveau groupe de consommateurs qui commence dès le début à chaque fois.Votre code en java, devrait ressembler à quelque chose comme ceci:
La seule chose que vous devez comprendre maintenant, c'est d'avoir de multiples consommateurs qui appartiennent au même groupe de consommateurs, mais sont répartis de la façon de générer un id aléatoire, et de les distribuer entre ces instances afin qu'ils appartiennent tous au même groupe de consommateurs.
Espère que cela aide!
Une possibilité de le faire, ce serait d'avoir un unique id de groupe chaque fois que vous démarrez ce qui signifie que Kafka aurait vous envoyer les messages dans le sujet depuis le début. Faire quelque chose comme ceci lorsque vous définissez vos propriétés pour
KafkaConsumer
:L'autre option est d'utiliser
consumer.seekToBeginning(consumer.assignment())
mais cela ne fonctionnera pas à moins que Kafka obtient d'abord un battement de coeur de votre consommation en faisant l'appel des consommateurs de la méthode de sondage. Alors appelezpoll()
, puis faire unseekToBeginning()
et puis de nouveau appelpoll()
si vous voulez voir tous les enregistrements à partir du début. C'est un peu hackey mais cela semble être le moyen le plus fiable de le faire que de la version 0.9.Une solution possible est d'utiliser une mise en œuvre de ConsumerRebalanceListener lors de la souscription à un ou plusieurs sujets. Le ConsumerRebalanceListener contient des méthodes de rappel lorsque de nouvelles partitions sont attribués ou retirés à partir d'un consommateur. L'exemple de code suivant illustre cela :
}
Maintenant, chaque fois que les partitions sont affectées à la consommation, chaque partition sera lu depuis le début.
for
-boucle à l'intérieur de laonPartitionsAssigned
méthode peut être remplacé parconsumer.seekToBeginning(partitions)
(œuvres de kafka-2.1.0 dans mes tests).1) https://stackoverflow.com/a/17084401/3821653
2) http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3CCAOG_4QYz2ynH45a8kXb8qw7xw4vDRRwNqMn5j9ERFxJ8RfKGCg@mail.gmail.com%3E
Pour réinitialiser le groupe de consommateurs, vous pouvez supprimer la Gardienne de l'id de groupe
tout en utilisant le Haut Niveau des
props.put("auto.offset.reset", "smallest");
en temps de la création de laConsumerConfig
Si vous utilisez la java des consommateurs api plus précisément org.apache.kafka.clients.à la consommation.Consommateurs, Vous pouvez essayer de le chercher* méthodes.
Ici, à la consommation.affectation() retourne toutes les partitions assignées à un consommateur et seekToBeginning va commencer à partir de la première compensation de la collection donnée de partitions.
Donc pour moi ce qui a fonctionné a été une combinaison de ce qui a été indiqué ci-dessus. Le changement majeur a été d'inclure
et ont généré de façon aléatoire une ID de GROUPE à chaque fois. Mais cela seul ne fonctionne pas pour moi. Pour certaines raisons, la première fois j'ai eu le consommateur, il n'a jamais eu de tous les enregistrements. J'ai dû le modifier pour le faire fonctionner -
Je suis nouveau sur KAFKA et n'ai aucune idée de pourquoi cela se passe, mais pour quelqu'un d'autre encore à essayer d'obtenir ce travail, espérons que cette aide.
Si tout simplement d'éviter l'enregistrement de tous les décalages, le consommateur sera toujours remise à zéro au début.