Not able to persist the DStream for use in next batch


Problem Description

// build an initially-empty "history" DStream from a queue holding one empty RDD
JavaRDD<String> history_ = sc.emptyRDD();

java.util.Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);

JavaPairDStream<String, ArrayList<String>> history = history_dstream.mapToPair(r -> {
  return new Tuple2<String, ArrayList<String>>(null, null);
});

JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);

JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);

Then I do some transformations and create two DStreams, Data_1 and Data_2, of type

JavaPairDStream<String, ArrayList<String>>

and do the join as below, then filter out those records for which there was no joining key, saving them in history so they can be used in the next batch by doing a union with Data_1:

Data_1 = Data_1.union(history);

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    Data_1.leftOuterJoin(Data_2).cache();

// records whose key found a match in Data_2
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join =
    joined.filter(r -> r._2._2().isPresent());
// records with no matching key; these should carry over to the next batch
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered =
    joined.filter(r -> !r._2._2().isPresent());

history = dstream_filtered.mapToPair(r -> {
  return new Tuple2<>(r._1, r._2._1);
}).persist();

I do get history after the previous step (verified by saving it to HDFS), but this history is still empty in the batch when doing the union.

Recommended Answer

It's conceptually not possible to "remember" a DStream. DStreams are time-bound, and on each clock tick (called the "batch interval") the DStream represents the data observed in the stream during that period of time.
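To make that time-bound nature concrete, here is a minimal Scala sketch (the dstream variable is illustrative, assuming a running StreamingContext): every tick hands us a fresh RDD scoped to that interval only.

// Each batch interval produces a brand-new RDD; the DStream itself
// retains no memory of earlier intervals.
dstream.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contains ${rdd.count()} records from this interval only")
}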

Hence, we cannot have an "old" DStream saved to join with a "new" DStream. All DStreams live in the "now".

The underlying data structure of DStreams is the RDD: Each batch interval, our DStream will have 1 RDD of the data for that interval. RDDs represent a distributed collection of data. RDDs are immutable and permanent, for as long as we have a reference to them.
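As a small illustration of that last point (a sketch with hypothetical names, not the answer's code): a reference held on the driver keeps an interval's RDD alive and usable in later batches, which is exactly what the roll-over below relies on.

// Keep a driver-side reference to the most recent interval's RDD;
// the RDD remains valid for as long as this reference exists.
var lastBatch: RDD[String] = sc.emptyRDD[String]

dstream.foreachRDD { rdd =>
  val carried = lastBatch.count() // the previous interval's data is still accessible
  lastBatch = rdd                 // swap in the current interval's RDD
}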

We can combine RDDs and DStreams to create the "history roll over" that's required here.

It looks pretty similar to the approach in the question, but only using the history RDD.

Here's a high-level view of the suggested changes:

var history: RDD[(String, List[String])] = sc.emptyRDD[(String, List[String])]

val dstream1 = ...
val dstream2 = ...

val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)

... do stuff with joined as above, obtain dstreamFiltered ...

dstreamFiltered.foreachRDD { rdd =>
  val formatted = rdd.map { case (k, (v1, v2)) => (k, v1) } // drop the join info
  history.unpersist(false) // unpersist the 'old' history RDD
  history = formatted // assign the new history
  history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
  history.count() // action to materialize this transformation
}

This is only a starting point. There are additional considerations with regard to checkpointing: without it, the lineage of the history RDD will grow unbounded until a StackOverflowError eventually occurs. This blog covers this particular technique quite thoroughly: http://www.spark.tc/stateful-spark-streaming-using-transform/
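As a hedged sketch of that checkpointing concern (the checkpoint directory and the every-10-batches cadence are assumptions, not something the blog prescribes), the foreachRDD body above could periodically truncate the lineage like this:

// Assumes ssc.sparkContext.setCheckpointDir("hdfs:///tmp/history-checkpoints") was called at startup
var batchCount = 0L

dstreamFiltered.foreachRDD { rdd =>
  val formatted = rdd.map { case (k, (v1, v2)) => (k, v1) }
  history.unpersist(false)
  history = formatted
  history.persist(StorageLevel.MEMORY_AND_DISK)
  batchCount += 1
  if (batchCount % 10 == 0) { // arbitrary cadence; tune to your batch interval
    history.checkpoint()      // must be marked before the materializing action below
  }
  history.count() // materializes the persist and, periodically, the checkpoint
}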

I also recommend using Scala instead of Java; the Java syntax is too verbose to use with Spark Streaming.
