Understanding Kafka poll(), flush() & commit()


Problem Description

I'm new to Kafka and trying out a few small use cases for my new application. The use case is basically: Kafka producer -> Kafka consumer -> Flume Kafka source -> Flume HDFS sink.

When consuming (step 2), below is the sequence of steps (a sketch of this loop follows the list):

1. consumer.poll(1.0)
   1.a. Produce to multiple topics (multiple Flume agents are listening)
   1.b. producer.poll()
2. flush() every 25 msgs
3. commit() for every msg (asynchCommit=false)
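
As a point of reference only, here is a minimal sketch of that loop with the Java client. The broker address, topic names, group id and serializers are assumptions for illustration, not the asker's actual code; the producer-side poll() from step 1.b is omitted because the Java producer invokes delivery callbacks on its own background thread.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class RelayLoopSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "relay-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            int sinceLastFlush = 0;
            while (true) {
                // Step 1: poll the source topic (waits up to 1 second for records)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Step 1.a: produce to the downstream topic(s) the Flume agents listen on
                    producer.send(new ProducerRecord<>("flume-source-topic", record.key(), record.value()));
                    // Step 2: flush every 25 messages
                    if (++sinceLastFlush >= 25) {
                        producer.flush();
                        sinceLastFlush = 0;
                    }
                    // Step 3: synchronous commit after each message; note that commitSync()
                    // with no arguments commits up to the end of the last poll, not per record
                    consumer.commitSync();
                }
            }
        }
    }
}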

Question 1: Is this sequence of actions right?

Question 2: Will this cause any data loss, since the flush is every 25 msgs and the commit is for every msg?

Question 3: What is the difference between poll() for the producer and poll() for the consumer?

Question 4: What happens when messages are committed but not flushed?

I would really appreciate it if someone could help me understand poll, flush, and commit, with offset examples between producer and consumer.

Thanks in advance!

Answer

Let us first understand Kafka in short:

What is a Kafka Producer:

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-producer.sh --broker-list 100.102.1.40:9092,100.102.1.41:9092 --topic company_wallet_db_v3-V3_0_0-transactions
>{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
[2019-07-21 11:53:37,907] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {company_wallet_db_v3-V3_0_0-transactions=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
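
The same record could also be produced from code. Below is a rough sketch with the Java client; the serializer settings, the shortened payload, and the callback are illustrative assumptions, not part of the original console example.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "100.102.1.40:9092,100.102.1.41:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String json = "{\"track_id\":\"a011ad33-2cdd-48a5-9597-5c27c8193033\"}"; // shortened payload
            producer.send(new ProducerRecord<>("company_wallet_db_v3-V3_0_0-transactions", json),
                    (metadata, exception) -> {
                        // The callback fires once the broker has acknowledged (or rejected) the record
                        if (exception == null) {
                            System.out.printf("written to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    });
            producer.flush(); // block until buffered records are actually sent
        }
    }
}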

You can ignore the warning. It appears because Kafka could not find the topic and auto-creates it.

Let us see how Kafka has stored this message:

The producer creates a directory in the broker server at /kafka-logs (for Apache Kafka) or /kafka-cf-data (for the Confluent version):

drwxr-xr-x   2 root root  4096 Jul 21 08:53 company_wallet_db_v3-V3_0_0-transactions-0

cd into this directory and then list the files. You will see the .log file that stores the actual data:

-rw-r--r--   1 root root 10485756 Jul 21 08:53 00000000000000000000.timeindex
-rw-r--r--   1 root root 10485760 Jul 21 08:53 00000000000000000000.index
-rw-r--r--   1 root root        8 Jul 21 08:53 leader-epoch-checkpoint
drwxr-xr-x   2 root root     4096 Jul 21 08:53 .
-rw-r--r--   1 root root      762 Jul 21 08:53 00000000000000000000.log

If you open the log file, you will see:

^@^@^@^@^@^@^@^@^@^@^Bî^@^@^@^@^B<96>T<88>ò^@^@^@^@^@^@^@^@^Al^S<85><98>k^@^@^Al^S<85><98>kÿÿÿÿÿÿÿÿÿÿÿÿÿÿ^@^@^@^Aö
^@^@^@^Aè
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}^@

Let us understand how the consumer would poll and read records:

What is Kafka Poll:

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer: The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(long).

So, poll takes a duration as input, reads the 00000000000000000000.log file for that duration, and returns the records to the consumer.
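
A small sketch of that offset bookkeeping with the Java client (broker address and group id are assumptions): each polled record carries its partition and offset, and position() reports the offset of the next record the consumer will be given.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from offset 0 if nothing is committed
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("company_wallet_db_v3-V3_0_0-transactions"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
            // The position is one larger than the highest offset seen in each assigned partition
            for (TopicPartition tp : consumer.assignment()) {
                System.out.println(tp + " -> next offset to be given out: " + consumer.position(tp));
            }
        }
    }
}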

When are the messages deleted:

Kafka takes care of the flushing of messages. There are 2 ways:

  1. Time based: the default is 7 days. This can be changed using log.retention.ms = 1680000
  2. Size based: this can be set using log.retention.bytes = 10487500 (see the sketch after this list for the per-topic equivalents)
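
These are broker-wide defaults set in server.properties. As an aside (not part of the original answer), the per-topic equivalents are retention.ms and retention.bytes, which can also be applied programmatically. A rough sketch with the Java AdminClient, with the broker address assumed:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class RetentionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                    "company_wallet_db_v3-V3_0_0-transactions");
            // Per-topic retention overrides; newer Kafka versions prefer incrementalAlterConfigs
            Config retention = new Config(Arrays.asList(
                    new ConfigEntry("retention.ms", "1680000"),
                    new ConfigEntry("retention.bytes", "10487500")));
            admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
        }
    }
}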

Now let us look at the consumer:

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-consumer.sh --bootstrap-server 100.102.1.40:9092 --topic company_wallet_db_v3-V3_0_0-transactions --from-beginning
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher,"exchange_rate":null,expired","id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
^CProcessed a total of 1 messages

The above command instructs the consumer to read from offset = 0. Kafka assigns this console consumer a group_id and maintains the last offset that this group_id has read, so that it can push newer messages to this consumer-group.

What is Kafka Commit:

Commit is a way to tell Kafka which messages the consumer has successfully processed. It can be thought of as updating the lookup between group-id and current_offset + 1. You can manage this using the commitAsync() or commitSync() methods of the consumer object.
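
A minimal sketch of that bookkeeping with the Java consumer (broker address, group id, and the process() step are assumptions): committing record.offset() + 1 per partition is exactly the group-id to current_offset + 1 update described above.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wallet-consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("company_wallet_db_v3-V3_0_0-transactions"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                    // Commit the *next* offset to read for this partition: current_offset + 1
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Stand-in for whatever "successfully processed" means for the application
        System.out.println(record.value());
    }
}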

Reference: https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
