Using tick tuples with Trident in Storm

Question

I am able to use a standard spout/bolt combination to do streaming aggregation, and it works very well in the happy case when I use tick tuples to persist data at some interval to make use of batching. Right now I am doing some failure management (tracking of tuples that were not saved, etc.) myself (i.e. not out of the box from Storm).

But I have read that Trident gives you a higher abstraction and better failure management. What I don't understand is whether there is tick tuple support in Trident. Basically, I would like to batch in memory for the current minute or so and persist any aggregated data for the previous minutes using Trident.

Any pointers here or design suggestions would be helpful.

Thanks
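
For reference, the in-memory part of what the question describes (accumulate the current minute, hand off every completed minute for one batched write) can be sketched in plain Java. `MinuteBucketAggregator`, `add` and `drainCompleted` are hypothetical names, not Storm or Trident APIs. In a spout/bolt topology you would presumably call `drainCompleted` whenever a tick tuple arrives and persist the map it returns.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of Storm or Trident): keeps per-minute counts
 * in memory and hands back every minute that is already complete, so the
 * caller can persist it in one batched write when a tick arrives.
 */
public class MinuteBucketAggregator {
    private static final long MINUTE_MS = 60_000L;

    // minute index (epoch millis / 60000) -> (key -> running count)
    private final TreeMap<Long, Map<String, Long>> buckets = new TreeMap<>();

    /** Adds delta to key in the bucket of the minute containing timestampMs. */
    public void add(String key, long delta, long timestampMs) {
        long minute = timestampMs / MINUTE_MS;
        buckets.computeIfAbsent(minute, m -> new HashMap<>()).merge(key, delta, Long::sum);
    }

    /**
     * Removes and returns every bucket strictly older than the minute containing nowMs.
     * The current minute stays in memory because it may still receive tuples.
     */
    public SortedMap<Long, Map<String, Long>> drainCompleted(long nowMs) {
        long currentMinute = nowMs / MINUTE_MS;
        SortedMap<Long, Map<String, Long>> view = buckets.headMap(currentMinute);
        SortedMap<Long, Map<String, Long>> completed = new TreeMap<>(view);
        view.clear(); // headMap is a live view, so this also removes them from 'buckets'
        return completed;
    }

    /** Number of minutes still held in memory. */
    public int pendingMinutes() {
        return buckets.size();
    }

    public static void main(String[] args) {
        MinuteBucketAggregator agg = new MinuteBucketAggregator();
        agg.add("clicks", 1, 0L);       // minute 0
        agg.add("clicks", 2, 30_000L);  // minute 0
        agg.add("clicks", 5, 61_000L);  // minute 1 (current)
        // A tick at t = 90 s hands back minute 0 only.
        System.out.println(agg.drainCompleted(90_000L)); // {0={clicks=3}}
        System.out.println(agg.pendingMinutes());        // 1
    }
}
```

Returning only whole minutes is a deliberate choice: the minute that is still in progress can keep receiving tuples, so it stays in memory until a later tick.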

Recommended answer

Actually, micro-batching is a built-in Trident feature. You don't need any tick tuples for that. When you have something like this in your code:

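TridentTopology topology = new TridentTopology();
topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider), // custom StateFactory
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater() // custom StateUpdater, invoked with each batch's tuples
    );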

(Here I'm using my own custom ElasticSearch state/updater; you might use something else.)

So when you have something like this, under the hood Trident groups your stream into batches and performs the partitionPersist operation not on individual tuples but on those batches.
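
To make the per-batch behaviour concrete, here is a stdlib-only model (not Trident code) in which the updater is called once per batch rather than once per tuple. The fixed `batchSize` is a simplifying assumption: in Trident the batch boundaries come from the spout (for an `IBatchSpout`, whatever one `emitBatch` call emits), and a real updater receives the batch as the `List<TridentTuple>` argument of `StateUpdater.updateState`. `MicroBatchModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Stdlib-only model (not Trident itself) of why partitionPersist is efficient:
 * the updater is invoked once per batch with all of that batch's tuples,
 * instead of once per tuple.
 */
public class MicroBatchModel {

    /** Splits the stream into consecutive batches of at most batchSize items. */
    public static <T> List<List<T>> toBatches(List<T> stream, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < stream.size(); i += batchSize) {
            int end = Math.min(i + batchSize, stream.size());
            batches.add(new ArrayList<>(stream.subList(i, end)));
        }
        return batches;
    }

    /** Calls the updater once per batch and returns how many "writes" were issued. */
    public static <T> int persistInBatches(List<T> stream, int batchSize, Consumer<List<T>> updater) {
        List<List<T>> batches = toBatches(stream, batchSize);
        batches.forEach(updater); // one bulk write per batch
        return batches.size();
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4", "e5");
        int writes = persistInBatches(events, 2,
                batch -> System.out.println("bulk write " + batch));
        System.out.println(writes + " writes for " + events.size() + " tuples");
    }
}
```

Run as-is, `main` reports 3 writes for 5 tuples, which is the kind of saving a bulk-capable store gets from receiving whole batches.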

If you still need tick tuples for some reason, just create your own tick spout; something like this works for me:

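// Storm 1.x+ package names; releases before 1.0 used backtype.storm.* and storm.trident.*
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/** Batch spout that emits one timestamp tuple per batch, roughly every `delay` ms. */
public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay; // milliseconds to wait before each tick

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        // no resources to set up
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // Block for `delay` ms, then emit a single-tuple batch carrying the current time.
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
        // nothing to clean up once a batch completes
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null; // no component-specific configuration
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}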

这篇关于在风暴中使用带有三叉戟的刻度元组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆