Not able to persist the DStream for use in next batch


Problem description

JavaRDD<String> history_ = sc.emptyRDD();

java.util.Queue<JavaRDD<String>> queue = new LinkedList<>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);

JavaPairDStream<String, ArrayList<String>> history = history_dstream.mapToPair(r -> {
  // mapping logic as posted; returns a placeholder pair
  return new Tuple2<String, ArrayList<String>>(null, null);
});

 JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);


JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);

Then I do some transformations and create two DStreams, Data_1 and Data_2, of type

JavaPairDStream<String, ArrayList<String>>

Then I do the join as below, filter out those records that had no joining key, and save them in history for use in the next batch by taking their union with Data_1:

 Data_1 = Data_1.union(history);

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    Data_1.leftOuterJoin(Data_2).cache();


JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());

history = dstream_filtered.mapToPair(r -> {
  return new Tuple2<>(r._1, r._2._1);
}).persist();

I do get history after the previous step (verified by saving it to HDFS), but this history is still empty in the next batch when the union is performed.

Recommended answer

It's conceptually not possible to "remember" a DStream. DStreams are time-bound and on each clock-tick (called "batch interval") the DStream represents the observed data in the stream during that period of time.

Hence, we cannot have an "old" DStream saved to join with a "new" DStream. All DStreams live in the "now".

The underlying data structure of DStreams is the RDD: Each batch interval, our DStream will have 1 RDD of the data for that interval. RDDs represent a distributed collection of data. RDDs are immutable and permanent, for as long as we have a reference to them.
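
As a small sketch of that permanence (assuming an existing SparkContext `sc` and a hypothetical DStream[String] named `lines`): a reference to an RDD held in a variable outside the streaming closures stays usable across batch intervals, which is what the roll-over below relies on.

import org.apache.spark.rdd.RDD

// assumes an existing SparkContext `sc` and a DStream[String] named `lines`
var previous: RDD[String] = sc.emptyRDD

lines.foreachRDD { rdd =>
  // `previous` still references the RDD kept from the last interval
  println(s"this batch + previous batch: ${rdd.union(previous).count()}")
  previous.unpersist(false)  // release the old batch's cache
  previous = rdd.persist()   // hold on to this interval's data
  previous.count()           // action to materialize the cache
}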

We can combine RDDs and DStreams to create the "history roll over" that's required here.

It looks pretty similar to the approach in the question, but using only the history RDD.

Here's a high-level view of the suggested changes:

var history: RDD[(String, List[String])] = sc.emptyRDD

val dstream1 = ...
val dstream2 = ...

val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)

// ... do stuff with joined as above, obtain dstreamFiltered ...

dstreamFiltered.foreachRDD{rdd =>
   val formatted = rdd.map{case (k,(v1,v2)) => (k,v1)} // get rid of the join info
   history.unpersist(false) // unpersist the 'old' history RDD
   history = formatted // assign the new history
   history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
   history.count() //action to materialize this transformation
}

This is only a starting point. There are additional considerations with regard to checkpointing: without it, the lineage of the history RDD will grow unbounded until a StackOverflowError eventually occurs. This blog post covers this particular technique quite thoroughly: http://www.spark.tc/stateful-spark-streaming-using-transform/
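
For illustration, here is a minimal sketch of how the loop above could checkpoint the history RDD periodically to truncate its lineage; the checkpoint directory and the every-10-batches cadence are assumptions, not part of the original answer:

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/history-checkpoints") // assumed path

var history: RDD[(String, List[String])] = sc.emptyRDD
var batchCount = 0L

dstreamFiltered.foreachRDD { rdd =>
  val formatted = rdd.map { case (k, (v1, _)) => (k, v1) }
  history.unpersist(false)          // unpersist the 'old' history RDD
  history = formatted
  history.persist(StorageLevel.MEMORY_AND_DISK)
  batchCount += 1
  if (batchCount % 10 == 0) {       // every 10 batches (arbitrary choice)
    history.checkpoint()            // cut the lineage; runs on next action
  }
  history.count()                   // action to materialize persist (and checkpoint)
}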

I also recommend using Scala instead of Java; the Java syntax is too verbose for use with Spark Streaming.

