Spark Structured Stream gets messages from only one partition of Kafka


Problem description

I have a situation where Spark can stream and get messages from only one partition of a 2-partition Kafka topic.

My topic:

C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4
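
One way to sanity-check that both partitions actually receive data is the GetOffsetShell tool that ships with the same Kafka distribution; the paths and broker address below assume the setup above:

C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --describe --zookeeper localhost:2181 --topic test4

C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test4 --time -1

Each output line of the second command has the form topic:partition:end-offset, so both test4-0 and test4-1 should show non-zero end offsets after the producer has run.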

Kafka producer:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaFileProducer {

// kafka producer
Producer<String, String> producer;

public KafkaFileProducer() {

    // configs
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    //props.put("group.id", "testgroup");
    props.put("batch.size", "16384");
    props.put("auto.commit.interval.ms", "1000");
    props.put("linger.ms", "0");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("block.on.buffer.full", "true");

    // instantiate a producer
    producer = new KafkaProducer<String, String>(props);
}

/**
 * @param filePath
 */
public void sendFile(String filePath) {
    FileInputStream fis;
    BufferedReader br = null;

    try {
        fis = new FileInputStream(filePath);

        //Construct BufferedReader from InputStreamReader
        br = new BufferedReader(new InputStreamReader(fis));

        int count = 0;

        String line = null;
        while ((line = br.readLine()) != null) {
            count ++;
            // don't send the header line
            if (count > 1) {
                producer.send(new ProducerRecord<String, String>("test4", count + "", line));
                Thread.sleep(10);
            }
        }

        System.out.println("Sent " + count + " lines of data");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // avoid a NullPointerException if the file could not be opened
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        producer.close();
    }
}

}
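
For completeness, a minimal driver for this producer could look like the sketch below; the file path is only an illustrative placeholder:

public class KafkaFileProducerApp {

    public static void main(String[] args) {
        // sends every data line of the file (the header is skipped in sendFile) to the test4 topic
        new KafkaFileProducer().sendFile("C:\\bigdata\\data\\input.csv");
    }
}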

Spark Structured Stream:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;

    // inside the driver's main method
    System.setProperty("hadoop.home.dir", "C:\\bigdata\\winutils");

    final SparkSession sparkSession = SparkSession.builder().appName("Spark Data Processing").master("local[2]").getOrCreate();

    // create kafka stream to get the lines
    Dataset<Tuple2<String, String>> stream = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "test4")
            .option("startingOffsets", "{\"test4\":{\"0\":-1,\"1\":-1}}")
            .option("failOnDataLoss", "false")
            .load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as(Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

    Dataset<String> lines = stream.map((MapFunction<Tuple2<String, String>, String>) (Tuple2<String, String> tuple) -> tuple._2, Encoders.STRING());
    Dataset<Row> result = lines.groupBy().count();
     // Start running the query that prints the running counts to the console
    StreamingQuery query = result//.orderBy("callTimeBin")
            .writeStream()
            .outputMode("complete")
            .format("console")
            .start();


    // wait for the query to finish
    try {
        query.awaitTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
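
As a quick way to see how many rows actually arrive from each Kafka partition, the Kafka source also exposes a partition column that can be aggregated on; the sketch below reuses the same broker and topic settings and is meant purely as a debugging aid:

    // count the rows read from each Kafka partition to check whether both are being consumed
    Dataset<Row> perPartitionCounts = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "test4")
            .option("startingOffsets", "earliest")
            .load()
            .groupBy("partition")
            .count();

    perPartitionCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();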

When I ran the producer to send 100 lines from a file, the query returned only 51 lines. I read Spark's debug logs and noticed the following:

17/02/15 10:52:49 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/02/15 10:52:49 DEBUG StreamExecution: Starting Trigger Calculation
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-1
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-0
17/02/15 10:52:49 DEBUG KafkaSource: Partitions assigned to consumer: [test4-1, test4-0]. Seeking to the end.
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-1
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-0
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to latest offset.
17/02/15 10:52:49 DEBUG Fetcher: **Fetched {timestamp=-1, offset=49} for partition test4-1
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to earliest offset.
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=0} for partition test4-1**
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-0 to latest offset.
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=51} for partition test4-0
17/02/15 10:52:49 DEBUG KafkaSource: Got latest offsets for partition : Map(test4-1 -> 0, test4-0 -> 51)
17/02/15 10:52:49 DEBUG KafkaSource: GetOffset: ArrayBuffer((test4-0,51), (test4-1,0))
17/02/15 10:52:49 DEBUG StreamExecution: getOffset took 0 ms
17/02/15 10:52:49 DEBUG StreamExecution: triggerExecution took 0 ms

I don't know why test4-1 is always reset to the earliest offset.

If someone knows how to get all messages from all partitions, I would greatly appreciate it. Thanks.

Answer

There is a known Kafka issue in the 0.10.1.* client: https://issues.apache.org/jira/browse/KAFKA-4547

Right now you can use the 0.10.0.1 client as a workaround. It can talk to a Kafka 0.10.1.* cluster.
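
A minimal sketch of that workaround, assuming a Maven build with Spark 2.1.0 (the exact versions here are illustrative): declaring kafka-clients 0.10.0.1 as a direct dependency makes it win over the version pulled in transitively by spark-sql-kafka-0-10.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- pin the Kafka client to 0.10.0.1 to work around KAFKA-4547 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>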

See https://issues.apache.org/jira/browse/SPARK-18779 for more details.
