Kafka Streams with lookup data on HDFS


Problem Description

I'm writing an application with Kafka Streams (v0.10.0.1) and would like to enrich the records I'm processing with lookup data. This data (a timestamped file) is written into an HDFS directory on a daily basis (or 2-3 times a day).

How can I load this in the Kafka Streams application and join to the actual KStream?
What would be the best practice to reread the data from HDFS when a new file arrives there?

Or would it be better to switch to Kafka Connect and write the RDBMS table content to a Kafka topic that can be consumed by all the Kafka Streams application instances?

Update:
As suggested, Kafka Connect would be the way to go. Because the lookup data is updated in the RDBMS on a daily basis, I was thinking about running Kafka Connect as a scheduled one-off job instead of keeping the connection always open. Yes, because of the semantics and the overhead of keeping a connection always open and making sure that it won't be interrupted, etc. To me, having a scheduled fetch looks safer in this case.

The lookup data is not big, and records may be deleted / added / modified. I also don't know how I could always push a full dump into a Kafka topic and truncate the previous records. Enabling log compaction and sending null values for the keys that have been deleted would probably not work, as I don't know what has been deleted in the source system. Additionally, AFAIK I have no control over when compaction happens.

Solution

The recommended approach is indeed to ingest the lookup data into Kafka too, for example via Kafka Connect, as you suggested above yourself.
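Once the lookup data is in a Kafka topic, the Streams application can read that topic as a KTable and join the KStream against it. The following is a minimal sketch against the 0.10.0.x API mentioned in the question, not a drop-in implementation: the topic names, the String serdes, and the concatenating join logic are placeholder assumptions, and it presumes the lookup topic and the input topic are keyed by the same ID and co-partitioned (same number of partitions), which a KStream-KTable join requires.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class EnrichmentApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app");    // hypothetical application id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // adjust to your cluster
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();

            // Lookup topic fed by Kafka Connect; materialized as a continuously updated KTable.
            KTable<String, String> lookup = builder.table("lookup-topic");

            // The records to enrich, keyed by the same ID as the lookup topic.
            KStream<String, String> records = builder.stream("input-records");

            // Non-windowed stream-table join: each record is enriched with the latest
            // lookup value known for its key at the moment the record is processed.
            KStream<String, String> enriched =
                    records.leftJoin(lookup, (record, lookupValue) -> record + " | " + lookupValue);

            enriched.to("enriched-records");

            new KafkaStreams(builder, props).start();
        }
    }

On newer Streams releases (0.10.2+), a GlobalKTable can be used for the lookup side instead, which removes the co-partitioning requirement.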

But in this case, how can I schedule the Connect job to run on a daily basis rather than continuously fetching from the source table, which is not necessary in my case?

Perhaps you can update your question to explain why you do not want to have a continuous Kafka Connect job running? Are you concerned about resource consumption (load on the DB), are you concerned about the semantics of the processing if it's not "daily updates", or...?

Update: As suggested, Kafka Connect would be the way to go. Because the lookup data is updated in the RDBMS on a daily basis, I was thinking about running Kafka Connect as a scheduled one-off job instead of keeping the connection always open. Yes, because of the semantics and the overhead of keeping a connection always open and making sure that it won't be interrupted, etc. To me, having a scheduled fetch looks safer in this case.

Kafka Connect is safe, and the JDBC connector has been built for exactly the purpose of feeding DB tables into Kafka in a robust, fault-tolerant, and performant way (there are many production deployments already). So I would suggest not falling back to a "batch update" pattern just because "it looks safer"; personally, I think triggering daily ingestions is operationally less convenient than just keeping the connector running for continuous (and real-time!) ingestion, and it also leads to several downsides for your actual use case (see next paragraph).

But of course, your mileage may vary, so if you are set on updating just once a day, go for it. But you lose a) the ability to enrich your incoming records with the very latest DB data at the point in time when the enrichment happens, and, conversely, b) you might actually enrich the incoming records with stale/old data until the next daily update completes, which most probably will lead to incorrect data being sent downstream / made available to other applications for consumption. If, for example, a customer updates her shipping address (in the DB) but you only make this information available to your stream processing app (and potentially many other apps) once per day, then an order processing app will ship packages to the wrong address until the next daily ingest completes.

The lookup data is not big, and records may be deleted / added / modified. I also don't know how I could always push a full dump into a Kafka topic and truncate the previous records. Enabling log compaction and sending null values for the keys that have been deleted would probably not work, as I don't know what has been deleted in the source system.

The JDBC connector for Kafka Connect already handles this automatically for you: 1. it ensures that DB inserts/updates/deletes are properly reflected in a Kafka topic, and 2. Kafka's log compaction ensures that the target topic doesn't grow out of bounds. Perhaps you may want to read up on the JDBC connector in the docs to learn which functionality you get for free: http://docs.confluent.io/current/connect/connect-jdbc/docs/
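For reference, a JDBC source connector for such a lookup table could be configured along the lines of the following standalone-mode properties file. This is only a sketch: the connection URL, table and column names, and topic prefix are hypothetical placeholders, and the exact property set should be verified against the connector documentation linked above.

    name=lookup-table-source
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:postgresql://dbhost:5432/mydb
    # Pick up new and updated rows via a timestamp column plus an incrementing id column.
    mode=timestamp+incrementing
    timestamp.column.name=last_modified
    incrementing.column.name=id
    table.whitelist=lookup_table
    # Rows are written to <topic.prefix><table name>, here "lookup-lookup_table".
    topic.prefix=lookup-
    # How often to poll the table for changes.
    poll.interval.ms=60000

The target topic can then be created with cleanup.policy=compact so that only the latest value per key is retained, which matches what the KTable in the Streams application materializes.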
