Cassandra 3.7 CDC / incremental data load

Problem description

I'm very new to the ETL world and I wish to implement incremental data loading with Cassandra 3.7 and Spark. I'm aware that later versions of Cassandra do support CDC, but I can only use Cassandra 3.7. Is there a method through which I can track only the changed records and use Spark to load them, thereby performing incremental data loading?

If it can't be done on the Cassandra end, any other suggestions on the Spark side are also welcome :)

Recommended answer

It's quite a broad topic, and an efficient solution will depend on the amount of data in your tables, the table structure, how data is inserted/updated, etc. Also, the specific solution may depend on the version of Spark available. One downside of a Spark-only method is that you can't easily detect deletions of data without having a complete copy of the previous state, so that you can generate a diff between the two states.
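
For example, a minimal sketch of such a state diff on the Spark side (the snapshot path is hypothetical, and exceptAll requires Spark 2.4+):

import org.apache.spark.sql.cassandra._

// current state of the table in Cassandra
val current = spark.read.cassandraFormat("tbl", "test").load()
// hypothetical snapshot of the same table saved during the previous run (e.g. as Parquet)
val previous = spark.read.parquet("/data/snapshots/tbl_previous")

// rows that are new, or the new versions of rows that changed, since the previous run
val addedOrChanged = current.exceptAll(previous)
// rows that were deleted, or the old versions of rows that changed, since the previous run
val removedOrChanged = previous.exceptAll(current)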

In all cases you'll need to perform a full table scan to find changed entries, but if your table is organized specifically for this task, you can avoid reading all of the data. For example, if you have a table with the following structure:

create table test.tbl (
  pk int,
  ts timestamp,
  v1 ...,
  v2 ...,
  primary key(pk, ts));

then if you execute the following query:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
                              AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

then the Spark Cassandra Connector (SCC) will push this query down to Cassandra and will only read the data where ts is in the given time range - you can check this by executing filtered.explain and verifying that both time filters are marked with a * symbol.
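
To turn this into an incremental load, the time window has to be carried over between runs. A rough sketch of that idea (where you persist the bounds is up to you; the values below are only placeholders, not part of the original answer):

import java.sql.Timestamp
import org.apache.spark.sql.cassandra._

// lower bound = upper bound of the previous run (read from a checkpoint file,
// a control table, etc. - placeholder value here); upper bound = "now"
val lowerBound = Timestamp.valueOf("2019-03-10 19:01:56.316")
val upperBound = new Timestamp(System.currentTimeMillis())

val data = spark.read.cassandraFormat("tbl", "test").load()

// both predicates on the clustering column ts are pushed down to Cassandra,
// so only the new slice of every partition is read (verify with .explain)
val incremental = data
  .filter(data("ts") > lowerBound)
  .filter(data("ts") <= upperBound)

// persist upperBound somewhere durable so the next run can use it as its lowerBound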

Another way to detect changes is to retrieve the write time from Cassandra and filter the changes based on that information. Fetching the writetime is supported in the RDD API for all recent versions of the SCC, and in the Dataframe API since the release of SCC 2.5.0 (which requires at least Spark 2.4, although it may work with 2.3 as well). After fetching this information you can apply filters on the data and extract the changes (see the sketch after the list below). But you need to keep several things in mind:

  • there is no way to detect deletes using this method
  • write time information exists only for regular & static columns, not for primary key columns
  • each column may have its own write time value if the row was partially updated after insertion
  • in most versions of Cassandra, calling the writetime function on a collection column (list/map/set) will generate an error, and it may return null for columns with a user-defined type
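
As an illustration, a rough sketch with the connector's RDD-level writeTime selector (assuming an existing SparkContext sc; the cutoff value and column names are placeholders). Note that writetime values are microseconds since the epoch:

import com.datastax.spark.connector._

// everything written after the previous run (placeholder value, in microseconds)
val lastRunMicros = 1552245716316000L

val changed = sc.cassandraTable("test", "tbl")
  .select("pk", "ts", "v1", "v1".writeTime as "v1_writetime")
  .filter(row => row.getLong("v1_writetime") > lastRunMicros)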

P.S. Even if you had CDC enabled, using it correctly would not be a trivial task:

  • you need to de-duplicate the changes - you get RF copies of every change
  • some changes could be missed, for example when a node was down and the changes were later propagated via hints or repairs
  • TTLs aren't easy to handle
  • ...

For CDC you may look for presentations from the 2019 DataStax Accelerate conference - there were several talks on that topic.
