How to continuously read files by Kafka?
Question
I am new to Kafka.

I have tried some file-reading examples and applied them to my project over the past couple of weeks. However, my application does not work as I intended, so I am asking for your advice.
My intention is:
- The Kafka producer reads files from directory A.
- Storm consumes the data produced in step 1.
- Files that have been read are moved to a different directory.

Condition:
- Files are continuously delivered to directory A.
The logic is simple, but it gives me a headache.
So far I have created and tested the Kafka producer code in Eclipse on my local machine.
My thinking is that, because the Kafka producer should keep reading files, the process has to stay alive even after all the files in directory A have been read. Instead, it terminates as soon as every file in directory A has been read and sent.
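One way to keep the producer process alive is to block on filesystem notifications instead of exiting once the directory is empty. Below is a minimal sketch using the JDK's `java.nio.file.WatchService` (standard library only); the directory names are placeholders and the actual Kafka `send` call is left as a comment, since it depends on the producer client in use.

```java
import java.io.IOException;
import java.nio.file.*;

public class DirectoryTailer {
    // Read every regular file in src, hand its contents to the producer
    // (placeholder comment below), then move it to dst so it is not re-read.
    static int processExistingFiles(Path src, Path dst) throws IOException {
        int processed = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(src)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) continue;
                String contents = new String(Files.readAllBytes(file));
                // producer.send(...);  // hand `contents` to the Kafka producer here
                Files.move(file, dst.resolve(file.getFileName()),
                           StandardCopyOption.REPLACE_EXISTING);
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) throws Exception {
        Path src = Paths.get("dirA");       // hypothetical input directory
        Path dst = Paths.get("processed");  // hypothetical archive directory
        Files.createDirectories(src);
        Files.createDirectories(dst);

        processExistingFiles(src, dst);     // drain files already present

        // Block on ENTRY_CREATE events instead of terminating when dirA is empty.
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            src.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
            while (true) {
                WatchKey key = watcher.take();   // waits until a new file appears
                key.pollEvents();                // drain the events, then re-scan
                processExistingFiles(src, dst);
                if (!key.reset()) break;         // directory is no longer accessible
            }
        }
    }
}
```

The `while (true)` loop with `watcher.take()` is what keeps the process from exiting: it sleeps until the OS reports a new file, which matches the condition that files keep arriving in directory A.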
I run Kafka on a single node with 3 brokers, and the following are the producer property settings.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");         // legacy (pre-0.9) producer broker list
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");                          // send synchronously
props.put("request.required.acks", "1");                     // wait for leader acknowledgement
The topic was created with the following command.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
Is my idea of continuously reading files wrong from Kafka's architectural perspective? Or is there an approach I have yet to find? I would appreciate it if someone could answer my questions.
Answer
By using Kafka Connect. The stock FileStreamSourceConnector runs as a long-lived process that tails a file and publishes each new line to a topic, so it does not exit when it reaches the end of the input.
# File: connect-standalone.properties
# Kafka bootstrap servers
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# local file storing offsets and config data
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\\KafkaSetup\\kafka_2.13-2.4.0\\libs
# File: myFileConnector.properties
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=C:\\KafkaSetup\\input\\data.txt
topic=aryan_topic
#Command
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>connect-standalone.bat C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\connect-standalone.properties C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\myFileConnector.properties
#data.txt contains
Hello By Arun
#Kafka client
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>kafka-console-consumer --bootstrap-server 127.0.0.1:9094 --topic aryan_topic
"Hello By Arun"