Stream data using Spark from a particular partition within Kafka topics
Problem description
I have already seen a similar question (click here).
But I still want to know whether it is really not possible to stream data from a particular partition. I have used Kafka ConsumerStrategies in the Spark Streaming subscribe method:
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
This is the code snippet I tried for subscribing to a topic and a partition:
val topics = Array("cdc-classic")
val topic = "cdc-classic"
val partition = 2
// I am not clear with this line (I tried to set the topic and the partition number as 2)
val offsets = Map(new TopicPartition(topic, partition) -> 2L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))
But when I run this code I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
P.S.: cdc-classic is the name of a topic with 17 partitions.
Recommended answer
Specify the partition number and the starting offset of that partition in this line to stream the data:
Map(new TopicPartition(topic, partition) -> 2L)
where,
partition is the partition number
2L refers to the starting offset of the partition.
Then we can stream the data from the selected partition.
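For reference, here is a minimal, self-contained sketch of the approach described above using the spark-streaming-kafka-0-10 API. The broker address, group id, application name and batch interval are placeholders that are not part of the original question, and the starting offset given for the partition must actually exist on the broker, otherwise the OffsetOutOfRangeException shown above is thrown again.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("cdc-classic-partition-stream") // placeholder app name
val ssc = new StreamingContext(conf, Seconds(10))                     // placeholder batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // placeholder broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "cdc-classic-consumer"                 // placeholder group id
)

val topic = "cdc-classic"
val partition = 2
// Start reading partition 2 of the topic at offset 2; this offset must lie within the
// range currently retained by the broker for that partition.
val offsets = Map(new TopicPartition(topic, partition) -> 2L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array(topic), kafkaParams, offsets))

// Print partition, offset and value of each record to verify where the data comes from.
stream.map(record => (record.partition, record.offset, record.value)).print()

ssc.start()
ssc.awaitTermination()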