Stream data using Spark from a particular partition within Kafka topics


Problem description

I have already seen a similar question (click here).

But I still want to know: is it not possible to stream data from a particular partition? I have used Kafka ConsumerStrategies in the Spark Streaming subscribe method.

ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

This is the code snippet I tried for subscribing to a topic and partition:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = Array("cdc-classic")
val topic = "cdc-classic"
val partition = 2
val offsets =
  Map(new TopicPartition(topic, partition) -> 2L) // I am not clear about this line (I tried to set the topic and the partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))

But when I run this code, I get the following exception:

     Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)

PS: cdc-classic is the topic name, and it has 17 partitions.

Recommended answer

Specify the partition number and the starting offset of that partition in this line to stream the data:

Map(new TopicPartition(topic, partition) -> 2L)

where

  • partition is the partition number

  • 2L refers to the starting offset of the partition.
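
For illustration, several partitions of the same topic can each be given their own starting offset in a single map; in this sketch, partition 5 and the offset 100L are made-up values, not taken from the question:

import org.apache.kafka.common.TopicPartition

val offsets = Map(
  new TopicPartition("cdc-classic", 2) -> 2L,   // start partition 2 at offset 2
  new TopicPartition("cdc-classic", 5) -> 100L  // start partition 5 at offset 100 (illustrative)
)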

Note that the starting offset must lie within the range the broker has actually retained for that partition; the OffsetOutOfRangeException above means offset 2 is outside that range for cdc-classic-2 and no auto.offset.reset policy is configured. Once a valid starting offset is given, we can stream the data from the selected partitions.
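
As a minimal sketch of one way to pick a valid starting offset, a plain KafkaConsumer can be used to probe the retained offset range before building the offsets map. This assumes a Kafka client of 0.10.1 or later (where beginningOffsets/endOffsets exist); the bootstrap.servers value and the clamping logic are assumptions for this example, not part of the original answer:

import java.util.{Arrays => JArrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("cdc-classic", 2)

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumption: replace with your brokers
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// Probe the broker for the retained offset range of cdc-classic-2.
val probe = new KafkaConsumer[String, String](props)
val earliest = probe.beginningOffsets(JArrays.asList(tp)).get(tp).longValue() // first retained offset
val latest = probe.endOffsets(JArrays.asList(tp)).get(tp).longValue()         // next offset to be written
probe.close()

// Clamp the desired start (2L in the question) into the valid range so the
// direct stream does not throw OffsetOutOfRangeException.
val desired = 2L
val start = math.max(earliest, math.min(desired, latest))
val offsets = Map(tp -> start)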
