Kafka如何连续读取文件? [英] How to continuously read files by Kafka?

查看:30
本文介绍了Kafka如何连续读取文件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Kafka 新手.

I'm a Kafka newbie.

我尝试了一些文件读取示例并应用于我的项目几周.但是,我的应用程序似乎无法正常工作,所以我在寻求您的建议.

I have tried some file reading examples and applied to my project for couple of weeks. However, my application does not seem to work as I wanted so I'm asking for your advise.

我的目的是:

  1. Kafka 生产者从目录 A 读取文件.
  2. Storm 使用从 1 产生的数据.
  3. 读取文件后移到其他目录.

条件:

  1. 文件被连续发送到目录 A.

这是一个简单的逻辑,但它让我头疼.

It is a simple logic but it gives me headache.

到目前为止,我已经在本地计算机 eclipse 上创建并测试了 Kafka 生产者代码.

So far I have created and tested Kafka producer code on my local computer eclipse.

我的想法是,因为kafka生产者应该继续读取文件,即使A目录中的所有文件都被读取,该进程也必须保持活动状态.但是,一旦读取并发送了目录 A 中的所有文件,它就会终止.

What I thought is, because kafka producer should keep on reading files, the process has to be kept alive even if all the files in directory A are read. But instead it terminates as soon as all the files in directory A have been read and sent.

我在具有 3 个 broker 的单个节点上运行 Kafka,以下是 Producer 属性设置.

I run Kafka on a single node with 3 brokers, and the following is Producer properties setting.

Properties props = new Properties();

props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");
props.put("request.required.acks", "1");

已使用以下命令创建主题.

Topic has been created with the following command.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test

从 Kafka 的架构角度来看,我认为连续读取文件是错误的吗?或者有什么方法我还没有找到?如果有人能回答我的问题,我将不胜感激.

Is my thought of continuously file reading wrong in Kafka's architectural perspective? Or is there a way that I yet to find for? I'd be very appreciated if someone can answer my problems.

推荐答案

使用 Kafka Connect

By Using Kafka Connect

#File connect-standalone.properties

#bootstrap kafka servers
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# local file storing offsets and config data
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\\KafkaSetup\\kafka_2.13-2.4.0\\libs

# File myFileConnector.properties 
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=C:\\KafkaSetup\\input\\data.txt
topic=aryan_topic

#Command
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>connect-standalone.bat C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\connect-standalone.properties C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\myFileConnector.properties 

#data.txt contains
Hello By Arun

#Kafka client
C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>kafka-console-consumer --bootstrap-server 127.0.0.1:9094 --topic aryan_topic
i]3
gh
"Hello By Arun"

这篇关于Kafka如何连续读取文件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆