Can Netty automatically handle queueing of requests?


Question

In the Apache PLC4X project (https://plc4x.apache.org) we are implementing drivers for industrial PLCs using Netty. These drivers usually layer several protocols on top of each other, and sometimes one layer requires us to split a single message into multiple messages of the underlying layer. This leaves us with one big problem: one of the protocols negotiates a maximum number of unconfirmed messages per connection. We can't send more messages than this maximum, or the receiver will simply send an error response.

So instead of adding things to "out" in the encode method, we would need to add them to some sort of queue and have some Netty mechanism take care of draining that queue. Is there such a mechanism in Netty? If not, what would be the best way to implement this?

It would also be great if someone with good Netty insight could join our project mailing list (dev@plc4x.apache.org), as we are also working on some really cool additions for Netty (a raw socket transport based on Ethernet frames and one based on IP packets). I bet both projects could benefit greatly from each other.

Recommended answer

Netty does not provide such a handler out of the box, but because of its internal design it is really easy to build a handler that limits the maximum number of concurrent pending requests yourself.

Such a handler can be built using the PendingWriteQueue class from the Netty framework in combination with a generic handler:

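import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;

// Extends ChannelDuplexHandler because, in Netty 4.x, ChannelHandlerAdapter
// declares neither write(...) nor channelRead(...).
public class MaxPendingRequestHandler extends ChannelDuplexHandler {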

    private PendingWriteQueue queue;
    private int freeSlots;

    public MaxPendingRequestHandler(int maxRequests) {
        this.freeSlots = maxRequests;
    }

    // Writes queued messages while free slots remain, then flushes once.
    private synchronized void trySendMessages(ChannelHandlerContext ctx) {
        if (this.freeSlots <= 0) {
            return;
        }
        // removeAndWrite() returns null once the queue is empty
        while (this.freeSlots > 0 && this.queue.removeAndWrite() != null) {
            this.freeSlots--;
        }
        ctx.flush();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.queue = new PendingWriteQueue(ctx);
        // Propagate the event so later handlers also see the registration
        super.channelRegistered(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Send everything so we get a proper failure for those pending writes
        this.queue.removeAndWriteAll();
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.queue.removeAndWriteAll();
        super.channelUnregistered(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.queue.add(msg, promise);
        trySendMessages(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized(this) {
            this.freeSlots++;
            trySendMessages(ctx);
        }
        super.channelRead(ctx, msg);
    }

}

This handler works by saving every new message in a queue and checking for free slots on the wire on every write and read.
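
The bookkeeping can be followed without Netty. The sketch below is a plain-Java model of the same idea, not the handler itself: `SlotWindow`, `offer`, `onResponse`, and the `wire` list are hypothetical names, and it assumes that every inbound response confirms exactly one outstanding request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-Java model of the slot accounting; all names here are illustrative. */
final class SlotWindow {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> wire = new ArrayList<>(); // stands in for the socket
    private int freeSlots;

    public SlotWindow(int maxRequests) {
        this.freeSlots = maxRequests;
    }

    /** Queue a request, then send as much as the free slots allow. */
    public void offer(String request) {
        pending.add(request);
        drain();
    }

    /** Assumption: one response confirms exactly one request. */
    public void onResponse() {
        freeSlots++;
        drain();
    }

    /** Move queued requests to the wire while slots remain. */
    private void drain() {
        while (freeSlots > 0 && !pending.isEmpty()) {
            wire.add(pending.remove());
            freeSlots--;
        }
    }

    public List<String> wire() {
        return wire;
    }

    public static void main(String[] args) {
        SlotWindow window = new SlotWindow(3);
        for (int i = 1; i <= 6; i++) {
            window.offer(String.valueOf(i));
        }
        System.out.println(window.wire()); // [1, 2, 3]
        window.onResponse();
        System.out.println(window.wire()); // [1, 2, 3, 4]
    }
}
```

With a limit of 3, requests 4 to 6 stay queued until a response frees a slot, which is the same sequence the unit tests for the handler check.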

Note that the handler should be placed in the pipeline after the packet decoders/encoders; otherwise a single incoming packet may be counted as several packets. For example:

pipeline.addLast(new PacketCodex()); // A codec consists of an encoder and a decoder; you can also add them separately
// pipeline.addLast(new TrafficShapingHandler()); // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler());      // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler(3)); // 3 is an example value for the negotiated maximum
pipeline.addLast(new Businesshandler());
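
Why the ordering matters can be shown with a Netty-free sketch. It assumes a hypothetical newline-delimited protocol in which each response ends in `\n`, and it compares counting raw reads with counting decoded frames. `FrameCounter` and its methods are illustrative names only, not part of Netty or PLC4X.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a hypothetical newline-delimited protocol, no Netty involved. */
final class FrameCounter {
    private final StringBuilder buffer = new StringBuilder();
    private int rawReads;
    private int decodedFrames;

    /** Feed one chunk as it might arrive from the socket; returns the complete frames. */
    public List<String> read(String chunk) {
        rawReads++; // what a handler placed before the decoder would count
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = buffer.indexOf("\n")) >= 0) {
            frames.add(buffer.substring(0, end));
            buffer.delete(0, end + 1);
        }
        decodedFrames += frames.size(); // what a handler placed after the decoder would count
        return frames;
    }

    public int rawReads() {
        return rawReads;
    }

    public int decodedFrames() {
        return decodedFrames;
    }

    public static void main(String[] args) {
        FrameCounter counter = new FrameCounter();
        counter.read("RE: "); // one response arriving in three pieces
        counter.read("1");
        counter.read("\n");
        System.out.println(counter.rawReads());      // 3
        System.out.println(counter.decodedFrames()); // 1
    }
}
```

Counting the three raw reads would free three slots for a single response, so the limit negotiated with the receiver could be exceeded. Counting after the decoder frees exactly one.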

Of course, you also want to verify that the handler works. This can be done with a unit test using EmbeddedChannel and JUnit:

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;

public class MaxPendingRequestHandlerTest {

    @Test
    public void testMaxPending() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), "5");
        Assert.assertEquals(channel.readOutbound(), "6");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        channel.write("7");
        channel.write("8");
        channel.write("9");
        channel.write("10");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), "5");
        Assert.assertEquals(channel.readOutbound(), "6");
        Assert.assertEquals(channel.readOutbound(), "7");
        Assert.assertEquals(channel.readOutbound(), "8");
        Assert.assertEquals(channel.readOutbound(), "9");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

}
