Apache Storm远程拓扑提交 [英] Apache Storm Remote Topology Submission

查看:195
本文介绍了Apache Storm远程拓扑提交的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在使用IDE(Eclipse)测试Storm Topologies的远程提交。
我成功将简单的风暴拓扑上传到远程Storm集群,但奇怪的是当我检查Storm UI以确定远程提交的拓扑是否正常工作时,我看到了_acker bolt in the UI,但其他螺栓和喷口不存在。之后,我从命令行手动提交拓扑,并再次检查Storm UI,它正在工作,因为它应该没有问题。我一直在找原因但找不到。我在下面附加了拓扑和远程提交者类以及相应的Storm UI图片:

I have been testing remote submission of Storm Topologies using IDE (Eclipse). And I succeeded uploading simple storm topology to remote Storm cluster, but the weird thing is when I checked Storm UI to make sure whether the topology, which was submitted remotely, is working without problems, I saw just _acker bolt in the UI but other bolts and spout is not there. After that I submitted the topology manually from command line and again checked Storm UI, and it is working as it is supposed to work without problems. I have been looking for the reason but couldn't find. I attached both topology and remote submitter class below and corresponding Storm UI pictures:


这是Eclipse控制台的输出(远程提交后)

This is the output from Eclipse console (after remote submission)



225  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar     T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
234  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar

这是拓扑:

public class StormBenchmark {


// ******************************************************************************************
public static class GenSpout extends BaseRichSpout {

    //private static final Logger logger = Logger.getLogger(StormBenchmark.class.getName());

    private Long count = 1L;
    private Object msgID;
    private static final long serialVersionUID = 1L;
    private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
    private static final String[] newsagencies = {"bbc", "cnn", "reuters", "aljazeera", "nytimes", "nbc news", "fox news", "interfax"}; 

    SpoutOutputCollector _collector;
    int _size;
    Random _rand;
    String _id;
    String _val;
    // Constructor
    public GenSpout(int size) {
        _size = size;
    }  

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
        _id = randString(5);
        _val = randString2(_size);
    }
    //Business logic
    public void nextTuple() {

        count++;
        msgID = count;
        _collector.emit(new Values(_id, _val), msgID);
    }

    public void ack(Object msgID) {
        this.msgID = msgID;
    }

    private String randString(int size) {

        StringBuffer buf = new StringBuffer();
        for(int i=0; i<size; i++) {
            buf.append(CHARS[_rand.nextInt(CHARS.length)]);
        }
        return buf.toString();
    }

    private String randString2(int size) {

        StringBuffer buf = new StringBuffer();
        for(int i=0; i<size; i++) {
            buf.append(newsagencies[_rand.nextInt(newsagencies.length)]);
        }
        return buf.toString();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "item"));
    }        
}
// =======================================================================================================
// =================================== B O L T ===========================================================
public static class IdentityBolt extends BaseBasicBolt {    

    private static final long serialVersionUID = 1L;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "item"));
    }
    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String character = tuple.getString(0);
        String agency = tuple.getString(1);
        List<String> box = new ArrayList<String>();
        box.add(character);
        box.add(agency);
        try {
            fileWriter(box);
        } catch (IOException e) {
            e.printStackTrace();
        }
        box.clear();
    } 

    public void fileWriter(List<String> listjon) throws IOException {

        String pathname = "/home/hduser/logOfStormTops/logs.txt";
        File file = new File(pathname);
        if (!file.exists()){
            file.createNewFile();
        }
        BufferedWriter writer = new BufferedWriter(new FileWriter(file, true));

        writer.write(listjon.get(0) + " : " + listjon.get(1));
        writer.newLine();
        writer.flush();
        writer.close();         
    }
}


//storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000
public static void main(String[] args) throws Exception {


    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new GenSpout(8), 2).setNumTasks(4);

    builder.setBolt("bolt", new IdentityBolt(), 4).setNumTasks(8)
            .shuffleGrouping("spout");


    Config conf = new Config();
    conf.setMaxSpoutPending(200);
    conf.setStatsSampleRate(0.0001);
    //topology.executor.receive.buffer.size: 8192 #batched
    //topology.executor.send.buffer.size: 8192 #individual messages
    //topology.transfer.buffer.size: 1024 # batched

    conf.put("topology.executor.send.buffer.size", 1024);
    conf.put("topology.transfer.buffer.size", 8);
    conf.put("topology.receiver.buffer.size", 8);
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n");

    StormSubmitter.submitTopology("SampleTop", conf,   builder.createTopology());
}
}

这是RemoteSubmitter类:

And here is The RemoteSubmitter class:

public class RemoteSubmissionTopo {

@SuppressWarnings({ "unchecked", "rawtypes", "unused" })
public static void main(String... args) {


    Config conf = new Config();
    TopologyBuilder topoBuilder = new TopologyBuilder();
    conf.put(Config.NIMBUS_HOST, "117.16.142.49");
    conf.setDebug(true);
    Map stormConf = Utils.readStormConfig();
    stormConf.put("nimbus.host", "117.16.142.49");
    String jar_path = "T:\\STORM_TOPOLOGIES\\Benchmark.jar";


    Client client = NimbusClient.getConfiguredClient(stormConf).getClient();

    try {
        NimbusClient nimbus = new NimbusClient(stormConf, "117.16.142.49", 6627);
        String uploadedJarLocation = StormSubmitter.submitJar(stormConf, jar_path);
        String jsonConf = JSONValue.toJSONString(stormConf);

        nimbus.getClient().submitTopology("benchmark-tp", uploadedJarLocation, jsonConf, topoBuilder.createTopology());



    } catch (TTransportException e) {
        e.printStackTrace();
    } catch (AlreadyAliveException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (TException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    try {
        Thread.sleep(6000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

这是Storm UI pict(如果是远程提交)

And Here is the Storm UI pict (in case of remote submission)

这是另一个Storm UI pict(如果是手动提交)

And Here is the other Storm UI pict (in case of manual submission)

推荐答案

RemoteSubmissionTopo 中使用 TopologyBuilder topoBuilder = new TopologyBuilder(); 但不要调用 setSpout(...) / setBolt(...)。因此,您提交的拓扑没有运算符......

In RemoteSubmissionTopo you use TopologyBuilder topoBuilder = new TopologyBuilder(); but do not call setSpout(...)/setBolt(...). Thus, you are submitting an topology with no operators...

Btw:实际上根本不需要RemoteSubmissionTopo 。您可以使用 StormBenchmark 进行远程提交。只需在 main 中添加 conf.put(Config.NIMBUS_HOST,117.16.142.49); 并设置JVM选项 -Dstorm.jar = / path / to / topology.jar ,你很高兴。

Btw: RemoteSubmissionTopo is actually not required at all. You can use StormBenchmark to submit remotely. Just add conf.put(Config.NIMBUS_HOST, "117.16.142.49"); in main and set JVM option -Dstorm.jar=/path/to/topology.jar and you are good to run.

这篇关于Apache Storm远程拓扑提交的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆