Comment obtenir le dernier offset pour une partition pour un sujet kafka?
Je suis en utilisant le Python de haut niveau de consommation de Kafka et que vous voulez savoir sur les décalages plus pour chaque partition d'un sujet. Cependant je ne peux pas le faire fonctionner.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Mais le résultat que j'obtiens est
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
J'ai une autre approche à l'aide de assign
mais le résultat est le même
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
Il semble d'une partie de la documentation que je pourrais obtenir ce comportement si un fetch
n'a pas été délivré. Mais je ne peux pas trouver un moyen de la force. Ce que je fais mal?
Ou est-il un autre/simple d'obtenir les plus récentes de compensations pour un sujet?
source d'informationauteur Saket
Vous devez vous connecter pour publier un commentaire.
Enfin après avoir passé une journée sur ce sujet et plusieurs faux départs, j'ai été capable de trouver une solution et obtenir ce travail. Le poster son de sorte que d'autres puissent s'y référer.
Si vous souhaitez utiliser Kafka shell scripts présents dans kafka/bin, alors vous pouvez obtenir la dernière et la plus petite des compensations par l'aide de kafka-run-class.sh.
Pour obtenir la dernière commande offset ressemblera à ceci
Pour obtenir le plus petit décalage de commande ressemblera à ceci
Vous pouvez trouver plus d'informations sur les Décalages de Shell à partir de la suite lien
Espérons que cette aide!
Une autre façon d'y parvenir est par interrogation de la consommation pour obtenir la dernière consommation d'offset et puis, à l'aide de la seek_to_end méthode pour obtenir les plus récentes disponibles décalage de la partition.
Cette méthode particulièrement utile lors de l'utilisation de groupes de défense des consommateurs.
SOURCES: