Limite de Kafka lots de taille lors de l'utilisation de Spark Streaming
Est-il possible de limiter la taille des lots retournés par le Kafka de consommation pour Spark en Streaming?
Je demande parce que le premier lot je reçois a des centaines de millions d'enregistrements et ça prend une éternité de processus et de point de contrôle entre eux.
- Quel est votre traitement actuel de l'intervalle? si son plus essayer de réduire l'intervalle de lot, de sorte que vous pouvez obtenir moins de données.
- Négliger votre question, que vous avez mentionné premier lot..
Vous devez vous connecter pour publier un commentaire.
Je pense que votre problème peut être résolu par Spark Streaming de contre-Pression.
Vérifier
spark.streaming.backpressure.enabled
etspark.streaming.backpressure.initialRate
.Par défaut
spark.streaming.backpressure.initialRate
est pas mis etspark.streaming.backpressure.enabled
est désactivé par défaut, donc je suppose que l'étincelle se prendre autant qu'il le peut.De Apache Spark Kafka configuration
spark.streaming.backpressure.enabled
:Et puisque vous voulez contrôler le premier lot, ou pour être plus précis, le nombre de messages dans le premier lot, je pense que vous avez besoin
spark.streaming.backpressure.initialRate
spark.streaming.backpressure.initialRate
:Celui-ci est bon, quand l'Étincelle de l'emploi (respectivement Étincelle travailleurs à tous) est capable de traiter disons 10000 messages de kafka, mais kafka courtiers donner à votre travail de 100000 messages.
Peut-être que vous serez aussi intéressé à vérifier
spark.streaming.kafka.maxRatePerPartition
et aussi un peu de recherche et des suggestions pour ces propriétés sur l'exemple réel par Jeroen van Wilgenburg sur son blog.spark.streaming.backpressure.initialRate
fonctionne, mais comme Jeroen van Wilgenburg remarqué dans son blog "C'est une bonne idée de fixer un maximum parce que la contre-pression de l'algorithme n'est pas instantanée (ce qui serait impossible)... de la difficulté avec un travail avec Kafka entrée qui pourrait gérer de près de 1000 événements/sec lorsque Kafka a décidé de nous donner 50.000 dossiers/sec dans les premières secondes." .. mais je suis confus cause n'a pas fonctionné.spark.streaming.backpressure.enabled
devrait "en Interne, de manière dynamique, définit le nombre maximum de réception taux de récepteurs"spark.streaming.backpressure.enabled
prend en charge des taux de manière dynamique. "Spark 1.5, nous avons introduit une fonctionnalité appelée contre-pression qui éliminent le besoin de fixer ce taux limite, comme la Spark Streaming automatiquement chiffres sur le taux de limites et s'ajuste de façon dynamique si les conditions de transformation du changement.", ce qui pourrait expliquer pourquoi les taux ne fonctionne pas si la propriété est définie sur true.En dehors de réponses ci-dessus. La taille des lots est le produit de 3 paramètres
batchDuration
: L'intervalle de temps entre les flux de données seront divisées en lots (en Secondes).spark.streaming.kafka.maxRatePerPartition
: définir le nombre maximal de messages par partition par seconde. Ce lorsqu'il est combiné avecbatchDuration
permettra de contrôler la taille des lots. Vous voulez lamaxRatePerPartition
d'être ensemble, et de grande taille (autrement vous sont effectivement la limitation de votre travail) etbatchDuration
très petite.Pour une meilleure explication comment ce produit fonctionne lorsque la contre-pression permet d'activer/de désactiver (réglez étincelle.de streaming.kafka.maxRatePerPartition pour createDirectStream)
Limiter le niveau maximum de la taille des lots va grandement aider à contrôler le temps de traitement, cependant, il augmente la latence de traitement de message.
Par les paramètres ci-dessous propriétés, nous pourrions contrôler la taille du lot
spark.de streaming.le récepteur.maxRate=
spark.de streaming.kafka.maxRatePerPartition=
Vous pouvez même définir dynamiquement la taille des lots en fonction du temps de traitement, en permettant à la pression de retour
spark.de streaming.la contre-pression.enabled:true
spark.de streaming.la contre-pression.initialRate: