Kafka + Spark Streaming: constant delay of 1 second


Question

Finally I have made my own producer using Java and it works well, so the problem is in the kafka-console-producer. The kafka-console-consumer works well.

I have already tried version 0.9.0.1 and it has the same behaviour.

I am working on my bachelor's final project, a comparison between Spark Streaming and Flink. In front of both frameworks I am using Kafka, plus a script that generates the data (described below). My first test compares the latency of the two frameworks on simple workloads, and Kafka is giving me a really high latency (constantly 1 second). For simplicity, for the moment I am running both Kafka and Spark on a single machine.

I have already looked for and found similar problems and tried the solutions they give, but nothing changed. I have also checked all the Kafka configurations in the official documentation and put the ones relevant to latency in my config files. This is my configuration:

Kafka 0.10.2.1 - Spark 2.1.0


server.properties:

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=2
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000
log.flush.interval.ms=50
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
flush.messages=100
flush.ms=10

producer.properties:

compression.type=none
max.block.ms=200
linger.ms=50
batch.size=0

Spark Streaming program (it prints each received record, and the difference between when the record was created and when the function processes it):

package com.tfg.spark1.spark1;

import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka.*;

public final class Timestamp {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Timestamp <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));


        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        int numThreads = Integer.parseInt(args[1]);
        topicMap.put(args[0], numThreads);

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2>

        // Extract the message value from each Kafka (key, value) pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            public String call (Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Compute the elapsed ms between the timestamp embedded in each record and now
        JavaDStream<String> newLine = lines.map(new Function<String, String>() {
            private static final long serialVersionUID = 1L;

            public String call(String line) {
                String[] tuple = line.split(" ");
                String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1]));
                //String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime);

                return totalTime;
            }
        });

        lines.print();
        newLine.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

The generated data has the following format:

"Random bits" + " " + "current time in ms"
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
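The latency figure the Spark job prints is just this subtraction: current wall-clock time minus the produce-time embedded in the record. As a standalone sketch of that computation (the class and method names here are mine, not part of the original job):

```java
// Standalone sketch (not the author's code) of the latency computation the
// Spark job performs: each record is "<random bits> <produce-time-ms>", and
// the latency is the gap between production time and processing time.
public class LatencyCheck {

    // Returns elapsed milliseconds between when the record was produced
    // (the timestamp embedded in the line) and nowMs.
    static long computeLatency(String line, long nowMs) {
        String[] fields = line.split(" ");
        return nowMs - Long.parseLong(fields[1]);
    }

    public static void main(String[] args) {
        // Example: a record produced at 1496421618634, observed 1000 ms later.
        System.out.println(computeLatency("01 1496421618634", 1496421619634L)); // 1000
    }
}
```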

Finally, when I run my Spark Streaming program and the script generator, which generates data every 200 ms, Spark (batch interval = 100 ms) prints 9 empty batches and then, every second (always at the 900 ms mark, as in this example: Time: 1496421619900 ms), these results:

-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
1416
1006
599
1214
803

Also, if I run one Kafka command-line producer and another command-line consumer, it always takes some time for the produced data to appear in the consumer.

Thanks in advance for your help!

Answer

I have just updated the JIRA you opened with the reason why you always see the 1000 ms delay.

https://issues.apache.org/jira/browse/KAFKA-5426

Here is the reason:

The linger.ms parameter is set by the --timeout option on the command line, which defaults to 1000 ms if not specified. At the same time, the batch.size parameter is set by the --max-partition-memory-bytes option, which defaults to 16384 if not specified. This means that even if you specify linger.ms and batch.size using --producer-property or --producer.config, they will always be overwritten by these "specific" options.
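Based on that, the fix when benchmarking with the console producer is to pass the tool's own options instead of --producer-property. A sketch of such an invocation (broker address, topic name, and the chosen values are placeholders; the option-to-config mapping is as described in KAFKA-5426 for Kafka 0.10.x):

```shell
# Override the console producer's internal defaults directly:
#   --timeout maps to linger.ms (default 1000 ms -> the observed 1 s delay)
#   --max-partition-memory-bytes maps to batch.size
bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic test \
  --timeout 50 \
  --max-partition-memory-bytes 16384
```

With --timeout lowered, records should no longer sit in the producer for a full second waiting for the linger window to expire.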

