Kafka - Retard dans la File d'attente de la mise en œuvre de haut niveau de consommation
Souhaitez mettre en œuvre un retard de consommateur en utilisant le haut niveau de consommation de l'api
idée principale:
- produire des messages par clé (chaque msg contient le timestamp de création) cela permet de s'assurer que chaque partition a commandé les messages par le produit du temps.
- auto.commettre.enable=false (explicitement valider après chaque message de processus)
- consommer un message
- message de vérification de l'horodatage et de vérifier si assez de temps a passé
- processus de message (cette opération ne manquera jamais)
- commettre 1 offset
while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg }
certaines préoccupations au sujet de cette mise en œuvre:
- commettre chaque décalage peut ralentir ZK bas
- du consommateur.commitOffsets lever une exception? si oui, je vais consommer le même message deux fois (peut résoudre avec la quantité de messages)
- problème d'attente long temps sans commettre le décalage, par exemple le délai est de 24 heures, sera à côté de itérateur, le sommeil pendant 24 heures, de processus et de s'engager (ZK délai d'expiration de session ?)
- comment peut-ZK session keep-alive sans commettre de nouvelles compensations ? (définition d'une ruche zookeeper.session.délai d'attente.ms peut résoudre les morts à la consommation sans le reconnaître)
- d'autres problèmes im manque?
Merci!
1. à partir de 0.8.2 vous pouvez commettre des décalages de kafka (zk est encore largement utilisé) 2. oui, et c'est un problème fondamental (de l'esprit exactement une fois le traitement) 3. votre zk session va expirer (et si vous avez de nombreux consommateurs dans le message de groupe peut être rééquilibré loin de l'origine de consommateurs). Franchement kafka ne sonne pas comme un bon ajustement si vous avez 1 message par jour
j'ai beaucoup de messages (disons ~10k tr / min), mais dans certains cas, je veux retarder le message de la consommation (par exemple d'avoir un retard de mécanisme de nouvelle tentative après un message d'échec du traitement). si un rééquilibrage se produit ce sera encore du travail, la consommation, le délai de message
dans ce cas, vous allez vous retrouver avec un message d'être planifié sur de nombreux consommateurs: Un consomme de message 1, les horaires d'exécuter dans les 24 heures, ne pas valider le décalage et la session a expiré. B des coups de pied dans, consomme le même message, les horaires d'exécuter dans les 24 heures, ... en fin de compte, cela va se répandre comme un virus. Si vous ne le message de validation, il peut être perdu en cas de dysfonctionnement, vous pouvez choisir celle qui est la préférée pour vous (personnellement je pencherais pour plus tard, il simplifie la sémantique). Est-il une option pour avoir un peu occupé attente?
je n'ai pas de calendrier pour l'exécuter dans les 24 heures. j'ai vérifier le temps qu'il a été soumis (sa partie du message) et de vérifier l'heure actuelle et de voir si les 24 heures ont passé. de cette façon, il l'habitude de se "répandre", comme un virus et seront consommés. comment puis-je régler la session expirent pas ?
il est
j'ai beaucoup de messages (disons ~10k tr / min), mais dans certains cas, je veux retarder le message de la consommation (par exemple d'avoir un retard de mécanisme de nouvelle tentative après un message d'échec du traitement). si un rééquilibrage se produit ce sera encore du travail, la consommation, le délai de message
dans ce cas, vous allez vous retrouver avec un message d'être planifié sur de nombreux consommateurs: Un consomme de message 1, les horaires d'exécuter dans les 24 heures, ne pas valider le décalage et la session a expiré. B des coups de pied dans, consomme le même message, les horaires d'exécuter dans les 24 heures, ... en fin de compte, cela va se répandre comme un virus. Si vous ne le message de validation, il peut être perdu en cas de dysfonctionnement, vous pouvez choisir celle qui est la préférée pour vous (personnellement je pencherais pour plus tard, il simplifie la sémantique). Est-il une option pour avoir un peu occupé attente?
je n'ai pas de calendrier pour l'exécuter dans les 24 heures. j'ai vérifier le temps qu'il a été soumis (sa partie du message) et de vérifier l'heure actuelle et de voir si les 24 heures ont passé. de cette façon, il l'habitude de se "répandre", comme un virus et seront consommés. comment puis-je régler la session expirent pas ?
il est
zookeeper.session.timeout.ms
paramètre par défaut est défini à 6 secondes, mais le paramètre de la valeur extrême sonne comme un abus de la technologie (zk ne serait pas en mesure de suivre les consommateurs sont réellement morts à cause de cela).OriginalL'auteur Nimrod007 | 2015-08-02
Vous devez vous connecter pour publier un commentaire.
Une façon de le faire serait d'utiliser un autre sujet où vous poussez tous les messages qui doivent être retardée. Si tous les messages différés doivent être traitées après le même temps de retard, ce sera assez simple:
Tous les messages réguliers seront désormais traités dès que possible, tandis que ceux qui a besoin d'un délai est mis sur un autre sujet.
La bonne chose est que nous savons que le message à la tête de la le retard sujet est celui qui doit être traitée en premier depuis sa delayTo valeur sera le plus petit. Par conséquent, nous pouvons mettre en place un autre consommateur qui lit la tête de message, vérifie si le timestamp est dans le passé et, dans l'affirmative traite le message et s'engage à les compenser. Si non, elle n'engage pas le décalage et la place dort jusqu'à ce que le temps:
Dans le cas où il y a de diffrent temps de retard, vous pouvez partitionner le sujet sur le retard (par exemple, 24 heures, 12 heures, 6 heures). Si le temps de retard est plus dynamique que celui qu'elle devient un peu plus complexe. Vous pourriez résoudre par l'introduction d'avoir deux de retard sujets. Lire tous les messages en retard à la coupure sujet
A
et de traiter tous les messages dontdelayTo
valeur sont dans le passé. Parmi les autres que tu viens de trouver celui le plus proche dedelayTo
et ensuite de les placer sur le sujetB
. Le sommeil jusqu'à ce que le plus proche doit être traitée de tout faire dans le sens inverse, c'est à dire de traiter les messages du sujetB
et de mettre à la fois à ne pas encore être traité correctement de retour sur le sujetA
.Pour répondre à vos questions (certains ont été abordés dans les commentaires à votre question)
Vous pourriez envisager de passer à la mémorisation de l'offset dans Kafka (une fonctionnalité disponible à partir de 0.8.2, découvrez
offsets.storage
des biens de consommation config)Je crois qu'il peut s'il n'est pas en mesure de communiquer avec le décalage de stockage par exemple. À l'aide de la quantité de messages permet de résoudre ce problème tu, comme tu dis.
Ce ne sera pas un problème avec la solution décrite ci-dessus, à moins que le traitement du message lui-même prend de plus que le délai d'expiration de session.
À nouveau avec ce qui précède, vous ne devriez pas avoir besoin de définir une longue expiration de la session.
Il en a toujours 😉
ConsumerIterator.peek()
hérité deIteratorTemplate
ne change rien dans leConsumerIterator
. Il sera constamment vous donner la même valeur jusqu'à ce queConsumerIterator.next()
méthode est appelée. Comparer: github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/... github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/... . En bref, il ne déplace pas l'itérateur de l'avant.OriginalL'auteur Emil H
Je suggère une autre voie dans votre cas.
Il ne fait pas de sens pour aborder le temps d'attente dans le thread principal de la consommation. Ce sera un anti-modèle dans la façon dont les files d'attente sont utilisées. Conceptuellement, vous avez besoin de traiter les messages comme étant le plus rapide possible et de garder la file d'attente à faible facteur de charge.
Au lieu de cela, je voudrais utiliser un planificateur de tâches permet de planifier des tâches pour chaque message, vous devez retard. De cette façon, vous pouvez traiter la file d'attente et de créer des tâches asynchrones qui sera déclenché à des dates prédéfinies.
La chute de l'utilisation de cette technique est qu'elle est sensible à l'état de la machine qui détient le les travaux programmés dans la mémoire. Si JVM échoue, vous perdez les tâches planifiées et vous ne savez pas si la tâche a été ou n'a pas été exécutée.
Il y a le planificateur implémentations, mais qui peut être configuré pour s'exécuter dans un environnement de cluster, vous gardant ainsi à l'abri de la JVM se bloque.
Prendre un coup d'oeil à ce java cadre de la planification: http://www.quartz-scheduler.org/
Utilisation Tibco EMS ou d'autres files d'Attente JMS. Ils ont réessayer retard construit en . Kafka n'est pas le bon choix de conception pour ce que vous faites .
Je suis d'accord.
OriginalL'auteur nucatus
Utilisation Tibco EMS ou d'autres files d'Attente JMS. Ils ont réessayer retard construit en . Kafka n'est pas le bon choix de conception pour ce que vous faites
OriginalL'auteur Dhyan
Keyed-liste de l'annexe ou de ses redis alternative peut être la meilleure des approches.
OriginalL'auteur softwarevamp