kafka 流中的延迟函数 [英] delay function in kafka streams

查看:29
本文介绍了kafka 流中的延迟函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

正在尝试使用 kafka 流代码进行一些操作,并希望在拆分数据后为 1 毫秒添加延迟或类似threads.sleep() 之类的东西....我很困惑如何做到这一点..有人可以帮助我这样做吗?

was trying something with the kafka streams code and wanted to add delay or something like threads.sleep() for 1ms after splitting data....I m confused how to do that..can someone help me out in doing that?

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
    .mapValues(value -> value.replace("[",""));
    .mapValues(value -> value.replace("]",""));
    .mapValues(value -> value.replaceAll("\\},\\{" ,"\\}\\},\\{\\{"))
    .flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

所以在 .flatmapvalues 语句之后,我需要添加一个 1ms 的 thread.sleep() 那么我的语句是什么..?

So after .flatmapvalues statement I need to add a thread.sleep() for 1ms so what can be my statement there..?

推荐答案

不确定您想要实现的目标,但您似乎想要减慢处理速度?比你可以在你的使用代码中睡一觉.为此,您的 lambda 表达式必须在返回实际结果之前调用sleep".作为替代方案,您还可以添加额外的 .foreach()peek() 调用并在那里休眠.

Not sure what you want to achieve, but it seems you want to slow down processing? Than you can just put a sleep into your use code. For this, your lambda expression must call "sleep" before it returns the actual result. As an alternative, you can also add an additional .foreach() or peek() call and sleep there.

这篇关于kafka 流中的延迟函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆