Null value in Spark Streaming from Kafka

Problem description

I have a simple program that receives data from Kafka. When I start a Kafka producer and send data, for example "Hello", printing the message gives me (null, Hello), and I don't know why this null appears. Is there any way to avoid it? I think it comes from the first parameter of Tuple2<String, String>, but I only want to print the second one. Another thing: the System.out.println("inside map "+ message); call inside the map never prints anything. Does anyone know why? Thanks.

public static void main(String[] args){

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Address of the Cassandra node to connect to (127.0.0.1 for a local instance)
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic: topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }
    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> message) {
                    System.out.println("inside map "+ message);
                    return message._2();
                }
            }
    );

    messages.print();
    jssc.start();
    jssc.awaitTermination();
}

Recommended answer

Q1) Null values: Messages in Kafka are keyed, meaning each one has a (key, value) structure. You see (null, Hello) because the producer published the record (null, "Hello") to the topic, i.e. a message with no key. To drop the key in your job, map the original DStream to its values: kafkaDStream.map(new Function<Tuple2<String, String>, String>() {...}), returning message._2() from call.
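
As a rough, Spark-free illustration of what dropping the key amounts to, the sketch below models each Kafka record as a key/value pair with a null key and keeps only the value. The class name DropKeyDemo and the helper valuesOnly are hypothetical and not part of Spark or Kafka; in the streaming job the same projection is what message._2() does inside map.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DropKeyDemo {
    // Keep only the value of each (key, value) record; the key may be null.
    // Hypothetical helper, standing in for map(message -> message._2()).
    static List<String> valuesOnly(List<Map.Entry<String, String>> records) {
        return records.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Records as an unkeyed producer would publish them: key is null.
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>(null, "Hello"),
                new SimpleEntry<String, String>(null, "World"));

        System.out.println(records.get(0));      // the pair, key included
        System.out.println(valuesOnly(records)); // values only
    }
}
```

The null itself is harmless: it only shows up because the whole pair is printed rather than its second element.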

Q2) System.out.println("inside map "+ message); does not print. A couple of classical reasons:

  1. Transformations are applied in the executors, so when running on a cluster that output appears in the executor logs and not on the driver.

  2. Operations are lazy: a DStream must be materialized by an output operation before its transformations are applied.

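In this specific case, the JavaDStream<String> lines is never materialized, i.e. it is never used in an output operation, so the map is never executed. The program calls messages.print(), which prints the original (key, value) pairs; calling lines.print() instead would both run the map and print only the values.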
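
Spark is not needed to observe this kind of laziness. As an analogy only (java.util.stream is not the DStream API), the sketch below builds a mapped stream whose function has a side effect, and shows that the function does not run until a terminal operation consumes the stream. LazyMapDemo, mapped and calls are hypothetical names chosen for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyMapDemo {
    // Counts how many times the map function has actually run.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds a mapped stream but does not consume it,
    // much like declaring `lines` without an output operation.
    static Stream<String> mapped(List<String> input) {
        return input.stream().map(s -> {
            calls.incrementAndGet();
            System.out.println("inside map " + s);
            return s.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> lines = mapped(Arrays.asList("Hello"));
        // Nothing has been printed by the map function yet.
        System.out.println("calls before terminal operation: " + calls.get());

        // The terminal operation plays the role of an output operation.
        List<String> out = lines.collect(Collectors.toList());
        System.out.println("calls after terminal operation: " + calls.get());
        System.out.println(out);
    }
}
```

In the Spark job, the role of the terminal operation is played by an output operation on the mapped stream, such as print().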
