如何在风暴中创建拓扑 [英] How to create a topology in storm

查看:23
本文介绍了如何在风暴中创建拓扑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们刚接触风暴.我们不知道如何创建拓扑,请帮助我们使用storm.我们尝试了在 Windows 上运行风暴"一文中给出的示例 wordcount c=topology.但是我们无法理解如何给出输入以及输入存在于何处以及输出存在于风暴 ui 中的何处.

We are new to storm. We don't know about how to create topology please help us to work with storm. We tried sample wordcount c=topology given in the article "running storm on windows". But we can't understand how to give an input and also where the input is present and where the output is present in the storm ui.

推荐答案

Storm UI 中不存在输入和输出.在 Storm UI 中,您可以看到没有发出的元组、处理时间、集群配置和集群的健康状况.查看输出和输入使用记录器机制,然后检查出现在storm包的日志文件夹中的每个工作日志文件.要在 Storm 中创建拓扑,您需要两个东西,一个 spout 和一个 bolt.请在下面找到示例代码:-

Input and output does not present in Storm UI.In Storm UI you can see no of tuples emitted,processing time,cluster configuration and health of your cluster.To see output and input use logger mechanism and then check each worker log files which present in log folder of storm package. To create a topology in Storm you need two things one spout and one bolt.Please find below sample coded :-

SampleSpout.java

SampleSpout.java

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

public class SampleSpout implements IRichSpout{

    SpoutOutputCollector collector;
    int i=0;
    List<Object> tupleList;
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // TODO Auto-generated method stub

    }
    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub

    }
    @Override
    public void deactivate() {
        // TODO Auto-generated method stub

    }
    @Override
    public void nextTuple() {
        tupleList=new ArrayList<Object>();
        tupleList.add("storm"+i);
        tupleList.add(i);
        collector.emit(tupleList,i);
        i++;        
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub

    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub

    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}

SampleBolt.java

SampleBolt.java

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class SampleBolt implements IBasicBolt {

    private static Logger log = LoggerFactory.getLogger(SampleBolt.class);


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        log.info(input.getValues().toString()+"output values");
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

SampleTopology.java

SampleTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class SampleTopology {

    /**
     * @param args
     */
    public static void main(String[] args) {

        TopologyBuilder topology=new TopologyBuilder();
        topology.setSpout("sampleSpout",new SampleSpout());
        topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout");

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster=new LocalCluster();
        cluster.submitTopology("test", conf, topology.createTopology());


    }

}

这篇关于如何在风暴中创建拓扑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆