Spark JoinWithCassandraTable on TimeStamp partition key STUCK


Problem description


I'm trying to filter on a small part of a huge C* table by using:

    import com.datastax.spark.connector._ // provides joinWithCassandraTable on RDDs

    val snapshotsFiltered = sc.parallelize(startDate to endDate)
      .map(TableKey(_))
      .joinWithCassandraTable("listener", "snapshots_tspark")

    println("Done Join")

    // get only the snapshots and create an RDD temp table
    val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
    val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
    jsonSchemaRDD.registerTempTable("snapshots_json")

Using:

    case class TableKey(created: Long) // (created, imei, when) --> created = partition key | imei, when = clustering keys

And the Cassandra table schema is:

CREATE TABLE listener.snapshots_tspark (
    created timestamp,
    imei text,
    when timestamp,
    snapshot text,
    PRIMARY KEY (created, imei, when)
) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';


The problem is that the process freezes after the println completes, with no errors on the Spark master UI.

[Stage 0:>                                                                                                                                (0 + 2) / 2]


Won't the join work with a timestamp as the partition key? Why does it freeze?

Answer

Using:

sc.parallelize(startDate to endDate)


With startDate and endDate as Longs generated from Dates using the format:

("yyyy-MM-dd HH:mm:ss")


I was making Spark build a huge array (100,000+ objects) to join with the C* table. It was not actually stuck at all: C* was working hard to perform the join and return the data.
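To see why the key set gets so large, here is a minimal sketch (the start/end values below are hypothetical; fmt, startDate and endDate are illustrative names): parsing the boundaries with that format yields epoch milliseconds, so startDate to endDate enumerates every single millisecond in the window.

    import java.text.SimpleDateFormat

    // Illustrative boundaries, parsed with the same "yyyy-MM-dd HH:mm:ss" format.
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val startDate = fmt.parse("2015-10-29 12:00:00").getTime // epoch milliseconds
    val endDate   = fmt.parse("2015-10-29 13:00:00").getTime

    // The range steps by 1 ms, so even a single hour produces 3,600,001 candidate
    // partition keys for joinWithCassandraTable, which is why the join runs so long.
    println((startDate to endDate).length) // 3600001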


Finally, I changed my range to:

case class TableKey(created_dh: String)
val data = Array("2015-10-29 12:00:00", "2015-10-29 13:00:00", "2015-10-29 14:00:00", "2015-10-29 15:00:00")
val snapshotsFiltered = sc.parallelize(data, 2)
  .map(TableKey(_))
  .joinWithCassandraTable("listener", "snapshots_tnew")

And now it is OK.
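As a usage note (a sketch only; the hourlyKeys helper and the one-hour step are assumptions, not part of the original answer), the hard-coded array above could be generated from the start and end dates so that each element is an hour-aligned created_dh string:

    import java.text.SimpleDateFormat
    import java.util.Date

    // Hypothetical helper: builds hour-aligned key strings between two dates (inclusive).
    def hourlyKeys(start: String, end: String): Array[String] = {
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val hourMs = 60 * 60 * 1000L
      val (s, e) = (fmt.parse(start).getTime, fmt.parse(end).getTime)
      (s to e by hourMs).map(ms => fmt.format(new Date(ms))).toArray
    }

    val data = hourlyKeys("2015-10-29 12:00:00", "2015-10-29 15:00:00")
    // -> Array("2015-10-29 12:00:00", "2015-10-29 13:00:00", "2015-10-29 14:00:00", "2015-10-29 15:00:00")
    val snapshotsFiltered = sc.parallelize(data, 2)
      .map(TableKey(_))
      .joinWithCassandraTable("listener", "snapshots_tnew")

Because the keys stay hour-aligned strings, they match the text partition key of the new table exactly, and the join only touches a handful of partitions instead of millions of candidate keys.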
