Ne Kafka soutien priorité pour le sujet ou le message?
J'ai été d'explorer le fait de savoir si Kafka prise en charge prioritaire pour toute la file d'attente de message ou de processus.
Il semble qu'il ne supporte pas une telle chose. J'ai googlé et trouvé ce mail archive, qui prend en charge aussi ceci:
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E
Est-ce que quelqu'configuré de Kafka pour hiérarchiser tout sujet ou message?
C'est un ordre séquentiel, il y a un bon d'écrire jusqu'à la conception ici
OriginalL'auteur aviundefined | 2015-06-04
Vous devez vous connecter pour publier un commentaire.
Kafka est un rapide, évolutive et distribuée dans la nature, par sa conception, partitionné et répliquées journal de validation de service.Donc, il n'y a pas de priorité d'un sujet ou d'un message.
J'ai également rencontrés le même problème que vous avez.La Solution est très simple.Créer des rubriques dans kafka file d'attente,disons:
1) high_priority_queue
2) medium_priority_queue
3) low_priority_queue
Publier le message de priorité élevée dans high_priority_queue et moyennes message de priorité dans medium_priority_queue.
Maintenant, vous pouvez créer kafka consommateur et open stream pour tous les sujet.
Vous obtenez flux de chaque sujet.Maintenant, vous pouvez commencer par lire high_priority sujet si le sujet n'a pas de message, puis le repli medium_priority_queue sujet. si medium_priority_queue est vide alors lire low_priority file d'attente.
Cette astuce fonctionne très bien pour moi.Peut être utile pour vous!!.
Vous pouvez utiliser le pool de threads en parallèle de la consommation. Prendre un coup d'oeil : cwiki.apache.org/confluence/display/KAFKA/...
Il ressemble à l'ancien consommateur de l'API - il n'est pas recommandé de façon à l'aide de la nouvelle Consommation de l'API? J'ai remarqué que les méthodes d'interrompre et de reprendre, mais ne sais pas comment faire pour savoir quand est le bon moment pour appeler pause - plus précisément, comment le trouver, qu'il y a de nouveaux messages dans plus de sujets prioritaires?
OriginalL'auteur Sky
vous avez besoin d'avoir autant de sujets et de les diffuser en fonction de leur priorité
OriginalL'auteur Yogesh BG
Vous pouvez payer priorité-kafka-client pour priorité de la consommation de sujets.
Idée de base est la suivante (copier/coller les parties du README):
Dans ce contexte, la priorité est un entier positif (N) avec des niveaux de priorité
0 < 1 < ... < N-1
PriorityKafkaProducer (implements org.apache.kafka.clients.producer.Producer):
La mise en œuvre prend en supplémentaires de l'argument de niveau de priorité
Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record)
. C'est une indication pour produire de l'enregistrement sur ce niveau de priorité.Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record)
par défaut de la production record sur le plus bas niveau de priorité 0. Pour chaque sujet logique XYZ - niveau de priorité 0 <= i < N est soutenu par Kafka sujetXYZ-i
CapacityBurstPriorityKafkaConsumer (implements org.apache.kafka.clients.consumer.Consumer):
La mise en œuvre entretient une KafkaConsumer pour chaque niveau de priorité 0 <= i < N. Pour chaque sujet logique XYZ et logique de l'ID de groupe ABC - niveau de priorité 0 <= i < N les consommateurs se lie à Kafka sujet
XYZ-i
avec l'ID de groupeABC-i
. Cela fonctionne en tandem avec PriorityKafkaProducer.max.poll.records
propriété est divisée en priorité sujet les consommateurs en fonction demaxPollRecordsDistributor
- par défaut,ExpMaxPollRecordsDistributor
. Reste de la KafkaConsumer configs sont passés comme à chaque thème prioritaire des consommateurs. Les soins doivent être prises lors de la définition demax.partition.fetch.bytes
,fetch.max.bytes
etmax.poll.interval.ms
que ces valeurs seront utilisées comme c'est à travers tous le thème prioritaire des consommateurs.Travaille sur l'idée de répartir
max.poll.records
de propriété sur chaque thème prioritaire aux consommateurs que leur capacité réservée. Les enregistrements sont lues séquentiellement à partir de tous les niveaux de priorité des sujets les consommateurs qui sont configurés avec la distribution demax.poll.records
valeurs. La répartition de la réserve de capacité supérieure ou le taux de traitement des priorités.Attention 1 - si nous avons faussé les partitions dans le niveau de priorité des sujets par exemple 10K enregistrements dans un la priorité 2 de la partition, 100 enregistrements dans un la priorité 1 partition, 10 enregistrements dans un la priorité 0 partition qui sont affectés à des consommateurs différents threads, puis la mise en œuvre ne sera pas synchroniser à travers ces consommateurs pour réguler la capacité et donc ne respecte pas la priorité. Les producteurs doivent s'assurer il n'y a pas biaisée des partitions (par exemple à l'aide de la méthode round-robin - ce "peut" signifie il n'y a pas de message de commande hypothèses et le consommateur peut choisir de traiter les dossiers en parallèle par la séparation de l'extraction et de la transformation des préoccupations).
Attention 2 - Si nous avons des partitions vides dans le niveau de priorité des sujets par exemple, pas en attente d'enregistrements dans une attribution de priorité 2 et 1 partitions, 10K dossiers en priorité 0 partition qui sont affectés à la même thread consommateur, alors nous voulons priorité 0 sujet de la partition de la consommation d'éclater sa capacité à
max.poll.records
et de ne pas se limiter à son réservé la capacité est basée surmaxPollRecordsDistributor
d'autre de l'ensemble de la capacité sous-utilisée.Cette mise en œuvre va essayer de régler les précautions expliqué ci-dessus. Chaque objet de consommation distinctes pour un même niveau de priorité sujet des consommateurs, avec chaque niveau de priorité des consommateurs ayant réservé la capacité est basée sur maxPollRecordsDistributor. Chaque niveau de priorité sujet consommateur va essayer d'éclater dans d'autres niveau de priorité sujet du consommateur de la capacité du groupe à condition que tous les ci-dessous sont remplies:
Il est éligible à la rafale C'est si, dans la dernière
max.poll.history.window.size
tentatives depoll()
atleastmin.poll.window.maxout.threshold
fois qu'il a reçu nombre de dossiers est " assigné max.sondage.les dossiers qui a été distribué sur la base desmaxPollRecordsDistributor
. Ceci est une indication que la partition a plus d'entrants dossiers à traiter.Supérieur niveau de priorité n'est pas admissible à l'éclatement - Il n'y a pas de priorité plus élevée niveau de la rubrique consommateur qui est admissible à l'éclatement basé sur la logique ci-dessus. Fondamentalement, céder la place à de plus hautes priorités.
Si le ci-dessus sont remplies, alors le niveau de priorité sujet consommateur va s'éclater dans tous les autres niveau de priorité sujet consommateurs capacités. Le montant de la rafale par niveau de priorité sujet de consommation est égal au moins de l'onu-capacité utilisée dans la dernière
max.poll.history.window.size
tentatives depoll()
.OriginalL'auteur kartikaditya