Kafka - Retard dans la File d'attente de la mise en œuvre de haut niveau de consommation

Souhaitez mettre en œuvre un retard de consommateur en utilisant le haut niveau de consommation de l'api

idée principale:

  • produire des messages par clé (chaque msg contient le timestamp de création) cela permet de s'assurer que chaque partition a commandé les messages par le produit du temps.
  • auto.commettre.enable=false (explicitement valider après chaque message de processus)
  • consommer un message
  • message de vérification de l'horodatage et de vérifier si assez de temps a passé
  • processus de message (cette opération ne manquera jamais)
  • commettre 1 offset
    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }

certaines préoccupations au sujet de cette mise en œuvre:

  1. commettre chaque décalage peut ralentir ZK bas
  2. du consommateur.commitOffsets lever une exception? si oui, je vais consommer le même message deux fois (peut résoudre avec la quantité de messages)
  3. problème d'attente long temps sans commettre le décalage, par exemple le délai est de 24 heures, sera à côté de itérateur, le sommeil pendant 24 heures, de processus et de s'engager (ZK délai d'expiration de session ?)
  4. comment peut-ZK session keep-alive sans commettre de nouvelles compensations ? (définition d'une ruche zookeeper.session.délai d'attente.ms peut résoudre les morts à la consommation sans le reconnaître)
  5. d'autres problèmes im manque?

Merci!

1. à partir de 0.8.2 vous pouvez commettre des décalages de kafka (zk est encore largement utilisé) 2. oui, et c'est un problème fondamental (de l'esprit exactement une fois le traitement) 3. votre zk session va expirer (et si vous avez de nombreux consommateurs dans le message de groupe peut être rééquilibré loin de l'origine de consommateurs). Franchement kafka ne sonne pas comme un bon ajustement si vous avez 1 message par jour
j'ai beaucoup de messages (disons ~10k tr / min), mais dans certains cas, je veux retarder le message de la consommation (par exemple d'avoir un retard de mécanisme de nouvelle tentative après un message d'échec du traitement). si un rééquilibrage se produit ce sera encore du travail, la consommation, le délai de message
dans ce cas, vous allez vous retrouver avec un message d'être planifié sur de nombreux consommateurs: Un consomme de message 1, les horaires d'exécuter dans les 24 heures, ne pas valider le décalage et la session a expiré. B des coups de pied dans, consomme le même message, les horaires d'exécuter dans les 24 heures, ... en fin de compte, cela va se répandre comme un virus. Si vous ne le message de validation, il peut être perdu en cas de dysfonctionnement, vous pouvez choisir celle qui est la préférée pour vous (personnellement je pencherais pour plus tard, il simplifie la sémantique). Est-il une option pour avoir un peu occupé attente?
je n'ai pas de calendrier pour l'exécuter dans les 24 heures. j'ai vérifier le temps qu'il a été soumis (sa partie du message) et de vérifier l'heure actuelle et de voir si les 24 heures ont passé. de cette façon, il l'habitude de se "répandre", comme un virus et seront consommés. comment puis-je régler la session expirent pas ?
il est zookeeper.session.timeout.ms paramètre par défaut est défini à 6 secondes, mais le paramètre de la valeur extrême sonne comme un abus de la technologie (zk ne serait pas en mesure de suivre les consommateurs sont réellement morts à cause de cela).

OriginalL'auteur Nimrod007 | 2015-08-02