Est Spark DataFrame de la structure imbriquée limitée pour la sélection?
J'ai un fichier json avec quelques données, je suis en mesure de créer DataFrame hors de lui et que le schéma de particulier, je suis intéressé par ressemble à ce qui suit:
val json: DataFrame = sqlc.load("entities_with_address2.json", "json")
root
|-- attributes: struct (nullable = true)
| |-- Address2: array (nullable = true)
| | |-- value: struct (nullable = true)
| | | |-- Zip: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- value: struct (nullable = true)
| | | | | | |-- Zip5: array (nullable = true)
| | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | |-- value: string (nullable = true)
Quand je suis en train de sélectionner les plus profondes de champ:
json.select("attributes.Address2.value.Zip.value.Zip5").collect()
Il me donne une exception:
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);
En regardant le resolveGetField méthode de LogicalPlan je vois qu'il est possible de sélectionner à partir de StructType ou de ArrayType(StructType), mais est-il possible de sélectionner plus profond? Comment puis-je sélectionner les champs dont j'ai besoin?
Ici, c'est la pleine exception.
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491)
at com.reltio.analytics.PREDF.test(PREDF.scala:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
Vous devez vous connecter pour publier un commentaire.
Le problème est le ArrayType -- vous pouvez recréer cette erreur est très simple:
À quel point
df.printSchema
montre:Et maintenant, si vous essayez:
Vous obtenez exactement la même exception --
GetField is not valid...
Vous avez un couple de différentes options à la détente de l'
Array
. Vous pouvez obtenir des éléments individuels avecgetItem
comme ceci:Et depuis
getItem
renvoie une autreColumn
, vous pouvez creuser aussi profond que vous le souhaitez:Mais avec un tableau, vous voulez probablement de programmation pour la détente de l'ensemble du Tableau. Si vous regardez la façon dont Ruche poignées de cela, vous avez besoin de faire un
LATERAL VIEW
. Dans Spark, vous allez avoir à utiliserexplode
de créer l'équivalent d'une RucheLATERAL VIEW
:Noter que j'utilise une Classe de Cas dans ma map -- c'est ce que les docs ont. Si vous ne voulez pas créer une classe de cas que vous pouvez simplement retourner un
Tuple1
(ouTuple2
ouTuple3
etc):