Spark Structured Streaming cannot receive Kafka messages
Problem description
I am testing Spark Structured Streaming with Kafka. I have a Kafka broker (0.10.1) on host28, with the default partition count num.partitions=1.
My producer:
bin/kafka-console-producer.sh --broker-list host28:6667 --topic test
When I use
bin/kafka-console-consumer.sh --zookeeper host26:2181,host27:2181,host28:2181 --topic test --from-beginning
or
bin/kafka-console-consumer.sh --bootstrap-server host8:6667 --topic test --from-beginning --partition 0
I can receive messages from Kafka.
But when I use
bin/kafka-console-consumer.sh --bootstrap-server host28:6667 --topic test --from-beginning
or Spark Structured Streaming (code below), no messages are received:
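import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Main {
    private static final String APP_NAME = "test_streaming_from_kafka";
    private static final String KAFKA_HOST = "host28:6667";
    private static final String KAFKA_SUBSCRIBE = "test";

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName(APP_NAME)
                .getOrCreate();

        // Kafka source that subscribes to the topic; Spark's internal consumer joins
        // a consumer group whose group.id Spark generates (spark-kafka-source-...)
        DataStreamReader reader = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", KAFKA_HOST)
                .option("subscribe", KAFKA_SUBSCRIBE);

        // Cast the binary key/value columns to strings and print each micro-batch
        StreamingQuery query = reader.load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic",
                        "CAST(partition AS STRING)", "CAST(offset AS STRING)")
                .writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}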
Recommended answer
SOLVED!
I changed the Spark log level from INFO to DEBUG, and then I found this:
18/08/17 21:12:07 DEBUG AbstractCoordinator: Received group coordinator response ClientResponse(receivedTimeMs=1534511527794, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3d2afb1b, request=RequestSend(header={api_key=10,api_version=0,correlation_id=117,client_id=consumer-1}, body={group_id=spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0}), createdTimeMs=1534511527794, sendTimeMs=1534511527794), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
18/08/17 21:12:07 DEBUG AbstractCoordinator: Group coordinator lookup for group spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0 failed: The group coordinator is not available.
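For reading this log: in the Kafka 0.10.x protocol, `api_key=10` is the GroupCoordinator request, and `error_code=15` is GROUP_COORDINATOR_NOT_AVAILABLE; `node_id=-1` in the response means no coordinator was returned for Spark's driver group (`spark-kafka-source-...-driver-0`). The class below is a minimal, hypothetical helper (`CoordinatorLogDecoder` is not part of Kafka or Spark) that pulls those two fields out of such a DEBUG line; the error-code table covers only a few coordinator-related codes and is an illustration, not a complete mapping.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not a Kafka or Spark API: decodes the coordinator
// lookup result from an AbstractCoordinator DEBUG log line.
public class CoordinatorLogDecoder {

    // A few coordinator-related Kafka protocol error codes (0.10.x names only).
    private static final Map<Integer, String> ERROR_NAMES = Map.of(
            0, "NONE",
            14, "GROUP_LOAD_IN_PROGRESS",
            15, "GROUP_COORDINATOR_NOT_AVAILABLE",
            16, "NOT_COORDINATOR_FOR_GROUP");

    private static final Pattern ERROR_CODE = Pattern.compile("error_code=(-?\\d+)");
    private static final Pattern NODE_ID = Pattern.compile("coordinator=\\{node_id=(-?\\d+)");

    // Symbolic name of the first error_code in the line, "UNKNOWN(n)" for codes
    // outside the small table above, or "NOT_FOUND" if the line has no error_code.
    public static String errorName(String logLine) {
        Matcher m = ERROR_CODE.matcher(logLine);
        if (!m.find()) {
            return "NOT_FOUND";
        }
        int code = Integer.parseInt(m.group(1));
        return ERROR_NAMES.getOrDefault(code, "UNKNOWN(" + code + ")");
    }

    // True when the response body names no coordinator (node_id=-1).
    public static boolean coordinatorMissing(String logLine) {
        Matcher m = NODE_ID.matcher(logLine);
        return m.find() && Integer.parseInt(m.group(1)) == -1;
    }

    public static void main(String[] args) {
        String line = "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}";
        System.out.println(errorName(line));          // GROUP_COORDINATOR_NOT_AVAILABLE
        System.out.println(coordinatorMissing(line)); // true
    }
}
```

Because the lookup returns no coordinator, a consumer that has to join a group (as Spark's driver consumer does here) cannot proceed, which is consistent with the query printing nothing.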
I googled "The group coordinator is not available" and found this.