Service Fabric Stateless Server Custom UDP Listener
We are trying to define a Service Fabric stateless service that listens for UDP data.
We are working with Microsoft, who have said that it IS supported and that I should set the endpoint up as TCP. Below is the snippet from the ServiceManifest.xml file:
<Resources>
  <Endpoints>
    <!-- This endpoint is used by the communication listener to obtain the port on which to
         listen. Please note that if your service is partitioned, this port is shared with
         replicas of different partitions that are placed in your code. -->
    <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" />
  </Endpoints>
</Resources>
The service starts fine, but it does not receive any UDP data, and when I run netstat -a I cannot see anything listening on the port in either TCP or UDP.
I've done plenty of research online and have not found much on creating a custom ICommunicationListener, but I'm hoping that someone else can verify whether this should be possible with SF.
Here is the ICommunicationListener implementation:
public class UdpCommunicationListener : ICommunicationListener
{
    private readonly Action<UdpReceiveResult> _connector;
    private readonly string _ipAddress;
    private readonly int _port;
    private readonly UdpServer _server;
    private IDisposable _listener;

    public UdpCommunicationListener(string serviceEndPoint,
        ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector)
    {
        if (serviceInitializationParameters == null)
        {
            throw new ArgumentNullException(nameof(serviceInitializationParameters));
        }

        // Look up the endpoint declared in ServiceManifest.xml to get its port.
        var endPoint = serviceInitializationParameters
            .CodePackageActivationContext
            .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint");
        _connector = connector;
        _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
        _port = endPoint.Port;
        _server = new UdpServer(_ipAddress, _port);
        _server.Open();
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        // Start forwarding received datagrams to the connector callback.
        _listener = _server.Listen(_connector);
        return Task.FromResult($"udp::{_ipAddress}:{_port}");
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.Abort();
        return Task.FromResult(true);
    }

    public void Abort()
    {
        // _listener is null if OpenAsync was never called.
        _listener?.Dispose();
        _server?.Close();
    }
}
public class UdpServer
{
    private readonly UdpClient _udpClient;
    private IObservable<UdpReceiveResult> _receiveStream;

    public UdpServer(string ipAddress, int port)
    {
        Id = Guid.NewGuid();
        _udpClient = new UdpClient(ipAddress, port);
    }

    public Guid Id { get; }

    public void Open()
    {
        _receiveStream = _udpClient.ReceiveStream().Publish().RefCount();
    }

    public void Close()
    {
        //TODO: Not sure how to stop the process
    }

    public IDisposable Listen(Action<UdpReceiveResult> process)
    {
        return _receiveStream.Subscribe(async r =>
        {
            process(r);
        });
    }
}
There was a defect in the UdpServer component, which I resolved, and this now works hosted in a Service Fabric service.
The issue was with this line:
_udpClient = new UdpClient(ipAddress, port);
This is the wrong overload for listening for traffic: UdpClient(string, int) sets the default remote host to send to rather than binding a local port. It needed to be:
_udpClient = new UdpClient(port);
I did try:
_udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port)
But this doesn't work, because the line in the communication listener that retrieves the host (IPAddressOrFQDN, as its name suggests) returns the host name and not the IP address. I think you could alter this behaviour by making some changes to the manifest, but binding to just the port was sufficient for now.
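To illustrate the difference between the two overloads, here is a minimal standalone sketch outside Service Fabric. The class name UdpOverloadDemo, the method RoundTripAsync and the use of port 0 (let the OS pick a free port) are assumptions made for this demo only; the service above would pass the port read from the manifest endpoint instead.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original service.
public static class UdpOverloadDemo
{
    // Binds a listener with UdpClient(int), sends one datagram to it over
    // loopback, and returns the text the listener received.
    public static async Task<string> RoundTripAsync(string message)
    {
        // UdpClient(int port) binds the socket to a LOCAL port, which is what
        // a listener needs. Port 0 asks the OS for any free port (demo assumption).
        using (var listener = new UdpClient(0))
        {
            int port = ((IPEndPoint)listener.Client.LocalEndPoint).Port;

            // UdpClient(string hostname, int port) does not bind to that port;
            // it sets the default REMOTE host, which suits a sender.
            using (var sender = new UdpClient("127.0.0.1", port))
            {
                byte[] payload = Encoding.UTF8.GetBytes(message);
                await sender.SendAsync(payload, payload.Length);
            }

            UdpReceiveResult result = await listener.ReceiveAsync();
            return Encoding.UTF8.GetString(result.Buffer);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTripAsync("ping").GetAwaiter().GetResult());
    }
}
```

With the listener bound this way, netstat -a should list the chosen port under UDP while the program is running, which is a quick way to check that the socket is really listening.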