Kafka Niveau Élevé de Consommation Récupérer Tous les Messages De Rubrique à l'Aide de l'API Java (Équivalent à --à partir de début)
Je suis en train de tester le Kafka de Haut Niveau de Consommation à l'aide de la ConsumerGroupExample code de la Kafka site. Je voudrais récupérer tous les messages existants sur le sujet appelé "test" que j'ai dans la Kafka dans la configuration du serveur. Regarder d'autres blogs, de l'automobile.décalage.la réinitialisation doit être réglé sur "plus petit" pour être en mesure d'obtenir tous les messages:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
La question, j'ai vraiment de ceci: ce qui est l'équivalent de l'api Java appel pour le Haut Niveau de Consommation, qui est l'équivalent de:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --rubrique test, à partir de début
Vous devez vous connecter pour publier un commentaire.
En gros, a chaque fois qu'un nouveau type de consommateur essaie de consommer un sujet, il va lire les messages depuis le début. Si vous êtes particulièrement la consommation depuis le début à chaque fois pour des fins de test, chaque fois que vous utilisez votre consommation avec une nouvelle id de groupe, il va lire les messages depuis le début. Voici comment je l'ai fait :
et de lire les messages depuis le début à chaque fois!
Ressemble, vous devez utiliser le "faible niveau de SimpleConsumer API"
Cet exemple travaillé pour obtenir tous les messages d'un sujet avec les arguments suivants: (à noter que le port est le Kafka de port, pas la Gardienne du port, des sujets mis en place à partir cet exemple):
Plus précisément, il existe une méthode pour obtenir readOffset qui prend kafka.l'api.OffsetRequest.EarliestTime():
Voici un autre poste peut fournir certains autres idées sur la façon de régler cette question: Comment obtenir les données à partir de vieux décalage de point de Kafka?
Pour extraire les messages depuis le début, vous pouvez le faire:
puis il suffit de suivre le travail de routine...
Cette propriétés de vous aider.