Comment obtenir le dernier offset pour une partition pour un sujet kafka?

Je suis en utilisant le Python de haut niveau de consommation de Kafka et que vous voulez savoir sur les décalages plus pour chaque partition d'un sujet. Cependant je ne peux pas le faire fonctionner.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

Mais le résultat que j'obtiens est

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

J'ai une autre approche à l'aide de assign mais le résultat est le même

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()

Il semble d'une partie de la documentation que je pourrais obtenir ce comportement si un fetch n'a pas été délivré. Mais je ne peux pas trouver un moyen de la force. Ce que je fais mal?

Ou est-il un autre/simple d'obtenir les plus récentes de compensations pour un sujet?

source d'informationauteur Saket