如何在风暴中创建拓扑 [英] How to create a topology in storm

查看:72
本文介绍了如何在风暴中创建拓扑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们是暴风雨的新手。我们不知道如何创建拓扑结构,请帮助我们应对风暴。我们尝试了在Windows上运行风暴一文中给出的示例wordcount c =拓扑。
但是我们无法理解如何给出输入以及输入存在的位置以及风暴ui中输出的位置。

We are new to storm. We don't know about how to create topology please help us to work with storm. We tried sample wordcount c=topology given in the article "running storm on windows". But we can't understand how to give an input and also where the input is present and where the output is present in the storm ui.

推荐答案

Storm UI中不存在输入和输出。在Storm UI中,您可以看到没有发出的元组,处理时间,群集配置和群集的运行状况。要查看输出和输入,请使用记录器机制然后检查风暴包日志文件夹中的每个工作日志文件。
要在Storm中创建拓扑,你需要两个东西,一个是spout,另一个是bolt。请在下面找到样本代码: -

Input and output does not present in Storm UI.In Storm UI you can see no of tuples emitted,processing time,cluster configuration and health of your cluster.To see output and input use logger mechanism and then check each worker log files which present in log folder of storm package. To create a topology in Storm you need two things one spout and one bolt.Please find below sample coded :-

SampleSpout.java

SampleSpout.java

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

public class SampleSpout implements IRichSpout{

    SpoutOutputCollector collector;
    int i=0;
    List<Object> tupleList;
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // TODO Auto-generated method stub

    }
    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub

    }
    @Override
    public void deactivate() {
        // TODO Auto-generated method stub

    }
    @Override
    public void nextTuple() {
        tupleList=new ArrayList<Object>();
        tupleList.add("storm"+i);
        tupleList.add(i);
        collector.emit(tupleList,i);
        i++;        
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub

    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub

    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}

SampleBolt.java

SampleBolt.java

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class SampleBolt implements IBasicBolt {

    private static Logger log = LoggerFactory.getLogger(SampleBolt.class);


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        log.info(input.getValues().toString()+"output values");
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

SampleTopology.java

SampleTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class SampleTopology {

    /**
     * @param args
     */
    public static void main(String[] args) {

        TopologyBuilder topology=new TopologyBuilder();
        topology.setSpout("sampleSpout",new SampleSpout());
        topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout");

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster=new LocalCluster();
        cluster.submitTopology("test", conf, topology.createTopology());


    }

}

这篇关于如何在风暴中创建拓扑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆