CommitFailedException Commettre ne peut pas être terminée, puisque le groupe a déjà rééquilibré et affecté les partitions à un autre membre
J'ai été en utilisant kafka 0.10.2 et maintenant face à un CommitFailedException. comme:
Commettre ne peut pas être terminée, puisque le groupe a déjà rééquilibré et
affecté les partitions d'un autre membre. Cela signifie que le temps
entre les appels suivants à poll() est plus long que le configuré
max.sondage.d'intervalle.ms, ce qui implique généralement que le sondage de la boucle est
passer trop de temps sur le traitement du message. Vous pouvez régler ce soit
en augmentant le délai d'expiration de session ou en réduisant la taille maximale de
les lots retournés dans poll() avec max.sondage.les enregistrements.
J'ai mis max.sondage.d'intervalle.ms Entier.MAX_VALUE. si quelqu'un peut me dire pourquoi ce qui se passe encore, même, j'ai mis la valeur ?
Une autre question est:
Je n'ai qu'une description d'ensemble de la session.délai d'attente.ms à 60000 et il arrive encore. J'essaie de reproduire par un simple code
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
lorsque j'ai mis de la session.délai d'attente.ms à 10000, j'ai essayer de dormir plus de 10000 ms dans mon sondage en boucle, mais il semble que le travail et non l'Exception. donc, je suis confus au sujet de ce. si le rythme cardiaque est déclenchée par la consommation.sondage et de la consommation.s'engager, semble battement de coeur est hors de délai d'expiration de session dans mon code. pourquoi ne pas jeter CommitFailedException ?
max.poll.intervals.ms
a été introduit. Ce qui m'intéresse, c'est pourquoi vous avez toujours CommitFailException quand max.poll.intervals.ms
est réglé en Entier.MAX_VALUE.Ma question, je suis aussi très confus au sujet de ce. ..
Pourrait-il être reproduit facilement ou c'est juste arrivé une seule fois?
OriginalL'auteur Simon Su | 2017-08-08
Vous devez vous connecter pour publier un commentaire.
Hi Pour cela, vous devez gérer le rééquilibrage de la condition dans votre code et doivent, le processus en cours de message et le valider avant le rééquilibrage
Comme :
et d'Utiliser cette syntaxe pour s'abonner à la rubrique :
L'avantage de le faire :
Messages ne vais pas répéter lorsque le rééquilibrage est en cours.
Pas commettre l'échec d'exception
led exception
OriginalL'auteur Abhimanyu
session.timeout.ms
situé sur la consommation doit être inférieure à lagroup.max.session.timeout.ms
ensemble sur Kafka courtier.Cette résolu le problème pour moi.
De crédit à github lien Commettre Des Échecs
La valeur par défaut est pas 30000 (30 secondes), c'est 300000 (5 minutes). Voir kafka.apache.org/10/documentation.html pour plus d'informations.
OriginalL'auteur Rahul Teke