我的Storm拓扑既不工作(不生成输出)也不失败(不生成错误或异常) [英] My Storm Topology neither working(not generating output) nor failing (not generating errors or exceptions)

查看:49
本文介绍了我的Storm拓扑既不工作(不生成输出)也不失败(不生成错误或异常)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个拓扑,其中我试图计算SimulatorSpout(不是真正的Stream)生成的单词出现次数,然后将其写入MySQL数据库表,该表方案非常简单:

Field  |  Type        |  ...
ID     |  int(11)     |      Auto_icr
word   |  varchar(50) |  
count  |  int(11)     | 

但是我面临着奇怪的问题(正如我之前提到的) 我已成功将拓扑提交给由4个主管组成的Storm Cluster,并且可以在Storm Web UI中看到拓扑的流程 (无例外),但是当我检查MySQL表时,令我惊讶的是,该表为空...

But I am facing weird problem(as I beforementioned) I successfully submitted The Topology to my Storm Cluster which consists of 4 supervisors, and I can see the flow of the Topology in Storm Web UI (no exceptions) but when I checked the MySQL table, to my surprise, the table is empty...

任何评论,建议都欢迎...

ANY COMMENTS, SUGGESTIONS ARE WELCOME...

以下是喷嘴和螺栓:

public class MySQLConnection {

    private static Connection conn = null;
    private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?";
    private static String dbClass = "com.mysql.jdbc.Driver";

    public static Connection getConnection() throws SQLException, ClassNotFoundException  {

        Class.forName(dbClass);
        conn = DriverManager.getConnection(dbUrl, "root", "qwe123");            
        return conn;    
     }
}

============================ SentenceSpout ================== =============

============================= SentenceSpout ===============================

public class SentenceSpout extends BaseRichSpout{

private static final long serialVersionUID = 1L;
private boolean _completed = false;

private SpoutOutputCollector _collector;
private String [] sentences = {
        "Obama delivered a powerfull speech against USA",
        "I like cold beverages",
        "RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
        "don't have a cow man...",
        "i don't think i like fleas"
    };

private int index = 0;


public void open (Map config, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
}

public void nextTuple () {

        _collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;  

        Utils.waitForSeconds(1);
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
}

public void ack(Object msgId) {
    System.out.println("OK: " + msgId);
}   

public void close() {}

public void fail(Object msgId) {
    System.out.println("FAIL: " + msgId);
    }
}

=========================== SplitSentenceBolt ================== ===========

============================ SplitSentenceBolt ==============================

public class SplitSentenceBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector _collector;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}

public void execute (Tuple tuple) {

    String sentence = tuple.getStringByField("sentence");
    String httpRegex = "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";
    sentence = sentence.replaceAll(httpRegex, "").replaceAll("RT", "").replaceAll("[.|,]", "");
    String[] words = sentence.split(" ");
    for (String word : words) {
        if (!word.isEmpty())
        _collector.emit(new Values(word.trim()));
    }
    _collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
   }
 }

========================== WordCountBolt =================== =============

=========================== WordCountBolt =================================

public class WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private HashMap<String , Integer> counts = null;
private OutputCollector _collector; 
private ResultSet resSet = null;
private Statement stmt = null;
private Connection _conn = null;

private String path = "/home/hduser/logOfStormTops/logger.txt";
String rLine = null;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
    counts = new HashMap<String, Integer>();
    _collector = collector;
}

public void execute (Tuple tuple) {

    int insertResult = 0;
    int updateResult = 0;

    String word = tuple.getStringByField("word");
    //----------------------------------------------------
    if (!counts.containsKey(word)) {            

        counts.put(word, 1);
        try {
            insertResult = wordInsertIfNoExist(word);
            if (insertResult == 1) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }  else {
    //-----------------------------------------------
        counts.put(word, counts.get(word) + 1);

        try {
            // writing to db
            updateResult = updateCountOfExistingWord(word);
            if (updateResult == 1) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
            // Writing to file
            BufferedWriter buffer = new BufferedWriter(new FileWriter(path));
            buffer.write("[ " + word + " : " + counts.get("word") + " ]");
            buffer.newLine();
            buffer.flush();
            buffer.close();

        } catch (ClassNotFoundException e) {            
            e.printStackTrace();
        } catch (SQLException e) {          
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("{word-" + word + " : count-" + counts.get(word) + "}");
    }

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

// *****************************************************

public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException {

    String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\"";
    String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + word +  "\", 1)";
    _conn = MySQLConnection.getConnection();
    stmt = _conn.createStatement();
    resSet = stmt.executeQuery(query);
    int res = 0;
    if (!resSet.next()) {

         res = stmt.executeUpdate(insert);

    } else {
        System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi");
    }
    resSet.close();
    stmt.close();
    _conn.close();
    return res;
}

public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException {

    String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\"";   
    _conn = MySQLConnection.getConnection();        
    stmt = _conn.createStatement();
    int result = stmt.executeUpdate(update);

        //System.out.println(word + "'s count has been updated (incremented)");

    resSet.close();
    stmt.close();
    _conn.close();
    return result;
    } 
  }

========================= WordCountTopology ===================== =========

========================= WordCountTopology ==============================

public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String TOPOLOGY_NAME = "NewWordCountTopology";

@SuppressWarnings("static-access")
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);

builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

Config config = new Config();
config.setMaxSpoutPending(100);
config.setDebug(true);

StormSubmitter submitter = new StormSubmitter(); 

submitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

   }
}

推荐答案

这是因为在引发异常时未调用_collector.ack(tuple).当待处理的元组太多时,spout将停止发送新的元组.尝试抛出RuntimeException而不是printStackTrace.

It is because the _collector.ack(tuple) is not being called when there is exception thrown. When there are too many pending tuple, spout will stop sending new tuples. Try throwing out RuntimeException instead of printStackTrace.

这篇关于我的Storm拓扑既不工作(不生成输出)也不失败(不生成错误或异常)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆