How to continuously read files with Kafka?

Problem description

I'm new to Kafka.

I have tried some file-reading examples and applied them to my project over the past couple of weeks. However, my application does not seem to work as I wanted, so I'm asking for your advice.

What I intend to do is:

  1. A Kafka producer reads files from directory A.
  2. Storm consumes the data produced in step 1.
  3. The files that have been read are moved to another directory.

Condition:

  1. Files are continuously delivered to directory A.

It is simple logic, but it gives me a headache.

So far I have created and tested the Kafka producer code in Eclipse on my local computer.

My thinking was that, because the Kafka producer should keep on reading files, the process has to stay alive even after all the files in directory A have been read. Instead, it terminates as soon as every file in directory A has been read and sent.

I run Kafka on a single node with 3 brokers, and the following are the producer property settings.

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");             // broker list (legacy 0.8 producer config)
props.put("serializer.class", "kafka.serializer.StringEncoder"); // send message payloads as strings
props.put("producer.type", "sync");                              // synchronous sends
props.put("request.required.acks", "1");                         // wait for the leader's acknowledgement

The topic has been created with the following command.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
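
As a quick sanity check (not part of the original post), the same script can describe the topic to confirm the partition count and replica assignment:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test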

Is my idea of continuously reading files wrong from Kafka's architectural perspective? Or is there a way that I have yet to find? I would appreciate it if someone could answer my questions.

Recommended answer

By using Kafka Connect:

#File connect-standalone.properties

#bootstrap kafka servers
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# local file where the standalone worker stores source connector offsets
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\\KafkaSetup\\kafka_2.13-2.4.0\\libs

# File myFileConnector.properties 
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=C:\\KafkaSetup\\input\\data.txt
topic=aryan_topic

#Command
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>connect-standalone.bat C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\connect-standalone.properties C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\myFileConnector.properties 

#data.txt contains
Hello By Arun
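
What makes this setup continuous is that the FileStreamSourceConnector tails the configured file: lines appended while the worker runs are published as new records, and the read position is persisted in connect.offsets so a restart resumes where it left off. For example, appending a line (hypothetical content) should show up on the consumer without restarting anything:

C:\KafkaSetup\input>echo Another line By Arun >> data.txt

Note that this connector follows a single file rather than a whole directory; picking up arbitrary new files in directory A, as in the original question, would need a different source connector or custom producer code.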

#Kafka client
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>kafka-console-consumer --bootstrap-server 127.0.0.1:9094 --topic aryan_topic
i]3
gh
"Hello By Arun"
