Samza: Delay processing of messages until timestamp

Question

I'm processing messages from a Kafka topic with Samza. Some of the messages carry a timestamp in the future, and I'd like to postpone processing them until after that timestamp. In the meantime, I'd like to keep processing other incoming messages.

What I tried is to have my task queue these messages and implement WindowableTask to periodically check whether their timestamps allow them to be processed. The basic idea looks like this:

public class MyTask implements StreamTask, WindowableTask {

    private HashSet<MyMessage> waitingMessages = new HashSet<>();

    @Override
    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
        MyMessage parsedMessage = MyMessage.parseFrom(message);

        if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
            // Do the processing
        } else {
            waitingMessages.add(parsedMessage);
        }

    }

    @Override
    public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
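        // removeIf is used because removing inside a for-each loop over a HashSet
        // throws ConcurrentModificationException
        waitingMessages.removeIf(message -> {
            if (message.getValidFromDateTime().isBeforeNow()) {
                // Do the processing
                return true;  // processed, so drop the message from the set
            }
            return false;     // not due yet, keep waiting
        });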
    }
}

This obviously has some downsides: I'd lose the waiting messages held in memory whenever I redeploy my task. So I'd like to know the best practice for delaying the processing of messages with Samza. Do I need to re-emit the messages to the same topic again and again until I can finally process them? The delays in question range from a few minutes up to 1-2 hours.

Recommended answer

I think you could use Samza's key-value store to keep the state of your task instance instead of an in-memory Set. It should look something like this:

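import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class MyTask implements StreamTask, WindowableTask, InitableTask {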

  private KeyValueStore<String, MyMessage> waitingMessages;


  @SuppressWarnings("unchecked")
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    this.waitingMessages = (KeyValueStore<String, MyMessage>) context.getStore("messages-store");
  }

  @Override
  public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
      TaskCoordinator taskCoordinator) {
    byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
    MyMessage parsedMessage = MyMessage.parseFrom(message);

    if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
      // Do the processing
    } else {
      waitingMessages.put(parsedMessage.getId(), parsedMessage);
    }

  }

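  @Override
  public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
    KeyValueIterator<String, MyMessage> all = waitingMessages.all();
    try {
      while (all.hasNext()) {
        MyMessage message = all.next().getValue();
        // Only handle messages whose timestamp has passed; the rest stay in the store
        if (message.getValidFromDateTime().isBeforeNow()) {
          // Do the processing, then remove the message from the store
          waitingMessages.delete(message.getId());
        }
      }
    } finally {
      all.close(); // store iterators hold resources and must be closed
    }
  }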

}

If you redeploy your task, Samza should restore the state of the key-value store (Samza keeps the values in a dedicated Kafka changelog topic associated with the store). Of course, you need to provide some extra configuration for your store (messages-store in the example above).
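
As a rough illustration of that extra configuration, the job's properties file might contain entries like the ones below. The store name messages-store comes from the example above. The system name kafka, the changelog topic name, the serde name my-message, its factory class com.example.MyMessageSerdeFactory, and the one-minute window interval are all assumptions for illustration, so adjust them to your job:

```properties
# Back the store with RocksDB and replicate every write to a Kafka changelog topic
stores.messages-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.messages-store.changelog=kafka.messages-store-changelog

# Serdes for the store's keys and values
stores.messages-store.key.serde=string
stores.messages-store.msg.serde=my-message
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Hypothetical serde factory for MyMessage; you would have to write this class yourself
serializers.registry.my-message.class=com.example.MyMessageSerdeFactory

# Call window() once a minute (assumed interval; it bounds how late a delayed message can be)
task.window.ms=60000
```

The window interval is the main tuning knob here: a delayed message is handled at the first window() call after its timestamp passes, so a shorter interval means less extra delay at the cost of more frequent store scans.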

You can read about the key-value store here (for the latest Samza version at the time of writing): https://samza.apache.org/learn/documentation/0.14/container/state-management.html
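
One possible refinement, not part of the original answer: with delays of up to 1-2 hours, scanning every stored message on each window() call can become wasteful. If the keys start with the zero-padded due time, the store's sort order matches time order and only the due prefix needs to be read. The sketch below illustrates the idea with a plain TreeMap standing in for the persistent store. DelayedMessageIndex, key and pollDue are hypothetical names, and whether your store's byte ordering matches Java's String ordering depends on the serde, so treat this as an assumption to verify:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Samza: keeps delayed messages ordered by due time.
class DelayedMessageIndex {

    // Stand-in for a sorted, persistent key-value store.
    private final TreeMap<String, String> store = new TreeMap<>();

    // Zero-padded epoch millis sort lexicographically in time order
    // (valid for non-negative timestamps only).
    static String key(long dueAtMillis, String messageId) {
        return String.format(Locale.ROOT, "%019d:%s", dueAtMillis, messageId);
    }

    void add(long dueAtMillis, String messageId, String payload) {
        store.put(key(dueAtMillis, messageId), payload);
    }

    // Removes and returns every payload whose due time is <= nowMillis, oldest first.
    List<String> pollDue(long nowMillis) {
        // ';' is the character right after ':', so this exclusive bound still
        // covers every message id whose due time equals nowMillis.
        String upperBound = String.format(Locale.ROOT, "%019d;", nowMillis);
        Map<String, String> due = store.headMap(upperBound, false);
        List<String> payloads = new ArrayList<>(due.values());
        due.clear(); // clearing the view deletes the entries from the backing map
        return payloads;
    }

    int size() {
        return store.size();
    }
}
```

In a Samza task, process() would call something like add() for future-dated messages and window() would call something like pollDue(System.currentTimeMillis()). Messages that are not yet due are never touched, which keeps each window() call proportional to the number of messages that actually became ready.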
