My Storm Topology neither working (not generating output) nor failing (not generating errors or exceptions)


Question

I have a topology in which I am trying to count word occurrences generated by a SimulatorSpout (not a real stream) and then write them to a MySQL database table. The table schema is very simple:

Field  |  Type        |  ...
ID     |  int(11)     |      Auto_icr
word   |  varchar(50) |  
count  |  int(11)     | 

But I am facing a weird problem (as I mentioned before): I successfully submitted the topology to my Storm cluster, which consists of 4 supervisors, and I can see the flow of the topology in the Storm Web UI (no exceptions), but when I checked the MySQL table, to my surprise, the table was empty...

Any comments or suggestions are welcome...

Here are the spout and bolts:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class MySQLConnection {

    private static Connection conn = null;
    private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?";
    private static String dbClass = "com.mysql.jdbc.Driver";

    public static Connection getConnection() throws SQLException, ClassNotFoundException {
        Class.forName(dbClass);
        conn = DriverManager.getConnection(dbUrl, "root", "qwe123");
        return conn;
    }
}

============================== SentenceSpout ================================

public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private boolean _completed = false;

    private SpoutOutputCollector _collector;
    private String[] sentences = {
        "Obama delivered a powerfull speech against USA",
        "I like cold beverages",
        "RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
        "don't have a cow man...",
        "i don't think i like fleas"
    };

    private int index = 0;

    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void nextTuple() {
        _collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
            Utils.waitForSeconds(1);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void ack(Object msgId) {
        System.out.println("OK: " + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL: " + msgId);
    }
}

============================ SplitSentenceBolt ==============================

public class SplitSentenceBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector _collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String httpRegex = "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";
        sentence = sentence.replaceAll(httpRegex, "").replaceAll("RT", "").replaceAll("[.|,]", "");
        String[] words = sentence.split(" ");
        for (String word : words) {
            if (!word.isEmpty())
                _collector.emit(new Values(word.trim()));
        }
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
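The cleanup logic in SplitSentenceBolt can be exercised outside Storm. The sketch below is a hypothetical standalone class (the name `SentenceCleanupDemo` and the `clean` helper are made up for illustration); it copies the question's regex and replacement chain and runs it on one of the spout's sentences:

```java
import java.util.Arrays;

// Standalone demo of the bolt's cleanup: strip URLs, the "RT" marker, and
// the characters '.', '|', ',' before splitting on spaces. Regex copied
// verbatim from the question's SplitSentenceBolt.
public class SentenceCleanupDemo {

    static final String HTTP_REGEX =
        "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";

    static String[] clean(String sentence) {
        String s = sentence.replaceAll(HTTP_REGEX, "")
                           .replaceAll("RT", "")
                           .replaceAll("[.|,]", "");
        // drop the empty fragments that split(" ") produces around removed text
        return Arrays.stream(s.split(" "))
                     .filter(w -> !w.isEmpty())
                     .map(String::trim)
                     .toArray(String[]::new);
    }

    public static void main(String[] args) {
        String[] words = clean("RT http://www.turkeyairline.com Turkish Airlines has delayed some flights");
        System.out.println(String.join(",", words));
        // prints: Turkish,Airlines,has,delayed,some,flights
    }
}
```

Running it confirms the URL and "RT" token are removed before the words reach WordCountBolt.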

=========================== WordCountBolt =================================

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private HashMap<String, Integer> counts = null;
    private OutputCollector _collector;
    private ResultSet resSet = null;
    private Statement stmt = null;
    private Connection _conn = null;

    private String path = "/home/hduser/logOfStormTops/logger.txt";
    String rLine = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<String, Integer>();
        _collector = collector;
    }

    public void execute(Tuple tuple) {

        int insertResult = 0;
        int updateResult = 0;

        String word = tuple.getStringByField("word");

        if (!counts.containsKey(word)) {
            counts.put(word, 1);
            try {
                insertResult = wordInsertIfNoExist(word);
                if (insertResult == 1) {
                    _collector.ack(tuple);
                } else {
                    _collector.fail(tuple);
                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } else {
            counts.put(word, counts.get(word) + 1);

            try {
                // writing to db
                updateResult = updateCountOfExistingWord(word);
                if (updateResult == 1) {
                    _collector.ack(tuple);
                } else {
                    _collector.fail(tuple);
                }
                // writing to file (append mode, so earlier lines survive)
                BufferedWriter buffer = new BufferedWriter(new FileWriter(path, true));
                buffer.write("[ " + word + " : " + counts.get(word) + " ]");
                buffer.newLine();
                buffer.flush();
                buffer.close();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("{word-" + word + " : count-" + counts.get(word) + "}");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    // *****************************************************

    public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException {

        String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\"";
        String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + word + "\", 1)";
        _conn = MySQLConnection.getConnection();
        stmt = _conn.createStatement();
        resSet = stmt.executeQuery(query);
        int res = 0;
        if (!resSet.next()) {
            res = stmt.executeUpdate(insert);
        } else {
            // Uzbek, roughly: "Something happened while inserting the new value"
            System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi");
        }
        resSet.close();
        stmt.close();
        _conn.close();
        return res;
    }

    public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException {

        String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\"";
        _conn = MySQLConnection.getConnection();
        stmt = _conn.createStatement();
        int result = stmt.executeUpdate(update);
        // no ResultSet is opened here, so only the Statement and Connection are closed
        stmt.close();
        _conn.close();
        return result;
    }
}
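The select-then-insert and the separate update above can be collapsed into one parameterized statement, which also avoids building SQL by string concatenation. This is only a sketch under an assumption the question's schema does not show: it requires a UNIQUE index on `wordcount.word` for MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` to fire; the class name `WordDao` is made up for illustration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical DAO sketch: one upsert instead of select+insert / update.
// Assumes `word` has a UNIQUE index in the wordcount table.
public class WordDao {

    static final String UPSERT =
        "INSERT INTO wordcount (word, count) VALUES (?, 1) " +
        "ON DUPLICATE KEY UPDATE count = count + 1";

    // MySQL reports 1 affected row for a fresh insert, 2 for an update.
    public static int upsertWord(Connection conn, String word) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT)) {
            ps.setString(1, word);
            return ps.executeUpdate();
        }
    }
}
```

With this shape the bolt only needs one DB round trip per word, and the try-with-resources block closes the statement even when the update throws.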

========================= WordCountTopology ==============================

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String TOPOLOGY_NAME = "NewWordCountTopology";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

        Config config = new Config();
        config.setMaxSpoutPending(100);
        config.setDebug(true);

        // submitTopology is static, so no StormSubmitter instance is needed
        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    }
}

Answer

It is because _collector.ack(tuple) is not being called when an exception is thrown. Once there are too many pending tuples, the spout will stop emitting new ones. Try throwing a RuntimeException instead of just calling printStackTrace.
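Applied to the bolt's catch blocks, the suggestion amounts to rethrowing instead of swallowing the error. The sketch below is self-contained for illustration only: `RethrowDemo` and its always-failing `wordInsertIfNoExist` stand in for the bolt's real DB call, and are not part of the question's code.

```java
import java.sql.SQLException;

// Minimal sketch of the suggested fix: wrap checked exceptions in a
// RuntimeException instead of calling printStackTrace, so the failure
// propagates out of execute() rather than silently leaving the tuple
// neither acked nor failed.
public class RethrowDemo {

    // Stand-in for the bolt's DB helper; always fails, purely for illustration.
    static int wordInsertIfNoExist(String word) throws SQLException {
        throw new SQLException("db unreachable");
    }

    // The bolt's execute() would call the helper like this:
    static void handle(String word) {
        try {
            wordInsertIfNoExist(word);
        } catch (SQLException e) {
            // was: e.printStackTrace();  -- silently drops the error
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try {
            handle("obama");
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```

In the real topology the rethrown exception surfaces in the worker logs, which is exactly what the silent printStackTrace was hiding: with setMaxSpoutPending(100), tuples that are never acked or failed pile up until the spout stalls with no visible error.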

