Storm: Spout for reading data from a port

Question

I need to write a Storm spout that reads data from a port, and I want to know whether that is logically possible.

With that in mind, I designed a simple topology with one spout and one bolt. The spout would gather HTTP requests sent using wget, and the bolt would display each request. Just that.

My spout is structured as follows:

public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         SpoutOutputCollector sc;
         //The socket
         Socket clientSocket;
         //The server socket
         ServerSocket sc;

         public ProxySpout(int port){
            this.sc=new ServerSocket(port);
            try{
                clientSocket=sc.accept();
            }catch(IOException ex){
                //Handle it
            }
         }

         public void nextTuple(){
            try{
                InputStream ic=clientSocket.getInputStream();
                byte[] b=new byte[8196];
                int len=ic.read(b);

                sc.emit(new Values(b));
                ic.close();
            }catch(//){
                //Handle it
            }finally{
                clientSocket.close();
            }
         }
}

I have implemented the rest of the methods too.

When I turn this into a topology and run it, I get an error when I send the first request:

java.lang.RuntimeException:java.io.NotSerializableException:java.net.Socket

I just need to know whether there is something wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port, or for a spout to act as a proxy?

Edit

Got it working.

The code is:

   public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         static SpoutOutputCollector _collector;
         //The socket
         static Socket _clientSocket;
         static ServerSocket _serverSocket;
         static int _port;

         public ProxySpout(int port){
          _port=port;
         }

         public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
           _collector=collector;
           _serverSocket=new ServerSocket(_port);
         }   

         public void nextTuple(){
            _clientSocket=_serverSocket.accept();
            InputStream incomingIS=_clientSocket.getInputStream();
            byte[] b=new byte[8196];
            int len=incomingIS.read(b);
            _collector.emit(new Values(b));
     }
}

As per @Shaw's suggestion, I tried initializing _serverSocket in the open() method, and _clientSocket is now accepted inside nextTuple(), which listens for requests.

I don't know the performance metrics of this one, but it works. :-)
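One thing to watch in the version above is that accept() blocks inside nextTuple(), and every read emits the full 8196-byte buffer regardless of how many bytes arrived. The following is a minimal sketch of the socket handling only, using plain java.net with no Storm classes. PortReader, its methods, and the timeout values are hypothetical names chosen for illustration; the idea is that open() would call PortReader.open(...) and nextTuple() would call poll() and emit only when the result is non-null.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;

// Hypothetical helper (not part of Storm): socket logic a spout could delegate to.
class PortReader {
    private ServerSocket serverSocket;

    // Intended to be called from the spout's open(); port 0 asks the OS for a free port.
    void open(int port, int acceptTimeoutMillis) {
        try {
            serverSocket = new ServerSocket(port);
            // Make accept() give up after the timeout instead of blocking forever.
            serverSocket.setSoTimeout(acceptTimeoutMillis);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    int localPort() {
        return serverSocket.getLocalPort();
    }

    // Intended to be called from nextTuple(); returns null when nothing arrived in time.
    byte[] poll() {
        try (Socket client = serverSocket.accept();
             InputStream in = client.getInputStream()) {
            byte[] buf = new byte[8192];
            int len = in.read(buf); // a single read; larger requests would need a loop
            return len <= 0 ? null : Arrays.copyOf(buf, len); // keep only the bytes read
        } catch (SocketTimeoutException e) {
            return null; // no client connected within the timeout
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Intended to be called from the spout's close().
    void close() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

With a helper like this, nextTuple() returns promptly when no client is waiting, and the emitted value contains just the request bytes rather than a mostly empty buffer.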

Recommended answer

In the constructor, just assign the variables. Instantiate the ServerSocket in the open method (a spout's equivalent of a bolt's prepare method), and do not write any new ... in the constructor. Also rename your variables: you have two named sc.

public class ProxySpout extends BaseRichSpout{

    int port;

    public ProxySpout(int port){
        this.port=port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  { 
        //new ServerSocket
    }

    @Override
    public void nextTuple() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

If you put it in the open method, it is only called once the spout has already been deployed, so the socket never needs to be serialized. It is also called only once per lifetime of the spout instance, so it is not inefficient.
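The serialization point can be reproduced without Storm at all. The sketch below uses only the JDK; EagerHolder, LazyHolder, and SerializationDemo are hypothetical stand-ins for the two spout designs, not Storm classes. It assumes, as the error message in the question suggests, that the spout instance is written with standard Java serialization before it is shipped to the workers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;

// Hypothetical stand-in for a spout that creates its socket in the constructor.
class EagerHolder implements Serializable {
    // Unconnected, but java.net.Socket is still not Serializable.
    private final Socket socket = new Socket();
}

// Hypothetical stand-in for a spout that stores only the port and opens the socket later.
class LazyHolder implements Serializable {
    private final int port;
    // Would be assigned after deployment; transient fields are skipped by serialization.
    private transient Socket socket;

    LazyHolder(int port) {
        this.port = port;
    }
}

class SerializationDemo {
    // Returns true if Java serialization accepts the object, false otherwise.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new EagerHolder()));
        System.out.println(canSerialize(new LazyHolder(9999)));
    }
}
```

Under these assumptions the first call should report false and the second true: an object holding a live Socket cannot be serialized, while one holding only the port number can.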
