Null value in Spark Streaming from Kafka

Question

I have a simple program that tries to receive data from Kafka. When I start a Kafka producer and send data, for example "Hello", I get this when I print the message: (null, Hello). I don't know why this null appears. Is there any way to avoid it? I think it comes from the first parameter of Tuple2<String, String>, but I only want to print the second one. Also, System.out.println("inside map " + message); prints nothing. Does anyone know why? Thanks.

public static void main(String[] args){

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Substitute 127.0.0.1 with the actual address of your Cassandra host
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic: topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }
    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> message) {
                    System.out.println("inside map "+ message);
                    return message._2();
                }
            }
    );

    messages.print();
    jssc.start();
    jssc.awaitTermination();
}

Recommended answer

Q1) Null values: Messages in Kafka are keyed, which means they all have a (key, value) structure. You see (null, Hello) because the producer published the value "Hello" with a null key to the topic. If you want to omit the key in your process, map the original DStream to keep only the value: kafkaDStream.map(new Function<Tuple2<String, String>, String>() {...}), returning message._2() as the map in the question already does.
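As a rough illustration of dropping the key, the sketch below uses plain Java collections instead of Spark so that it runs without any cluster or Kafka broker. It is only an analogy: Map.Entry stands in for Tuple2, getValue() for _2(), and the names DropKeyExample and valuesOnly are hypothetical, chosen for this example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DropKeyExample {
    // Keep only the value of each (key, value) record,
    // mirroring "return message._2();" in the question's map function.
    static List<String> valuesOnly(List<Map.Entry<String, String>> records) {
        return records.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A producer that sends no key yields records whose key is null.
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<String, String>(null, "Hello"));
        records.add(new SimpleEntry<String, String>(null, "World"));

        System.out.println(valuesOnly(records)); // prints [Hello, World]
    }
}
```

The null key is harmless here because the mapping never reads it; the same holds for the Spark map, which only touches the second element of the tuple.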

Q2) System.out.println("inside map " + message); does not print. There are a couple of classic reasons:

  1. Transformations are applied in the executors, so when running in a cluster, that output appears in the executor logs and not on the driver.

  2. Operations are lazy, and a DStream must be materialized by an output operation before its transformations are applied.

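In this specific case, the JavaDStream<String> lines is never materialized, i.e. it is never used in an output operation, so the map is never executed. The program prints messages, the original pair stream, which is why the output is (null, Hello). Calling lines.print() instead of messages.print() would both run the map and print only the value.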
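The lazy-evaluation point can be illustrated with plain Java streams, which behave in a similar way. This is an analogy and not Spark code: a java.util.stream terminal operation plays the role of a DStream output operation, and the names LazyMapExample and callsBeforeAndAfter are hypothetical, chosen for this sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyMapExample {
    // Returns how many times the mapping function ran before and after
    // a terminal operation was attached to the stream.
    static int[] callsBeforeAndAfter(List<String> input) {
        AtomicInteger calls = new AtomicInteger();

        // Declaring the map step does not run it, much like messages.map(...)
        // in the question only describes a transformation.
        Stream<String> pending = input.stream().map(s -> {
            calls.incrementAndGet();
            return s.toUpperCase();
        });
        int before = calls.get();

        // A terminal operation (here collect) forces the map to run,
        // playing the role that an output operation plays for a DStream.
        pending.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = callsBeforeAndAfter(List.of("hello", "kafka"));
        System.out.println("map calls before terminal operation: " + counts[0]); // 0
        System.out.println("map calls after terminal operation: " + counts[1]);  // 2
    }
}
```

The mapping function, including any println placed inside it, runs only once something consumes the mapped stream, which matches the behavior described for lines in the answer.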
