Comment filtrer une étincelle dataframe contre un autre dataframe

Je suis en train de filtrer un dataframe contre l'autre:

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")

Maintenant, je veux filtre df1 et obtenir un dataframe qui contient toutes les lignes dans df1 où user_id est en df2("valid_id"). En d'autres termes, je veux toutes les lignes dans df1 où l'user_id est soit 2,3,4,5 ou 6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);  

D'un autre côté quand j'essaie de faire un filtre à l'encontre d'une fonction, tout ce qui ressemble beaucoup:

scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]

Pourquoi j'obtiens cette erreur? Est-il quelque chose de mal avec ma syntaxe?

commentaire suivant j'ai essayé de faire une jointure externe gauche:

scala> df1.show
+-------+------------------+-------+
|   name|             score|user_id|
+-------+------------------+-------+
| user 1|             0.123|      1|
| user 2|             0.246|      2|
| user 3|             0.369|      3|
| user 4|             0.492|      4|
| user 5|             0.615|      5|
| user 6|             0.738|      6|
| user 7|             0.861|      7|
| user 8|             0.984|      8|
| user 9|             1.107|      9|
|user 10|              1.23|     10|
|user 11|             1.353|     11|
|user 12|             1.476|     12|
|user 13|             1.599|     13|
|user 14|             1.722|     14|
|user 15|             1.845|     15|
|user 16|             1.968|     16|
|user 17|             2.091|     17|
|user 18|             2.214|     18|
|user 19|2.3369999999999997|     19|
|user 20|              2.46|     20|
+-------+------------------+-------+
only showing top 20 rows
scala> df2.show
+--------+
|valid_id|
+--------+
|       2|
|       3|
|       4|
|       5|
|       6|
+--------+
scala> df1.join(df2, df1("user_id") === df2("valid_id"))
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]
scala> res6.collect
res7: Array[org.apache.spark.sql.Row] = Array()
scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer")
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]
scala> res8.count
res9: Long = 0

Je suis à court d'étincelle 1.5.0 avec scala 2.10.5

  • Vous souhaitez filtrer ou d'effectuer une jointure sur deux Dataframes?
  • Je veux obtenir un dataframe avec un sous-ensemble de lignes de df1. pour chaque ligne r en df1, si la valeur de r("user_id") est en df2("valid_id"), puis ligne r sera inclus dans le résultat dataframe.
  • Ensuite, vous aurez à effectuer une jointure externe gauche de df1 à df2 sur userId == validId
  • quand j'essaie, j'ai un vide dataframe, et il contient en fait une union de toutes les colonnes. Je vais ajouter un exemple à la question
InformationsquelleAutor polo | 2015-09-18