如何从python客户端将JSON对象发送到kafka [英] how to send JSON object to kafka from python client

查看:46
本文介绍了如何从python客户端将JSON对象发送到kafka的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的 JSON 对象,如下所示

I have a simple JSON object like the following

d = { 'tag ': 'blah',
  'name' : 'sam',
  'score': 
    {'row1': 100,
      'row2': 200
     }
}

以下是我的python代码,它向Kafka发送消息

The following is my python code which is sending messages to Kafka

from kafka import SimpleProducer, KafkaClient
import json 

# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)

我在风暴日志中看到消息正在被接收但它正在抛出元组{此处的json结构}的转换为空不确定需要做什么才能解决这个问题?..

I see in the storm logs that the message is being received but its throwing Transformation null for tuple { json structure in here } not sure what needs to be done in order to fix this ?..

推荐答案

下面是我给 kafka 的生产者代码.我唯一不同的是使用 yaml.safe_load 加载 json 内容.它将内容作为字符串而不是 unicode 返回.以下是片段

The below is my code for producer to kafka. The only thing i did differently was to use yaml.safe_load to load the json content. It returns the contents as strings instead of unicode. The following is the snippet

with open('smaller_test_prod.txt') as f:
    for line in f:
        d = yaml.safe_load(line)
        jd = json.dumps(d)
        producer.send_messages(b'zeus_metrics',jd)

这里的每一行都是一个存储在文件中的json数据.

In here every line is a json data stored in a file.

这篇关于如何从python客户端将JSON对象发送到kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆