filtre étincelle dataframe avec la ligne de champ qui est un tableau de chaînes de caractères
À l'aide de l'Étincelle de 1,5 et Scala 2.10.6
Je suis en train de filtrer un dataframe par l'intermédiaire d'un champ "tags" qui est un tableau de chaînes de caractères. Recherche de toutes les lignes qui ont le tag "privé".
val report = df.select("*")
.where(df("tags").contains("private"))
:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
ne peut pas résoudre "Contient(tags, privé)" en raison d'incompatibilité de type de données:
argument 1 exige de type chaîne de caractères, cependant, "tags" est de tableau
type.;
Est la méthode de filtrage mieux adapté?
Mise à JOUR:
les données proviennent de cassandra adaptateur, mais un minimum d'exemple qui montre ce que je suis en train de faire et aussi obtient l'erreur ci-dessus est:
def testData (sc: SparkContext): DataFrame = {
val stringRDD = sc.parallelize(Seq("""
{ "name": "ed",
"tags": ["red", "private"]
}""",
"""{ "name": "fred",
"tags": ["public", "blue"]
}""")
)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
sqlContext.read.json(stringRDD)
}
def run(sc: SparkContext) {
val df1 = testData(sc)
df1.show()
val report = df1.select("*")
.where(df1("tags").contains("private"))
report.show()
}
Mise à JOUR: les balises de tableau peut être n'importe quelle longueur et le "privé" de la balise peut être dans n'importe quelle position
Mise à JOUR: une solution qui fonctionne: UDF
val filterPriv = udf {(tags: mutable.WrappedArray[String]) => tags.contains("private")}
val report = df1.filter(filterPriv(df1("tags")))
Une option est de construire un UDF.
Eh bien, après avoir regardé le code source (depuis le scaladoc pour
Column.contains
dit seulement "Contient l'autre" qui n'est pas très instructif), je vois que Column.contains
construit une instance de org.apache.spark.sql.catalyst.expressions.Contains
qui dit "Une fonction qui renvoie true si la chaîne left
contient la chaîne de caractères right
". Il semble donc que df1("tags").contains
ne peut pas faire ce que nous voulons faire dans ce cas. Je ne sais pas quelle autre solution à proposer. Il y a un ArrayContains
également dans ...expressions
mais Column
ne semble pas en faire usage.En effet, après avoir changé les données à des chaînes au lieu d'un tableau de chaînes de caractères, je trouve que la requête fonctionne.
J'ai eu un UDF de travail:
val filterPriv = udf {(tags: mutable.WrappedArray[String]) => tags.contains("private")}; val report = df1.filter(filterPriv(df1("tags")))
toujours à la recherche de quelque chose de plus sympa, mais au moins je ne suis pas bloqué. thx!OriginalL'auteur navicore | 2016-01-17
Vous devez vous connecter pour publier un commentaire.
Je pense que si vous utilisez
where(array_contains(...))
il va travailler. Voici mon résultat:Remarque qu'il fonctionne si vous écrivez
where(array_contains(df("tags"), "private"))
, mais si vous écrivezwhere(df("tags").array_contains("private"))
(plus directement analogue à ce que vous avez écrit à l'origine), il échoue avecarray_contains is not a member of org.apache.spark.sql.Column
. En regardant le code source deColumn
, je vois des trucs pour gérercontains
(construction d'unContains
exemple) mais pasarray_contains
. C'est peut-être un oubli..select("*")
n'est pas nécessaire =>df.where(...) ...
OriginalL'auteur Robert Dodier
Vous pouvez utiliser ordinale à consulter le tableau json est pour, par exemple, dans votre cas
df("tags")(0)
. Voici un exemple de travailensuite, ton code devrait fonctionner. vérifier ma mise à jour
intéressant, je suis en manque de quelque chose, ressemble exactement à ce que je faisais, mais l'obtention de l'erreur. double vérification étincelle versions maintenant...
c'est sur 1.5.4
thx. Je dois être croisement de main quelque part. J'ai essayé 1.5.1 1.6 et
val report = df.select("*").where(df("tags").contains("private"))
me donne cette erreur dans l'orig post. creuser...OriginalL'auteur Aravind R. Yarram