Kafka Connect FileStreamSource ignores appended lines

Problem description

I'm working on an application to process logs with Spark and I thought to use Kafka as a way to stream the data from the log file. Basically I have a single log file (on the local file system) which is continuously updated with new logs, and Kafka Connect seems to be the perfect solution to get the data from the file along with the new appended lines.

I'm starting the servers with their default configurations with the following commands:

Zookeeper server:

zookeeper-server-start.sh config/zookeeper.properties

zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Kafka server:

kafka-server-start.sh config/server.properties

server.properties

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

Then I created the topic 'connect-test':

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test
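
As a quick sanity check (not one of the original steps, just an illustration using the same tooling), the new topic can be described to confirm it exists with the expected partition and replication settings:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-test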

Finally, I run the Kafka Connector:

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test
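
Not part of the original setup, but worth noting as a debugging aid: a standalone Connect worker also exposes the Connect REST interface (on port 8083 unless rest.port is overridden), so the connector and its task can be checked for a RUNNING rather than FAILED state, for example:

curl http://localhost:8083/connectors/my-file-connector/status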

At first I tested the connector by running a simple console consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
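
Since value.converter is the JsonConverter with schemas enabled in connect-standalone.properties, each record printed by the console consumer is wrapped in a JSON envelope rather than being the raw text; a line from the file shows up roughly like this (the payload text below is only a placeholder):

{"schema":{"type":"string","optional":false},"payload":"some log line from the file"}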

Everything was working perfectly, the consumer was receiving the logs from the file and as I was adding logs the consumer kept updating with the new ones.

(Then I tried Spark as a "consumer" following this guide: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers and it was still fine)

After this, I removed some of the logs from the log file and changed the topic (I deleted the 'connect-test' topic, created another one and edited the connect-file-source.properties with the new topic).

But now the connector doesn't work the same way anymore. When using the console consumer, I only get the logs that were already in the file and every new line I add is ignored. Maybe changing the topic (and/or modifying the data from the log file) without changing the connector name broke something in Kafka...

This is what Kafka Connect does with my topic 'new-topic' and connector 'new-file-connector':

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(it keeps flushing 0 outstanding messages even after appending new lines to the file)

So I tried to start over: I deleted the /tmp/kafka-logs directory, the /tmp/connect.offsets file, and used a brand new topic name, connector name and log file, just in case. But still, the connector ignores new logs... I even tried to delete my Kafka installation, re-extract it from the archive and run the whole process again (in case something changed in Kafka), but no success.

I'm confused as to where the problem is; any help would be appreciated!

Recommended answer

Per the docs:

The FileStream Connector examples are intended to show how a simple connector runs for those first getting started with Kafka Connect as either a user or developer. It is not recommended for production use.

I would use something like Filebeat (with its Kafka output) instead for ingesting logs into Kafka. Or kafka-connect-spooldir if your logs are not appended to directly but are standalone files placed in a folder for ingest.
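
As an illustrative sketch only (the path, broker address and topic are taken from the question, and exact key names differ between Filebeat versions), a minimal filebeat.yml tailing the log file into Kafka with the Kafka output could look roughly like this:

filebeat.inputs:
- type: log
  # tail the application log file from the question
  paths:
    - /data/users/zamara/suivi_prod/app/data/logs.txt

output.kafka:
  # write each line to the same broker and topic used above
  hosts: ["localhost:9092"]
  topic: "connect-test"

Filebeat keeps its own registry of per-file read offsets, so it continues tailing appended lines independently of Kafka Connect's offset storage.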
