Kafka - la Manière la plus Simple pour Obtenir le Dernier Décalage
Je suis en train de construire une application qui permet d'abonnements à kafka sujets à être ajoutés et supprimés de façon dynamique. Lorsqu'un sujet d'abonnement est ajouté que je voulais exécuter un travail par lots chaque heure qui obtient tous les nouveaux messages et les pousse vers une autre banque de données.
Ce que je veux comprendre, c'est comment pour obtenir le décalage actuel d'un sujet. Dès qu'un abonnement est ajouté, je veux que le prochain lot d'emploi pour obtenir tous les messages depuis le temps approximatif de l'abonnement.
Par exemple, imaginez que j'ai une rubrique appelée "TopicA" qui est constamment à la réception de messages. Si j'ajoute un abonnement à 7.15 pm, lorsque le lot tâche s'exécute à 8pm je veux tous les messages depuis 7.15 pm pour être groupées en place. Je suis heureux pour le temps à être approximatif - 7.10, 7.20 etc. 5 ou 10 minutes de chaque côté me cause aucun souci.
Donc ma destinée solution est d'obtenir le décalage actuel de un thème le moment un abonnement est ajouté. J'ai regardé le simple consommateur, mais je ne veux pas m'impliquer dans l'ensemble de la grappe managemnet aspects de cette base de cas d'utilisation.
J'ai aussi regardé le haut niveau de consommation. Je pouvais quelque chose comme ceci:
consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset
Ce qui m'inquiète avec cette approche est l'appel à la "tête" est en fait un flux. Donc, je crois qu'il va bloquer en attente pour le prochain message. Le blocage est problématique, car il peut causer d'autres abonnements à être mis en file d'attente jusqu'à ce que le message suivant arrive.
Je suis heureux de passer un peu de temps à la mise en œuvre de cette dernière approche, mais si il y a un moyen plus simple qui ne nécessite pas de m'écrire le risque d'erreur de code simultané, alors je préfère ne pas perdre mon temps.
Je vais également besoin d'un moyen pour obtenir tous les journaux depuis que l'offset.
Vous devez vous connecter pour publier un commentaire.
Chaque réponse à une demande de récupération retourne un "HighWaterMark" qui représente le dernier décalage dans le journal de la partition actuellement consommée.
Donc, en théorie, vous pouvez récupérer le plus tôt message, ou n'importe quel message (en supposant que celle-ci existe) pour un sujet donné, et tirez la HighWaterMark de la réponse.
Il n'y a plus de détails sur la HighWaterMark ici:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
Bien sûr, être en mesure de tirer le HighWaterMarkOffset de la réponse dépend de votre client que les données disponibles par le biais de ses propres Kafka API.