Stream data using Spark from a particular partition within Kafka topics
Question
I have already seen a similar question (click here), but I still want to know: is it really not possible to stream data from a particular partition? I have used the Kafka ConsumerStrategies in the Spark Streaming subscribe method:
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
This is the code snippet I tried for subscribing to a topic and partition:
val topics = Array("cdc-classic")
val topic = "cdc-classic"
val partition = 2
val offsets =
  Map(new TopicPartition(topic, partition) -> 2L) // I am not clear about this line (I tried to set the partition number and the offset to 2)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))
But when I run this code, I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
PS: cdc-classic is the topic name, and it has 17 partitions.
Answer
Specify the partition number and the partition's starting offset in this line to stream the data:
Map(new TopicPartition(topic, partition) -> 2L)
where:
- partition is the partition number
- 2L refers to the starting offset of the partition.
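Putting the pieces together, here is a minimal sketch of the whole setup, not a definitive fix: the broker address, group id, app name, and batch interval are invented placeholders, and the chosen starting offset (2L here) must actually exist in the partition. The OffsetOutOfRangeException in the question is what Kafka raises when the requested offset falls outside the partition's current earliest/latest range (for example, after log retention has deleted the early records) and no reset policy applies.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("cdc-partition-stream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))

// Assumed connection settings; adjust to your cluster.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "cdc-consumer-group",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("cdc-classic")
// Start partition 2 of the topic at offset 2. The offset must lie between the
// partition's earliest and latest offsets, or the fetch fails as in the question.
val offsets = Map(new TopicPartition("cdc-classic", 2) -> 2L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))

// Log each record's partition and offset to verify where the data comes from.
stream.foreachRDD { rdd =>
  rdd.foreach(r => println(s"partition=${r.partition} offset=${r.offset} value=${r.value}"))
}

ssc.start()
ssc.awaitTermination()

If the job keeps failing, it is worth checking the partition's earliest available offset first, e.g. with Kafka's GetOffsetShell tool (kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic cdc-classic --time -2), and picking a starting offset at or after it.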
Then we can stream the data from the selected partition.
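One caveat on the Subscribe approach: it still subscribes to all 17 partitions of the topic; the offsets map only fixes the starting position of the partitions it lists. If the intent is to read only one particular partition, the same kafka010 package offers ConsumerStrategies.Assign, which takes an explicit list of TopicPartitions. A minimal sketch, reusing ssc and kafkaParams from above:

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

val tp = new TopicPartition("cdc-classic", 2)
// Consume only partition 2 of cdc-classic, starting at offset 2.
val partitionStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](List(tp), kafkaParams, Map(tp -> 2L)))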