0MQ:如何在一个线程安全的方式使用ZeroMQ? [英] 0MQ: How to use ZeroMQ in a threadsafe manner?

查看:1389
本文介绍了0MQ:如何在一个线程安全的方式使用ZeroMQ?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我阅读 ZeroMq指南,我偶然发现了以下内容:




您不得共享
线程之间ØMQ插座。 ØMQ套接字并不
线程安全的。从技术上讲这是可能的
要做到这一点,但它需要信号灯,
锁或互斥。这会让你的
应用缓慢和脆弱。唯一的
的地方,它的远程理智线程之间
股插座是否需要做
魔像
插座垃圾收集
语言绑定。




和后来的:




记住:除了在创建它们的线程不要使用或关闭套接字。




我也可以理解,ZeroMQ 上下文是线程安全的。



如果为一个事件类注册一个又一个类,在.NET中,此事件可能是从不同的线程比监听器上创建的线程中调用。



我觉得只有两种选择,可以从事件处理程序内发货通过ZeroMQ,套接字的东西




  • 的事件处理程序,调用线程同步到线程的ZeroMQ - 插座

  • 创建一个新的ZeroMQ被创造 - 插座 /获取已有的ZeroMQ - 插座的事件处理程序中的线程使用线程ZeroMQ - 上下文



看来,0MQ-指南劝阻第一个,我不认为创建一个新的ZeroMq编程每个线程高性能/要走的路。



我的提问:结果
什么是正确的模式(它意味着是的方式)从一个事件处理程序中通过发布消息0MQ



此外,没有指导的作者有ZeroMQ,结合考虑对于.NET时,他们写道:




只有
的地方,它的远程理智线程之间
股插座是否需要做
魔像
插座垃圾收集
语言绑定。 ?




下面是一些samplecode强调我的问题/问题:

 公共类ExampleClass中
{
公共事件的EventHandler< ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

公共类ByteEventArgs:EventArgs的
{
公共字节[] BytesToSend;
}


公共类调度
{
ZMQ.Context CTX;

公共调度(ZMQ.Context mqcontext,例如ExampleClass exampleClassInstance)
{
this.ctx = mqcontext;
exampleClassInstance.SomethinIsCalledFromAnotherThread + =新的EventHandler< ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
}

无效exampleClass_SomethinIsCalledFromAnotherThread(对象发件人,ByteEventArgs E)
{
//此方法可能由不同的线程调用。所以,我必须得到一个新的socket等?
使用(VAR插座= ctx.Socket(ZMQ.SocketType.PUSH))
{
//初始化插座等.....最后:
socket.Send( e.BytesToSend);
}
//是不是太多开销?
}
}


解决方案

在达框架v4和最多可以使用并发集合来解决这个问题。即生产者 - 消费者模式。 。多线程(处理器),可以将数据发送到一个线程安全的队列,只是单独的线程从队列中消耗数据,并使用套接字并将其发送



下面的理念是:

  sendQueue =新BlockingCollection<我的资料GT;(新ConcurrentQueue<我的资料GT;()); 
//并发队列可以从多个线程接受/处理安全
+ MyHandler的=(的MyStuff stuffToSend)=> sendQueue.Add(stuffToSend);使用(VAR插座= context.Socket()){$ {


//启动单线程数据发送循环
Task.Factory.StartNew(()=> b $ b的MyStuff stuffToSend;
//此枚举将被阻塞,直到CompleteAdding被称为
的foreach
socket.Send(stuff.Serialize())(在sendQueue.GetConsumingEnumerable()VAR的东西) ;
}
});当完成
OnMyAppExit + = sendQueue.CompleteAdding

//跳出发送循环;


I read the ZeroMq guide and I stumbled upon the following:

You MUST NOT share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets.

and later on:

Remember: Do not use or close sockets except in the thread that created them.

I also understood that the ZeroMQ Context is threadsafe.

If a class registers for an event of a another class, in .Net, this event might be invoked from a different thread than the thread the listener was created on.

I think there are only two options to be able to dispatch something via ZeroMQ-Sockets from within an eventhandler:

  • Synchronize the eventhandler-invoking-thread to the thread the ZeroMQ-Socket was created in
  • Create a new ZeroMQ-Socket / get the exisiting ZeroMQ-Socket for the thread within the eventhandler by using the threadsafe ZeroMQ-Context

It seems that the 0MQ-Guide to discourage the first one and I don't think that creating a new ZeroMq-Socket for each thread is performant / the way to go.

My Question:
What is the correct pattern (the way it is meant to be) to publish messages via 0MQ from within an eventhandler?

Also, did the authors of the guide have the ZeroMQ-Binding for .Net in mind when they wrote:

The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets. ?

Here is some samplecode to emphasize my problem/question:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}

解决方案

In .net framework v4 and up you can use concurrent collection to solve this problem. Namely Producer-Consumer pattern. Multiple threads (handlers) can send data to a thread-safe queue and just single thread consumes data from the queue and sends it using the socket.

Here is the idea:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;

这篇关于0MQ:如何在一个线程安全的方式使用ZeroMQ?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆