DataFrame where clause doesn't work when using the Spark Cassandra Connector


Problem Description



We are using the DataStax Python Spark Cassandra driver v3.0.0. When we try to load data through a DataFrame, the where clause doesn't work, although the same CQL works in DataStax DevCenter. The code looks like this:

dataf = sqlc.read.format("org.apache.spark.sql.cassandra") \
    .options(table="tran_history", keyspace="test") \
    .load() \
    .where("usr_id = 'abc' and log_ts >= maxtimeuuid('2016-02-01 10:09:26-0800')") \
    .collect()

It seems the driver doesn't recognize the maxtimeuuid function.

Below is the error:

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 759, in filter

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__

File "C:\Spark\spark-1.4.1-bin-hadoop2.6.2\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o41.filter.

: java.util.NoSuchElementException: key not found: maxtimeuuid

Not sure if there is a version mismatch issue. We are using DSE 4.8.1.

Solution

API Conflicts

DataFrames do not use the Spark Cassandra Connector API, so when you call where on a DataFrame you are actually invoking Catalyst. The predicate is not handed down to the underlying CQL layer; it is applied in Spark itself. Spark doesn't know what maxtimeuuid is, so the call fails.

Filters rows using the given SQL expression.

See http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

Since this predicate is invalid, it will never reach the connector, so we will not be able to process a clause like this at the datasource level.
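
For contrast, here is a minimal sketch (reusing sqlc and the table names from the question): a plain SQL comparison is something Catalyst can parse and the data source can push down, while maxtimeuuid fails during analysis, before anything is sent to Cassandra.

df = sqlc.read.format("org.apache.spark.sql.cassandra") \
    .options(table="tran_history", keyspace="test") \
    .load()

# Plain Spark SQL: Catalyst parses this, and the Cassandra data source
# can push the equality filter down to the server.
df.where("usr_id = 'abc'").collect()

# Catalyst has no function registered under the name maxtimeuuid, so
# this raises "key not found: maxtimeuuid" before touching Cassandra:
# df.where("log_ts >= maxtimeuuid('2016-02-01 10:09:26-0800')").collect()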

Only the Spark Cassandra Connector's RDD.where clause passes CQL directly through to the underlying RDD.

Adds a CQL WHERE predicate(s) to the query. Useful for leveraging secondary indexes in Cassandra. Implicitly adds an ALLOW FILTERING clause to the WHERE clause, however beware that some predicates might be rejected by Cassandra, particularly in cases when they filter on an unindexed, non-clustering column.

http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.0-M1/spark-cassandra-connector/#com.datastax.spark.connector.rdd.CassandraRDD
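
For example, a sketch of the RDD route from Python (an assumption, not from the original answer: it relies on the third-party pyspark_cassandra package, which mirrors the connector's RDD API). Here the where clause, maxtimeuuid included, is evaluated by Cassandra itself:

from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext  # third-party package (assumed installed)

conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")  # placeholder host
sc = CassandraSparkContext(conf=conf)

# where() here is the connector's CQL pushdown, not Catalyst, so Cassandra
# evaluates maxtimeuuid() server-side:
rows = sc.cassandraTable("test", "tran_history") \
    .where("usr_id = 'abc' and log_ts >= maxtimeuuid('2016-02-01 10:09:26-0800')") \
    .collect()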

DataFrames and TimeUUID

Comparing TimeUUIDs through DataFrames is going to be difficult, since Catalyst has no notion of TimeUUID as a type, so the connector reads them (through DataFrames) as strings. This is a problem because TimeUUIDs are not lexically comparable, so you won't get the right answer even if you generate the TimeUUID yourself and compare against it directly instead of calling a function.
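
To see why, a quick demonstration using only Python's standard uuid module (the timeuuid helper is hypothetical, built just for this illustration): a version-1 UUID's string form begins with the low 32 bits of the timestamp, so string order disagrees with time order whenever the high-order timestamp bits differ.

import uuid

def timeuuid(ts_100ns):
    # Hypothetical helper: build a version-1 UUID from a 60-bit timestamp
    # (in 100 ns ticks), with fixed clock sequence and node.
    time_low = ts_100ns & 0xFFFFFFFF
    time_mid = (ts_100ns >> 32) & 0xFFFF
    time_hi_version = ((ts_100ns >> 48) & 0x0FFF) | 0x1000  # version 1
    return uuid.UUID(fields=(time_low, time_mid, time_hi_version,
                             0x80, 0x00, 0x000000000000))

a = timeuuid(0x100000005)  # earlier instant
b = timeuuid(0x200000001)  # later instant

print(a.time < b.time)    # True:  b is chronologically later
print(str(a) < str(b))    # False: yet a's string sorts after b's,
                          # because the string starts with time_low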
