Cassandra/Spark showing incorrect entries count for large table


Problem description


I am trying to use Spark to process a large Cassandra table (~402 million entries and 84 columns), but I am getting inconsistent results. Initially the requirement was to copy some columns from this table to another table. After copying the data, I noticed that some entries in the new table were missing. To verify that, I took a count of the large source table, but I got different values each time. I tried the queries on a smaller table (~7 million records) and the results were fine.


Initially, I attempted to take count using pyspark. Here is my pyspark script:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
# sourcetable and sourcekeyspace hold the source table and keyspace names (set elsewhere in the script)
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache()
df.createOrReplaceTempView("data")
query = "select count(1) from data"
vgDF = spark.sql(query)
vgDF.show(10)

The spark-submit command is as follows:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py


The above spark-submit process takes ~90 minutes to complete. I ran it three times and here are the counts I got:


  • Spark iteration 1: 402273852
  • Spark iteration 2: 402273884
  • Spark iteration 3: 402274209


Spark does not show any error or exception during the entire process. I ran the same query in cqlsh thrice and got different results again:


  • Cqlsh iteration 1: 402273598
  • Cqlsh iteration 2: 402273499
  • Cqlsh iteration 3: 402273515


I am unable to find out why I am getting different outcomes from the same query. The Cassandra system log (/var/log/cassandra/system.log) showed the following error message just once:

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

Versions:


  • Cassandra 3.9.
  • Spark 2.1.0.
  • Datastax's spark-cassandra-connector 2.0.1
  • Scala version 2.11

Cluster:


  • Spark setup with 3 workers and 1 master node.
  • The 3 worker nodes also have a Cassandra cluster installed.
  • Each worker node has 8 CPU cores and 40 GB RAM.


Any help will be greatly appreciated.

Answer


The Spark Cassandra Connector's default read consistency is LOCAL_ONE and its default write consistency is LOCAL_QUORUM, so with those defaults it is possible to read partial data before a full repair has run. A read at ONE can be served by the one replica that failed to receive a write, and that is not an error, because the other two replicas succeeded. So you should either set both levels to QUORUM, or set one of them to ALL:

config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM").
config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM").


The default consistency level in the CQL shell is also ONE, so you should raise it there as well:

cqlsh> CONSISTENCY QUORUM
