CommitFailedException Commettre ne peut pas être terminée, puisque le groupe a déjà rééquilibré et affecté les partitions à un autre membre

J'ai été en utilisant kafka 0.10.2 et maintenant face à un CommitFailedException. comme:

Commettre ne peut pas être terminée, puisque le groupe a déjà rééquilibré et
affecté les partitions d'un autre membre. Cela signifie que le temps
entre les appels suivants à poll() est plus long que le configuré
max.sondage.d'intervalle.ms, ce qui implique généralement que le sondage de la boucle est
passer trop de temps sur le traitement du message. Vous pouvez régler ce soit
en augmentant le délai d'expiration de session ou en réduisant la taille maximale de
les lots retournés dans poll() avec max.sondage.les enregistrements.

J'ai mis max.sondage.d'intervalle.ms Entier.MAX_VALUE. si quelqu'un peut me dire pourquoi ce qui se passe encore, même, j'ai mis la valeur ?

Une autre question est:
Je n'ai qu'une description d'ensemble de la session.délai d'attente.ms à 60000 et il arrive encore. J'essaie de reproduire par un simple code

 public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

lorsque j'ai mis de la session.délai d'attente.ms à 10000, j'ai essayer de dormir plus de 10000 ms dans mon sondage en boucle, mais il semble que le travail et non l'Exception. donc, je suis confus au sujet de ce. si le rythme cardiaque est déclenchée par la consommation.sondage et de la consommation.s'engager, semble battement de coeur est hors de délai d'expiration de session dans mon code. pourquoi ne pas jeter CommitFailedException ?

Bien que l'utilisateur thread se bloque pendant plus de 10 secondes, le rythme cardiaque thread pourrait encore être en mesure d'envoyer des battements de coeur, normalement, c'est pourquoi aucune des exceptions ont été jetés et c'est aussi la raison pour laquelle max.poll.intervals.ms a été introduit. Ce qui m'intéresse, c'est pourquoi vous avez toujours CommitFailException quand max.poll.intervals.ms est réglé en Entier.MAX_VALUE.
Ma question, je suis aussi très confus au sujet de ce. ..
Pourrait-il être reproduit facilement ou c'est juste arrivé une seule fois?

OriginalL'auteur Simon Su | 2017-08-08