Null value in Spark Streaming from Kafka
Problem description
I have a simple program that receives data from Kafka. When I start a Kafka producer and send data, for example "Hello", printing the message gives me (null, Hello). I don't know why this null appears. Is there any way to avoid it? I think it comes from the first parameter of Tuple2<String, String>, but I only want to print the second one. Also, System.out.println("inside map " + message); never prints anything. Does anyone know why? Thanks.
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode)
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic : topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }

    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                System.out.println("inside map " + message);
                return message._2();
            }
        }
    );

    messages.print();
    jssc.start();
    jssc.awaitTermination();
}
Recommended answer
Q1) Null values:
Messages in Kafka are keyed, which means they all have a (key, value) structure. You see (null, Hello) because the producer published the value "Hello" with a null key to the topic. If you want to drop the key in your job, map the original DStream to keep only the value: kafkaDStream.map(new Function<Tuple2<String, String>, String>() {...})
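The key-dropping step can be tried out without Kafka or Spark. The following is only a minimal sketch in plain Java: the records are modelled as Map.Entry<String, String> with a null key, which is an assumption standing in for what a producer publishes when it sends no key. The class name DropKeyExample and the helper valuesOnly are hypothetical and not part of any Spark or Kafka API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DropKeyExample {
    // Keeps only the value of each (key, value) record and discards the key,
    // which is what message._2() does for a Tuple2 in the Spark job.
    static List<String> valuesOnly(List<Map.Entry<String, String>> records) {
        return records.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Records with a null key, standing in for messages sent without a key.
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>(null, "Hello"),
                new SimpleEntry<String, String>(null, "World"));

        // Printing a whole record shows the null key next to the value.
        System.out.println(records.get(0));

        // Printing after the mapping step shows the values only.
        System.out.println(valuesOnly(records));
    }
}
```

The null is not an error in either case: it is simply the key half of the pair, and it disappears as soon as only the value half is kept.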
Q2) System.out.println("inside map " + message); does not print. A couple of classic reasons:
- Transformations are applied in the executors, so when running on a cluster that output appears in the executor logs and not on the driver.
- Operations are lazy, and a DStream needs to be materialized by an output operation before its transformations are applied.
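In this specific case, the JavaDStream<String> lines is never materialized, i.e. it is never used in an output operation; the program calls messages.print(), which prints the original (key, value) pairs. Therefore the map is never executed. Calling lines.print() instead would trigger the map and print only the values.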
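The same lazy behaviour can be seen on a single machine with plain Java streams. This is only an analogy, not Spark's API: an intermediate map on a java.util.stream.Stream plays the role of the DStream transformation, and a terminal operation plays the role of an output operation such as print(). The class LazyMapExample, the calls list and the helper buildMapped are hypothetical names introduced for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyMapExample {
    // Records every invocation of the mapping function, so we can see when it runs.
    static final List<String> calls = new ArrayList<>();

    // Builds a mapped stream. Like the unused "lines" DStream, declaring the
    // map does not run the mapping function.
    static Stream<String> buildMapped(List<String> input) {
        return input.stream().map(s -> {
            calls.add(s);
            return s.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> mapped = buildMapped(Arrays.asList("Hello", "World"));
        // No terminal operation yet, so the mapping function has not been called.
        System.out.println("calls before terminal operation: " + calls.size()); // 0

        // The terminal operation forces the pipeline to run.
        List<String> result = mapped.collect(Collectors.toList());
        System.out.println("calls after terminal operation: " + calls.size()); // 2
        System.out.println(result); // [HELLO, WORLD]
    }
}
```

If the stream is never consumed, the lambda inside map never runs, which mirrors why the println inside the Spark map stays silent until some output operation is applied to lines.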