如何通过子查询和地图查找进行高阶函数转换? [英] How to do higher order function transform with sub query and a map lookup?
问题描述
map1:org.apache.spark.sql.DataFrame = [lookup:map< string,string>]
map1: org.apache.spark.sql.DataFrame = [lookup: map<string,string>]
scala> val ds1 = spark.sql("select 'p1' as p, Array('s2','s3') as c")
ds1:org.apache.spark.sql.DataFrame = [p:字符串,c:数组]
ds1: org.apache.spark.sql.DataFrame = [p: string, c: array]
scala> ds1.createOrReplaceTempView("ds1")
scala> map1.createOrReplaceTempView("map1")
scala> map1.show()
+--------------------+
| lookup|
+--------------------+
|[p1 -> s1, p2 -> ...|
+--------------------+
scala> ds1.show()
+---+--------+
| p| c|
+---+--------+
| p1|[s2, s3]|
+---+--------+
map1.selectExpr("element_at(`lookup`, 's2')").first()
res50:org.apache.spark.sql.Row = [p2]
res50: org.apache.spark.sql.Row = [p2]
scala> spark.sql("select element_at(`lookup`, 's1') from map1").show()
+----------------------+
|element_at(lookup, s1)|
+----------------------+
| p1|
+----------------------+
到目前为止,一切都很好.在接下来的两个步骤中,我遇到了一些问题:
So far so good. In my next two steps I am hitting some issues:
scala> ds1.selectExpr("p", "c", "transform(c, cs -> map1.selectExpr('element_at(`lookup`, cs)')) as cs").show()
20/09/28 19:44:59警告HiveConf:名称的HiveConfhive.stats.jdbc.timeout不存在20/09/28 19:44:59警告HiveConf:名称为hive.stats.retries.wait的HiveConf不存在20/09/28 19:45:03 WARN ObjectStore:在以下版本中找不到版本信息元商店.hive.metastore.schema.verification未启用,因此记录架构版本2.3.0 20/09/28 19:45:03 WARN ObjectStore:调用了setMetaStoreSchemaVersion,但禁用了录制版本:版本= 2.3.0,注释=由MetaStore root@10.1.21.76设置20/09/2819:45:03 WARN ObjectStore:无法获取数据库map1,返回NoSuchObjectException org.apache.spark.sql.AnalysisException:未定义函数:"selectExpr".此功能既不是已注册的临时功能或已注册的永久功能数据库"map1".第1行pos 19
20/09/28 19:44:59 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 20/09/28 19:44:59 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist 20/09/28 19:45:03 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0 20/09/28 19:45:03 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore root@10.1.21.76 20/09/28 19:45:03 WARN ObjectStore: Failed to get database map1, returning NoSuchObjectException org.apache.spark.sql.AnalysisException: Undefined function: 'selectExpr'. This function is neither a registered temporary function nor a permanent function registered in the database 'map1'.; line 1 pos 19
scala> spark.sql("""select p, c, transform(c, cs -> (select element_at(`lookup`, cs) from map1)) cc from ds1""").show()
org.apache.spark.sql.AnalysisException:无法解析给定的"
cs
"输入列:[map1.lookup];第1行pos 61;'Project [p#329,c#330,变换(c#330,lambdafunction(标量子查询#713 [],lambda cs#715,false))AS cc#714]:+-'项目[unresolvedalias('element_at(lookup#327,'cs),None)]:+-SubqueryAlias map1:+-Project [map(s1,p1,s2,p2,s3,p3)ASlookup#327]:+-OneRowRelation+-SubqueryAlias ds1 +-项目[p1 AS p#329,array(s2,s3)AS c#330]+-OneRowRelatio
org.apache.spark.sql.AnalysisException: cannot resolve '
cs
' given input columns: [map1.lookup]; line 1 pos 61; 'Project [p#329, c#330, transform(c#330, lambdafunction(scalar-subquery#713 [], lambda cs#715, false)) AS cc#714] : +- 'Project [unresolvedalias('element_at(lookup#327, 'cs), None)] : +- SubqueryAlias map1 : +- Project [map(s1, p1, s2, p2, s3, p3) AS lookup#327] : +- OneRowRelation +- SubqueryAlias ds1 +- Project [p1 AS p#329, array(s2, s3) AS c#330] +- OneRowRelatio
我该如何解决这些问题?
How can I solve these issues?
推荐答案
只需将表名添加到 from
子句中.
Simply add the table name to the from
clauses.
spark.sql("""select p, c, transform(c, cs -> element_at(`lookup`, cs)) cc from ds1 a, map1 b""").show()
+---+--------+--------+
| p| c| cc|
+---+--------+--------+
| p1|[s2, s3]|[p2, p3]|
+---+--------+--------+
这篇关于如何通过子查询和地图查找进行高阶函数转换?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!