Stream CSV data in Kafka-Python

Problem description

I am sending CSV data to a Kafka topic using Kafka-Python, and the consumer receives it successfully. Now I want to stream the CSV file continuously: any new entry added to the file should be sent to the Kafka topic automatically. Any suggestions on continuously streaming a CSV file would be helpful.

Below is my existing code:

   import csv
   import logging
   from json import dumps

   from kafka import KafkaProducer

   logging.basicConfig(level=logging.INFO)

   # Serialize each CSV row (a list of strings) as a UTF-8 encoded JSON array.
   producer = KafkaProducer(
       bootstrap_servers='127.0.0.1:9092',
       value_serializer=lambda row: dumps(row).encode('utf-8'),
   )

   # Read the tab-delimited file once and send every row to the topic.
   with open('C:/Hadoop/Data/Job.csv', 'r', newline='') as file:
       reader = csv.reader(file, delimiter='\t')
       for row in reader:
           producer.send('Jim_Topic', row)

   # Block until all buffered messages have been delivered.
   producer.flush()

Recommended answer

Kafka Connect (part of Apache Kafka) is a good way to move data between Kafka and other systems, including flat files, in both directions (ingest and egress).

You can use the Kafka Connect SpoolDir connector to stream CSV files into Kafka. Install it from Confluent Hub, and then provide it with a configuration for your source file:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'
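
Since the question is written in Python, the same request can be sketched with the standard library instead of curl. This is only an illustration: the helper names `build_connector_request` and `register_connector` are made up here, and the URL, connector name, and settings are copied from the curl command above.

```python
import json
import urllib.request

# Kafka Connect REST endpoint, taken from the curl command above.
CONNECT_URL = "http://localhost:8083"

# Same settings as the JSON body in the curl command.
spooldir_config = {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "orders_spooldir_00",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": r".*\.csv",
    "schema.generation.enabled": "true",
    "csv.first.row.as.header": "true",
}


def build_connector_request(name, config, connect_url=CONNECT_URL):
    """Build (but do not send) the PUT request for /connectors/<name>/config."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
    )


def register_connector(name, config, connect_url=CONNECT_URL):
    """Send the request to a running Kafka Connect worker and return its reply."""
    request = build_connector_request(name, config, connect_url)
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")
```

With a Kafka Connect worker listening on that address, `register_connector("source-csv-spooldir-00", spooldir_config)` would submit the configuration; without one, `urlopen` raises `urllib.error.URLError`.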

See this blog for more examples and details.
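
If Kafka Connect is not an option and you prefer to stay with Kafka-Python as in the question, one alternative is to follow the file the way `tail -f` does. The sketch below is not part of the answer above; it is a minimal illustration under several assumptions: rows are only ever appended, each row ends with a newline, no field contains an embedded newline, and the file is never truncated or rotated. The names `follow_csv`, `stream_to_kafka`, and `should_stop` are hypothetical.

```python
import csv
import json
import time


def follow_csv(path, delimiter="\t", poll_interval=1.0, should_stop=None):
    """Yield every existing row, then keep yielding rows appended later.

    should_stop is an optional callable checked whenever the end of the
    file is reached; returning True ends the generator.
    """
    with open(path, "r", newline="") as f:
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:
                # End of file: stop if asked to, otherwise wait for more data.
                if should_stop is not None and should_stop():
                    return
                time.sleep(poll_interval)
                continue
            pending += chunk
            if not pending.endswith("\n"):
                # The writer has not finished this line yet; wait for the rest.
                continue
            row = next(csv.reader([pending], delimiter=delimiter))
            pending = ""
            if row:  # skip blank lines
                yield row


def stream_to_kafka(path, topic, bootstrap_servers="127.0.0.1:9092"):
    """Send each row of a growing CSV file to a Kafka topic as JSON."""
    # Imported here so follow_csv can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda row: json.dumps(row).encode("utf-8"),
    )
    try:
        for row in follow_csv(path):
            producer.send(topic, row)
    finally:
        # Deliver anything still buffered before shutting down.
        producer.flush()
        producer.close()
```

Using the values from the question, `stream_to_kafka('C:/Hadoop/Data/Job.csv', 'Jim_Topic')` would send the current contents and then block, forwarding each new row as it is appended. Note that the read position is not persisted, so restarting the script resends the whole file.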
