Not able to send JSON tweet events to a Kafka topic/producer using the Kafka command line

Problem description

I have created a Python script, raw_tweets_stream.py, to stream Twitter data using the Twitter API. The JSON data from Twitter is piped to the Kafka console producer using the command below.

`python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:2181 --topic raw_json_tweets`
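Because kafka-console-producer.sh reads standard input and sends each line as one message, the piped script needs to print exactly one JSON object per line. The original raw_tweets_stream.py is not shown, so the following is only a minimal sketch of that output convention; `emit_events` and the sample events are hypothetical names used for illustration.

```python
import json
import sys


def emit_events(events, out=sys.stdout):
    """Write each event as one compact JSON line and flush immediately.

    Hypothetical helper: stands in for whatever raw_tweets_stream.py does
    when it prints tweets to stdout.
    """
    count = 0
    for event in events:
        # json.dumps escapes newlines inside strings, so every event
        # stays on a single line and maps to a single Kafka message.
        out.write(json.dumps(event) + "\n")
        # Flush so events are not held back in the pipe buffer.
        out.flush()
        count += 1
    return count


if __name__ == "__main__":
    # Made-up sample events standing in for the Twitter stream.
    sample = [{"id": 1, "text": "hello\nworld"}, {"id": 2, "text": "kafka"}]
    emit_events(sample)
```

Flushing after every line matters mostly for slow streams, where buffered output could otherwise sit in the pipe for a long time before the producer sees it.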


raw_json_tweets is the Kafka topic created for these tweets. The Python script raw_tweets_stream.py runs fine on its own, but an error is thrown when its output is sent to the Kafka producer. I am using the Hortonworks HDP 2.3.1 sandbox, and I have made sure that ZooKeeper and Kafka are started.

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic raw_json_tweets

Topic:raw_json_tweets      PartitionCount:1        ReplicationFactor:1     Configs:
            Topic: raw_json_tweets     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Error:

[2016-08-25 22:36:26,212] ERROR Failed to send requests for topics raw_json_tweets with correlation ids in [57,64] (kafka.producer.async.DefaultEventHandler)
[2016-08-25 22:36:26,213] ERROR Error in handling batch of 131 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-08-25 22:36:27,217] WARN Fetching topic metadata with correlation id 65 for topics [Set(json_tweets1)] from broker [BrokerEndPoint(0,localhost,2181)] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)


Update: Solution

  1. Went to Ambari Services and changed the Kafka logs directory to /tmp/kafka-logs.
  2. Modified the original command to use the correct broker hostname and port.

python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets

Verified with the console consumer that the events are being sent to the Kafka topic.

/usr/hdp/2.3.0.0-2557/kafka/bin/kafka-console-consumer.sh -zookeeper sandbox.hortonworks.com:2181 -topic raw_json_tweets -from-beginning

Recommended answer

It looks like you're pointing --broker-list at ZooKeeper (port 2181). Instead, you need to point it at the Kafka broker, whose default port is 9092, or 6667 on Ambari.
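This mistake can be caught before the producer is started. The sketch below is an illustration only, not part of Kafka: the helper names `parse_broker_list` and `suspicious_endpoints` are hypothetical, and the check assumes ZooKeeper is listening on its default client port, 2181.

```python
ZOOKEEPER_PORT = 2181  # ZooKeeper's default client port (assumed unchanged)


def parse_broker_list(broker_list):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    endpoints = []
    for entry in broker_list.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        endpoints.append((host, int(port)))
    return endpoints


def suspicious_endpoints(broker_list):
    """Return the endpoints that use the ZooKeeper port instead of a broker port."""
    return [(h, p) for h, p in parse_broker_list(broker_list) if p == ZOOKEEPER_PORT]


if __name__ == "__main__":
    for candidate in ("localhost:2181", "sandbox.hortonworks.com:6667"):
        flagged = suspicious_endpoints(candidate)
        status = "looks like ZooKeeper" if flagged else "ok"
        print(f"{candidate}: {status}")
```

A port-number check is only a heuristic; the authoritative value is the listener port configured for the broker, which on this sandbox is shown in Ambari.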
