Comment régler la taille des messages dans Kafka?
Je suis actuellement à l'aide de Kafka 0.9.0.1. Selon certaines sources, j'ai trouvé, la façon de définir la taille des messages est de modifier les valeurs de clés suivantes dans server.properties
.
- message.max.octets
- réplique.fetch.max.octets
- chercher.message.max.octets
Mon server.properties
fichier a effectivement ces paramètres.
message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760
D'autres paramètres qui peuvent être pertinents sont ci-dessous.
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
Cependant, quand j'essaye d'envoyer des messages avec des charges utiles de 4 à 6 MO en taille, le consommateur ne reçoit jamais de messages. Le producteur semble envoyer des messages sans aucune exception levée. Si je dois envoyer de petites charges utiles (< 1 MO), le consommateur se fait de recevoir les messages.
Aucune idée sur ce que je fais mal en termes de paramètres de configuration?
Voici l'exemple de code pour envoyer un message.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
File f = new File(dir, s);
byte[] data = Files.readAllBytes(f.toPath());
Payload payload = new Payload(data); //a simple pojo to store payload
String key = String.valueOf(System.currentTimeMillis());
byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();
Voici l'exemple de code pour recevoir un message.
KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
ConsumerRecord<String, byte[]> records = consumer.poll(100);
for(ConsumerRecord<String, byte[]> record : records) {
long offset = record.offset();
String key = record.key();
byte[] val = record.value();
Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
System.out.println(
System.format("offset=%d, key=%s", offset, key));
}
}
Ici sont les méthodes pour remplir les fichiers de propriétés pour le producteur et le consommateur.
public static Properties getProducerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("compression.type", "snappy");
props.put("max.request.size", 10485760); //need this
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.partition.fetch.bytes", 10485760); //need this too
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return props;
}
OriginalL'auteur Jane Wayne | 2016-02-29
Vous devez vous connecter pour publier un commentaire.
Jane,
N'utilisez pas de
fetch.message.max.bytes
tout d'abord parce que c'est une propriété qui est au consommateur et ne pas aller dans le serveur.les propriétés de fichier et la deuxième raison est pour l'ancienne version de la consommation, au lieuutilisation
max.partition.fetch.bytes
lorsque vous créez le Consommateur dans le cadre de l'propriétés que vous utilisez pour l'instancier.Il s'avère que j'ai besoin de régler à la fois
max.request.size
pour le producteur etmax.partition.fetch.bytes
pour le consommateur. Je vais bricoler un peu avec le code pour voir simax.partition.fetch.bytes
est vraiment nécessaire.Oui, il s'avère que j'ai besoin des deux paramètres. Si je n'ai pas mis
max.partition.fetch.bytes
, puis-je obtenir unRecordTooLargeException
.Oui, vous allez avoir besoin d'un max.demande.la taille aussi, mais depuis que vous m'avez dit que l'envoi n'était pas un problème je n'ai pas beaucoup d'attention à ce paramètre. Pouvez-vous accepter la réponse?
OriginalL'auteur Nautilus