Storm: Spout for reading data from a port

Question

I need to write a Storm spout that reads data from a port, and I want to know whether that is logically possible.

With that in mind, I designed a simple topology with one spout and one bolt. The spout gathers HTTP requests sent using wget, and the bolt displays each request. Nothing more.

My spout is structured as follows:

public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         SpoutOutputCollector sc;
         //The socket
         Socket clientSocket;
         //The server socket
         ServerSocket sc;

         public ProxySpout(int port){
            this.sc=new ServerSocket(port);
            try{
                clientSocket=sc.accept();
            }catch(IOException ex){
                //Handle it
            }
         }

         public void nextTuple(){
            try{
                InputStream ic=clientSocket.getInputStream();
                byte[] b=new byte[8196];
                int len=ic.read(b);

                sc.emit(new Values(b));
                ic.close();
            }catch(//){
                //Handle it
            }finally{
                clientSocket.close();
            }
         }
}

I have implemented the rest of the methods too.

When I turn this into a topology and run it, I get an error when I send the first request:

java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket

I just need to know whether there is something wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port, or to act as a proxy?

Edit

Got it working.

The code is:

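   import java.io.IOException;
   import java.io.InputStream;
   import java.net.ServerSocket;
   import java.net.Socket;
   import java.util.Arrays;
   import java.util.Map;
   // Storm 1.x+ package names (assumed); releases before 1.0 use backtype.storm.*
   import org.apache.storm.spout.SpoutOutputCollector;
   import org.apache.storm.task.TopologyContext;
   import org.apache.storm.topology.base.BaseRichSpout;
   import org.apache.storm.tuple.Values;

   public class ProxySpout extends BaseRichSpout{
         //The O/P collector and the server socket; transient, so never serialized
         private transient SpoutOutputCollector _collector;
         private transient ServerSocket _serverSocket;
         //Instance field (not static), so it is serialized and reaches the worker
         private final int _port;

         public ProxySpout(int port){
          _port=port;
         }

         @Override
         public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
           _collector=collector;
           try{
               _serverSocket=new ServerSocket(_port);
           }catch(IOException ex){
               throw new RuntimeException("Cannot listen on port "+_port,ex);
           }
         }

         @Override
         public void nextTuple(){
            //accept() blocks until a client connects; both resources are closed afterwards
            try(Socket _clientSocket=_serverSocket.accept();
                InputStream incomingIS=_clientSocket.getInputStream()){
                byte[] b=new byte[8196];
                int len=incomingIS.read(b);
                if(len>0){
                    _collector.emit(new Values(Arrays.copyOf(b,len)));
                }
            }catch(IOException ex){
                //Handle it
            }
         }

         //declareOutputFields() and the remaining methods are omitted here
   }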

As per @Shaw's suggestion, I initialized _serverSocket in the open() method, and _clientSocket is accepted in the nextTuple() method to listen for requests.

I don't know the performance metrics of this one, but it works. :-)
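On the performance question: accept() blocks until a client connects, and as far as I know the Storm documentation advises keeping nextTuple() non-blocking. One possible way around that is a short accept timeout. The sketch below tries the accept-and-read step outside Storm using only the JDK; PortReadDemo, readOnce and the timeout values are hypothetical names and numbers chosen for illustration, not part of the original code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PortReadDemo {

    // Waits up to timeoutMillis for one connection and returns the first chunk read.
    // Returns null if nobody connected in time (a spout would simply emit nothing).
    static byte[] readOnce(ServerSocket server, int timeoutMillis) throws IOException {
        server.setSoTimeout(timeoutMillis); // makes accept() give up after the timeout
        try (Socket client = server.accept();
             InputStream in = client.getInputStream()) {
            byte[] buf = new byte[8192];
            int len = in.read(buf);
            return len > 0 ? Arrays.copyOf(buf, len) : new byte[0];
        } catch (SocketTimeoutException e) {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            // No client yet, so the call times out and returns null
            System.out.println("no client -> " + readOnce(server, 100));

            // The connection waits in the backlog until readOnce() accepts it
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                OutputStream out = client.getOutputStream();
                out.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
                out.flush();
                byte[] request = readOnce(server, 1000);
                System.out.print(new String(request, StandardCharsets.US_ASCII));
            }
        }
    }
}
```

Inside a spout, the same idea would mean returning from nextTuple() without emitting whenever the timeout fires. Note that a single read() may return only part of a larger request, so a real proxy would need to keep reading until the request is complete.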

Recommended answer

In the constructor, just assign the variables. Instantiate the ServerSocket in the open() method (the spout counterpart of a bolt's prepare()), and do not write any new ... in the constructor. Also rename your variables: you have two fields named sc.

public class ProxySpout extends BaseRichSpout{

    int port;

    public ProxySpout(int port){
        this.port=port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  { 
        //new ServerSocket
    }

    @Override
    public void nextTuple() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

If you put it in open(), it is only called once the spout has already been deployed, so the socket never needs to be serialized. It is also called only once per lifetime of the spout, so it is not inefficient.
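The serialization point can be checked without a cluster, because Storm ships spout instances to workers using ordinary Java serialization. The sketch below is an illustration only: EagerSpout and LazySpout are hypothetical stand-ins that do not extend any Storm class, and canSerialize is a helper invented for this example. It shows that an object holding a live ServerSocket is rejected, while one that stores just the port and keeps the socket transient is accepted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;

public class SpoutSerializationDemo {

    // Hypothetical stand-in for a spout that opens its socket in the constructor
    static class EagerSpout implements Serializable {
        private final ServerSocket serverSocket;

        EagerSpout() throws IOException {
            this.serverSocket = new ServerSocket(0); // port 0 = any free port
        }

        void close() throws IOException {
            serverSocket.close();
        }
    }

    // Hypothetical stand-in for a spout that only remembers the port number
    static class LazySpout implements Serializable {
        private final int port;
        private transient ServerSocket serverSocket; // created in open(), never serialized

        LazySpout(int port) {
            this.port = port;
        }

        void open() throws IOException {
            serverSocket = new ServerSocket(port);
        }
    }

    // Returns true if Java serialization accepts the object
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        EagerSpout eager = new EagerSpout();
        System.out.println("socket created in constructor: " + canSerialize(eager));
        eager.close();
        System.out.println("socket deferred to open():     " + canSerialize(new LazySpout(0)));
    }
}
```

The first check fails for the same reason as the NotSerializableException in the question; the second succeeds because only the int field is written.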
