Comment diviser les fichiers parquet en plusieurs partitions dans Spark?
Donc j'ai juste 1 parquet fichier que je suis en train de lire avec Spark (à l'aide de SQL trucs) et j'aimerais qu'il soit traité avec plus de 100 partitions. J'ai essayé de réglage spark.default.parallelism
à 100, nous avons également essayé de changer la compression du parquet à aucun (gzip). Peu importe ce que nous faisons de la première étape de la bougie d'emploi n'a qu'une seule partition (une fois une lecture aléatoire se produit, elle devient repartitionné en 100 et, par la suite, évidemment, les choses sont beaucoup beaucoup plus rapide).
Maintenant selon quelques sources (comme ci-dessous) parquet doit être splittable (même si à l'aide de gzip!), donc, je suis super confus et aimerais quelques conseils.
Je suis en utilisant étincelle 1.0.0, et, apparemment, la valeur par défaut pour spark.sql.shuffle.partitions
est de 200, donc il ne peut pas l'être. En fait, toutes les valeurs par défaut de parallélisme sont beaucoup plus que 1, donc je ne comprends pas ce qui se passe.
source d'informationauteur samthebest
Vous devez vous connecter pour publier un commentaire.
Vous devez écrire votre parquet fichiers avec une taille de bloc. Par défaut est de 128 mo par bloc, mais c'est configurable par le paramètre
parquet.block.size
configuration de l'écrivain.La source de ParquetOuputFormat est icisi vous voulez creuser dans les détails.
La taille de bloc est minimale quantité de données que vous pouvez lire sur du parquet de fichier qui est logiquement lisible (car le parquet est en forme de colonne, vous ne pouvez pas simplement diviser en ligne ou quelque chose de trivial comme cela), alors vous ne pouvez pas avoir plus de lecture de threads que de blocs d'entrées.
Peut-être que votre parquet fichier ne prend qu'un seul HDFS bloc. Créer un grand parquet fichier qui a beaucoup de HDFS blocs et de les charger
Vous verrez même nombre de partitions que HDFS blocs. Cela a bien fonctionné pour moi (spark-1.1.0)
Vous avez mentionné que vous voulez de contrôle de la distribution au cours de l'écriture de parquet. Lorsque vous créez parquet de Rdd parquet conserve les partitions de la RDD. Donc, si vous créez des RDD et spécifier 100 partitions et de dataframe avec parquet format, puis il sera écrit de 100 parquet fichiers de fs.
Pour les lire, vous pouvez spécifier
spark.sql.shuffle.partitions
paramètre.À réaliser que vous devez utiliser
SparkContext
pour définir la configuration Hadoop (sc.hadoopConfiguration
) propriétémapreduce.input.fileinputformat.split.maxsize
.Par la définition de cette propriété à une valeur inférieure à hdfs.la taille de bloc, que vous obtiendrez autant de partitions que le nombre de divisions.
Par exemple:
Lorsque
hdfs.blockSize
= 134217728 (128 MO)et lorsqu'un fichier est lu qui contient exactement un bloc complet,
et
mapreduce.input.fileinputformat.split.maxsize
= 67108864 (64 MO)Alors il y aura deux partitions de ces refontes lire dans.
La nouvelle façon de faire (Spark 2.x) est le réglage
spark.sql.files.maxPartitionBytes
Source: https://issues.apache.org/jira/browse/SPARK-17998 (la documentation officielle n'est pas correct encore, manque le cadre .sql)
De mon expérience Hadoop paramètres n'ont plus d'effet.