Storm: Spout for reading data from a port
Problem description
I need to write a Storm spout that reads data from a port. I wanted to know whether that is logically possible.
With that in mind, I designed a simple topology with one spout and one bolt. The spout gathers HTTP requests sent using wget, and the bolt displays each request. That's all.
My spout is structured as follows:
public class ProxySpout extends BaseRichSpout{
    //The O/P collector
    SpoutOutputCollector sc;
    //The socket
    Socket clientSocket;
    //The server socket
    ServerSocket sc;
    public ProxySpout(int port){
        this.sc=new ServerSocket(port);
        try{
            clientSocket=sc.accept();
        }catch(IOException ex){
            //Handle it
        }
    }
    public void nextTuple(){
        try{
            InputStream ic=clientSocket.getInputStream();
            byte[] b=new byte[8196];
            int len=ic.read(b);
            sc.emit(new Values(b));
            ic.close();
        }catch(IOException ex){
            //Handle it
        }finally{
            clientSocket.close();
        }
    }
}
I have implemented the rest of the methods too.
When I turn this into a topology and run it, I get an error as soon as I send the first request:
java.lang.RuntimeException:java.io.NotSerializableException:java.net.Socket
I just need to know whether something is wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port? Or for a spout to act as an instance of a proxy?
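For context on the exception: Storm ships spout and bolt objects to workers using Java serialization when the topology is built for submission, so every non-transient field must itself be serializable, and java.net.Socket is not. Because the constructor above also blocks in accept(), serialization can only happen after the first connection arrives, which would explain why the error appears on the first request. The sketch below reproduces the same exception with plain JDK classes and no Storm at all; the class and method names (SerializationDemo, HoldsSocket, trySerialize) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.Socket;

public class SerializationDemo {

    // Hypothetical stand-in for the spout: Serializable, but holds a Socket field.
    static class HoldsSocket implements Serializable {
        Socket clientSocket = new Socket(); // unconnected, still not serializable
    }

    // Same shape, but the field is transient, so serialization skips it.
    static class HoldsTransientSocket implements Serializable {
        transient Socket clientSocket = new Socket();
    }

    // Returns true if Java serialization of obj succeeds, false on NotSerializableException.
    static boolean trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException ex) {
            System.out.println("Not serializable: " + ex.getMessage());
            return false;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new HoldsSocket()));          // false
        System.out.println(trySerialize(new HoldsTransientSocket())); // true
    }
}
```

Marking a field transient (or, as the answer below suggests, not creating the socket until the spout is running) keeps it out of the serialized form.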
EDIT
Got it working.
The code is:
public class ProxySpout extends BaseRichSpout{
    //The O/P collector
    static SpoutOutputCollector _collector;
    //The socket
    static Socket _clientSocket;
    static ServerSocket _serverSocket;
    static int _port;
    public ProxySpout(int port){
        _port=port;
    }
    public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
        _collector=collector;
        try{
            _serverSocket=new ServerSocket(_port);
        }catch(IOException ex){
            throw new RuntimeException(ex);
        }
    }
    public void nextTuple(){
        try{
            //Blocks until a client connects
            _clientSocket=_serverSocket.accept();
            InputStream incomingIS=_clientSocket.getInputStream();
            byte[] b=new byte[8196];
            int len=incomingIS.read(b);
            _collector.emit(new Values(b));
        }catch(IOException ex){
            //Handle it
        }
    }
}
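One detail in this nextTuple(): it emits the whole 8196-byte buffer even when read() returns fewer bytes, and read() returns -1 at end of stream. A small helper like the hypothetical readChunk below returns only the bytes actually read. This is a sketch, not part of the original code; note also that a single read() may return only part of an HTTP request, so reading a complete request would take repeated reads (for example, until the blank line that ends the headers).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadReader {

    // Reads at most one buffer's worth of bytes and returns exactly the bytes read.
    // Returns null at end of stream (read() == -1), meaning there is nothing to emit.
    static byte[] readChunk(InputStream in, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        int len = in.read(buffer);
        if (len < 0) {
            return null;
        }
        return Arrays.copyOf(buffer, len);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
                "GET / HTTP/1.1\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(readChunk(in, 8196).length);  // 18
        System.out.println(readChunk(in, 8196) == null); // true
    }
}
```

Inside the spout, the read and emit would then become a call to readChunk(incomingIS, 8196), emitting new Values(chunk) only when the result is non-null.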
Following @Shaw's suggestion, I initialized _serverSocket in the open() method, and _clientSocket is obtained in the nextTuple() method, which listens for requests.
I don't know the performance metrics of this one, but it works. :-)
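On performance: Storm's ISpout documentation notes that nextTuple, ack and fail are called in a tight loop on a single thread, and asks that nextTuple not block. The nextTuple() above blocks in accept() until a client connects. One common workaround, sketched here under assumptions, is to accept connections on a background thread and hand payloads to nextTuple() through a queue. SocketFeeder and its methods are hypothetical names, not a Storm API, and the sketch uses only JDK classes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: a spout could create it in open() and poll it from nextTuple().
public class SocketFeeder implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    public SocketFeeder(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        Thread acceptor = new Thread(this::acceptLoop, "socket-feeder");
        acceptor.setDaemon(true); // do not keep the JVM alive on its own
        acceptor.start();
    }

    public int getLocalPort() {
        return serverSocket.getLocalPort();
    }

    // Background thread: accept each connection, read one chunk, enqueue only the bytes read.
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try (Socket client = serverSocket.accept();
                 InputStream in = client.getInputStream()) {
                byte[] buffer = new byte[8196];
                int len = in.read(buffer);
                if (len > 0) {
                    queue.offer(Arrays.copyOf(buffer, len));
                }
            } catch (IOException ex) {
                // Per-connection errors are ignored; after close(), accept() throws and the loop exits.
            }
        }
    }

    // Non-blocking: the next payload, or null if nothing has arrived yet.
    public byte[] poll() {
        return queue.poll();
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

In the spout, open() would create the feeder (held in a transient field), nextTuple() would emit new Values(payload) only when poll() returns non-null, and close() on the spout would close the feeder. The queue decouples the slow network side from Storm's spout loop.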
Recommended answer
In the constructor, just assign the variables. Try to instantiate the ServerSocket in the open() method (the spout counterpart of a bolt's prepare()), and do not write any new ... in the constructor. Also rename your variables: you have two variables named sc.
public class ProxySpout extends BaseRichSpout{
    int port;
    public ProxySpout(int port){
        this.port=port;
    }
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //new ServerSocket
    }
    @Override
    public void nextTuple() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
If you create it in open(), it only runs once the spout has already been deployed, so the socket never needs to be serialized. And open() is called only once per lifetime of the spout, so this is not inefficient.
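The answer's reasoning can be checked without Storm. In this sketch, the hypothetical PortListener plays the role of the spout: the port is a plain field assigned in the constructor and travels through serialization, while the ServerSocket is transient and only created in an open() method called after deserialization, mirroring the open() in the skeleton above. The roundTrip helper only approximates Storm's shipping step with standard Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;

// Hypothetical Storm-free stand-in for the spout.
public class PortListener implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int port;                      // serialized with the object
    private transient ServerSocket serverSocket; // skipped by serialization

    public PortListener(int port) {
        this.port = port; // only assign fields here, as the answer recommends
    }

    // Analogue of the spout's open(): runs after the object has been deserialized.
    public void open() throws IOException {
        serverSocket = new ServerSocket(port);
    }

    public int getPort() { return port; }
    public ServerSocket getServerSocket() { return serverSocket; }

    // Serialize and deserialize, approximating how the object reaches a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PortListener copy = roundTrip(new PortListener(0)); // port 0 = any free port
        System.out.println(copy.getServerSocket() == null); // true: not created yet
        copy.open();
        System.out.println(copy.getServerSocket().isBound()); // true
        copy.getServerSocket().close();
    }
}
```

The same split also suggests why the static fields in the edit work when everything runs in one JVM: static fields are not part of Java serialization, so in a multi-JVM cluster a static _port set by the constructor on the submitting machine would not be visible on the worker, whereas an instance field like port above would be.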