将 ZeroMQ 与 C# 与 inproc 传输一起使用 [英] Using ZeroMQ with C# with inproc transport

查看:21
本文介绍了将 ZeroMQ 与 C# 与 inproc 传输一起使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在试验 ZeroMQ 并尝试让 某些东西 工作.我的第一个想法是使用 inproc 传输设置 REP/REQ 以查看是否可以在两个线程之间发送消息.以下大部分代码取自 clzmq 示例,但似乎不起作用.

I'm experimenting with ZeroMQ and trying to get something working. My first thought was to set up a REP/REQ using the inproc transport to see if I could send messages between two threads. Most of the below code is taken from the clzmq examples, but it doesn't seem to work.

服务器和客户端都绑定到传输,但是当客户端尝试执行 Send 时,它会阻塞并坐在那里.我没有 ZeroMQ 经验,所以我不确定首先要看哪里,任何帮助将不胜感激.这是违规(攻击性)代码:

Both the server and the client are bound to the transport, but when the client tries to do a Send it blocks and just sits there. I have no ZeroMQ experience so I'm not sure where to look first, any help would be greatly appreciated. Here's the offending (offensive) code:

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        [Test]
        public void Should()
        {
            var clientThread = new Thread(StartClient);
            clientThread.Start();

            var serverThread = new Thread(StartServer);
            serverThread.Start();

            clientThread.Join();
            serverThread.Join();

            Console.WriteLine("Done with life");
        }

        private void StartServer()
        {


            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }
            }

            Console.WriteLine("Done with server");
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                        1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                        latency.ToString("f2"));
                }
            }

            Console.WriteLine("Done with client");
        }

    }
}

我在以下答案的帮助下完成了这项工作,但它还要求我将 Bind 更改为 Connect,这在您考虑时很有意义因为我们有一个绑定到本地传输的服务器和一个连接到远程传输的客户端.这是更新后的代码:

I got this working with the help of the below answer, but it also required me to change a Bind to a Connect, which makes sense when you think about it as we have a server binding to a local transport and a client connecting to a remote transport. Here's the updated code:

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        private static Context ctx;

        [Test]
        public void Should()
        {
            using (ctx = new Context(1))
            {
                var clientThread = new Thread(StartClient);
                clientThread.Start();

                var serverThread = new Thread(StartServer);
                serverThread.Start();

                clientThread.Join();
                serverThread.Join();

                Console.WriteLine("Done with life");
            }
        }

        private void StartServer()
        {
            try
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }

                Console.WriteLine("Done with server");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            try
            {
                //  Initialise 0MQ infrastructure
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Connect(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                                  1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                                      latency.ToString("f2"));
                }

                Console.WriteLine("Done with client");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

    }
}

推荐答案

我相信,两个线程都需要使用相同的 Context.Zeromq 指南不建议在一个进程中使用多个上下文.创建一个上下文,在两个线程之间共享该上下文.这应该有效.

I believe, both threads need to use the same Context. Zeromq guide does recommend not to use more than one context in a process. Create a context, share that context between both the threads. This should work.

来自 http://zguide.zeromq.org/chapter:all

您必须为您的流程创建一个上下文"对象,并将其传递给所有线程.上下文收集 ØMQ 的状态.创建连接跨进程:传输,服务器和客户端线程必须共享相同的上下文对象.

You MUST create a 'context' object for your process, and pass that to all threads. The context collects ØMQ's state. To create a connection across the inproc: transport, both server and client thread must share the same context object.

这篇关于将 ZeroMQ 与 C# 与 inproc 传输一起使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆