Kafka Connect FileStreamSource ignores appended lines


Question

I'm working on an application to process logs with Spark, and I thought of using Kafka as a way to stream the data from the log file. Basically, I have a single log file (on the local file system) that is continuously updated with new logs, and Kafka Connect seems to be the perfect solution to get the data from the file along with the newly appended lines.

I'm starting the servers with their default configurations using the following commands:

Zookeeper server:

zookeeper-server-start.sh config/zookeeper.properties

zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Kafka server:

kafka-server-start.sh config/server.properties

server.properties

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

Then I created the topic 'connect-test':

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test

Finally, I ran the Kafka connector:

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test

At first I tested the connector by running a simple console consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Everything was working perfectly: the consumer was receiving the logs from the file, and as I added logs the consumer kept updating with the new ones.
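For reference, adding a log just meant appending a line to the watched file, e.g. something like this (the path is the one from connect-file-source.properties; the line content is only an example):

echo "some new log line" >> /data/users/zamara/suivi_prod/app/data/logs.txt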

(Then I tried Spark as a "consumer" following this guide: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers and it was still fine)

After this, I removed some of the logs from the log file and changed the topic (I deleted the 'connect-test' topic, created another one and edited the connect-file-source.properties with the new topic).
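The edited connect-file-source.properties would then have looked roughly like this (only the topic line changes; 'new-topic' is just a stand-in for whatever the new topic was called):

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=new-topic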

But now the connector doesn't work the same way anymore. When using the console consumer, I only get the logs that were already in the file and every new line I add is ignored. Maybe changing the topic (and/or modifying the data from the log file) without changing the connector name broke something in Kafka...

This is what Kafka Connect does with my topic 'new-topic' and connector 'new-file-connector':

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(it keeps flushing 0 outstanding messages even after appending new lines to the file)

So I tried to start over: I deleted the /tmp/kafka-logs directory and the /tmp/connect.offsets file, and used a brand new topic name, connector name and log file, just in case. But still, the connector ignores new logs... I even tried deleting my Kafka installation, re-extracting it from the archive and running the whole process again (in case something had changed in Kafka), but with no success.
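Concretely, the clean restart went roughly along these lines (a sketch, assuming the default paths from the configs above; the topic name is a placeholder), with Connect, Kafka and Zookeeper stopped first:

rm -f /tmp/connect.offsets
rm -rf /tmp/kafka-logs
# restart Zookeeper and Kafka, re-edit connect-file-source.properties with the new
# connector name, file and topic, then:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic brand-new-topic
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties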

I'm confused as to where the problem is; any help would be appreciated!

Answer

Per the docs:

The FileStream Connector examples are intended to show how a simple connector runs for those first getting started with Kafka Connect as either a user or developer. It is not recommended for production use.

I would use something like Filebeat (with its Kafka output) instead for ingesting logs into Kafka. Or kafka-connect-spooldir if your logs are not appended to directly but are standalone files placed in a folder for ingest.
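For the Filebeat route, a minimal filebeat.yml along these lines would tail the log file and ship each new line to a Kafka topic (a sketch, assuming a reasonably recent Filebeat; the file path and topic are the ones from the question):

filebeat.inputs:
  - type: log
    paths:
      - /data/users/zamara/suivi_prod/app/data/logs.txt

output.kafka:
  hosts: ["localhost:9092"]
  topic: "connect-test"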
