使用0mq(ZeroMQ)同步两个简单的python3脚本时出现死锁 [英] Deadlock when synchronizing two simple python3 scripts using 0mq (ZeroMQ)

查看:140
本文介绍了使用0mq(ZeroMQ)同步两个简单的python3脚本时出现死锁的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我尝试使用0mq( ZeroMQ )同步两个python3脚本时,出现了这个奇怪的死锁。这些脚本可以很好地运行数千次迭代,但是迟早它们都会停止并互相等待。我正在Windows 7上从不同的CMD-Windows运行两个脚本。

I get this strange deadlock when I try to synchronize two python3 scripts using 0mq (ZeroMQ). The scripts run fine for several thousand iterations, but sooner or later they both stop and wait for each other. I am running both scripts from different CMD-Windows on Windows 7.

我不知道
为什么这样的死锁甚至可能发生
这儿怎么了?

脚本A:

while (1):
   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10001')
   msg = socket.recv()                        # Waiting for script B to send done
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................     
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10002')
   socket.send_string("done")                 # Tell script B we are done

脚本B

while (1):
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10001')
   socket.send_string("done")               # Tell script A we are done

   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10002')
   msg = socket.recv()                      # Waiting for script A to send done


推荐答案

这不是死锁情况



代码,当然,仍然需要注意。

This is not a DeadLock case

The code, sure, still needs some care.

消歧:您的方案不会陷入资源互锁状态,也就是 DeadLock 。是的,可以肯定,您的代码崩溃了,但是很可能不是由于 REQ / REP DeadLock(它可能而且确实出现在有损网络 tcp上)引起的。 : transport-class)。发布的代码由于非托管资源处理而崩溃,而不是由于达到DeadLock / LiveLock的相互阻塞状态。

Disambiguation: your scenario does not hit into a resources mutual locking state, aka a DeadLock. Yes, sure, your code crashes, but most probably not due to a REQ/REP DeadLock ( where it might and does appear on a lossy network tcp: transport-class ). The posted code is crashing due to unmanaged resource handling, not due to reaching a mutual-blocking state of a DeadLock / LiveLock.

首先,让我们假设您的超低延迟动机系统不允许重复实例化任何内容。

First, let's assume your ultra-low latency-motivated system does not allow to repetitively instantiate anything. There are exceptions to this, but let's be profi.


  1. 移动您的 .Context()资源设置(或从外部调用继承)

  1. move your .Context() resource setup ( or inheritance from an outer call ) out of the loop

审查,无论是否需要以及延迟限制,您都可以在每次循环运行中两次设置/拆卸 .socket()资源。

review, whether you need and your latency constraints allow you to setup / tear-down a .socket() resource twice in each loop-run.

决定,一旦丢失第一条消息,您是否可以忍受真正的 REQ / REP 僵局在传输路径中

decide, whether you can live with real REQ/REP deadlock once a first message gets lost in the transport-path

强制执行正常使用资源终止( .socket() -s,操作系统 port# s, .Context() -s)。不要让它们永远悬而未决,而要创造无限数量的其他组件,这会破坏任何故障恢复系统。资源永远不会是无限的。

enforce graceful resources-use termination ( .socket()-s, O/S port#s, .Context()-s ). Do not let them hanging unterminated forever, while creating infinite amount of others instead, that devastates any "fault-resilient" system. Resources are never infinite.

设计以非阻塞的方式同时进行信令和传输行为。这样,您就可以检测和处理远程进程超时,并为本地补救/响应性操作引入机会。

design both signalling and transmission behaviours in a non-blocking manner. This allows you to detect and handle remote-process timeouts and introduce a chance for local remedy / responsive actions.

重新设计达到所需的安全代码级别(以下示例在具有远程键盘以及其他一些本地和远程诊断工具的分布式处理框架中,在软实时控制的无穷循环24/7/365中工作了几年)。

redesign the code to a level of secure code you need ( the below example works a few years in a soft-realtime controlled endless loop 24/7/365 in a distributed processing framework with a remote keyboard and some other local- and remote-diagnostic tools ).






生产级代码缺少什么?



您的代码必须设想在分布式系统的任何部分中出了什么问题。是的,这很困难,但有必要。您的远程节点-一个通信对方-停止响应,丢失消息,重新启动,由于O / S崩溃而陷入僵局,无论可以想像的是(再加上一些令人讨厌的惊奇,您都只会在运行中发现...) 。这是本小博文中要介绍的另一个潘多拉魔盒,但这并不意味着没有必要。这是您的救生衣。


What is missing for production-grade code?

Your code has to "envisage" what might have gone wrong, in any part of your distributed system. Yes, it is hard, but necessary. Your remote node -- a communicating counterparty -- stopped responding, lost a message, went rebooted, stalled due to O/S crash, whatever imaginable ( plus a few rather nasty surprised you will find only on-the-fly ... ). This is another Pandora's Box to cover in this small post, which does not mean it is not necessary. It is your life-saving vest.

在任何可能的地方都以无阻塞的方式进行设计,这样您就可以控制事件...

Design in a non-blocking manner wherever you can, this way you remain in control of events ...

无论如何,总是释放系统资源和 .term()全部为ZeroMQ 。 Context()实例以优美的方式-整理是一种公平的做法-在现实生活中以及在代码帝国中更是如此。

Anyways, always release system resources and .term() all ZeroMQ .Context() instances in a graceful manner -- "tidy up" is a fair practice -- both in real life and the more in the code-empires.

# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
#NONSTOP RESPONDER RAW EXAMPLE:
def aMiniRESPONDER( aTarget2Bind2_URL             = "tcp://A.B.C.D:8889",
                    anExternalPREDICTOR           = None,
                    anExternallyManagedZmqCONTEXT = None,
                    aSpreadMinSafetyMUL           = 3.0,
                    aSilentMODE                   = True
                    ):
   try: # RESOURCES LAYER
        # ... SETUP
        # ------------------------------------------------- .Context()
        # can setup a locally-managed context or re-use
        # anExternallyManagedZmqCONTEXT obtained upon a func Call
        aZmqCONTEXT   = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )   

        # localhost:8887 [REP] ... remote [REQ] peer  .connect() + .send()
        aCtrlPORT_URL = "tcp://*:8887"                                      

        # localhost:8890 [PUB] ... remote [SUB] peers .connect() +
        # .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
        aSIGsPORT_URL = "tcp://*:8890"                                      
        aXmitPORT_URL = aTarget2Bind2_URL

        aListOfSOCKETs = []

        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: XmitPORT
            aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )

            # XmitPORT
            aXmitSOCKET.bind(      aXmitPORT_URL )                          
            aListOfSOCKETs.append( aXmitSOCKET )
        except:                                                             
            #    EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT

            msg =  "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
            print msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL )
            raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: CtrlPORT    
            # CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
            aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )                     

            # CtrlPORT <-REQ/REP means a remote peer [REQ] has to
            # .send()+.recv() before sending another CtrlCMD
            aCtrlSOCKET.bind(      aCtrlPORT_URL )                          
            aListOfSOCKETs.append( aCtrlSOCKET )
        except:                                                             
            # EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT
            msg =  "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
            print msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL )
            raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: SIGsPORT

            # SIGsPORT [PUB] .send()s--> [SUB]s
            aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB  )                     

            # SIGsPORT -->  PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
            aSIGsSOCKET.bind(      aSIGsPORT_URL )                          
            aListOfSOCKETs.append( aSIGsSOCKET )
        except:                                                             
            # EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT and SIGsPORT
            msg =  "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
            print msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL )
            raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ

        # vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
        # ... SETUP YOUR APPLICATION CODE

        try:     # APP LAYER ___________________________________________
           #           what you want to do
           #           here you go ...

        except:  # APP LAYER ___________________________________________
           #           handle EXCs

        finally: # APP LAYER ___________________________________________
           #           your own application post-mortem / pre-exit code

        # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

   except:  # RESOURCES LAYER .............................................
        # ... code shall handle it's own exceptions + externally caused events

   finally: # RESOURCES LAYER .............................................
        # ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )

        [ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
        [ allSOCKETs.close( )                    for allSOCKETs in aListOfSOCKETs ]

        # --------------------------------------------------------------#
        # RESOURCES dismantled, may .term()

        # .TERM(), NOP otherwise
        if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ):        #
                 aZmqCONTEXT.term()                                     #
        return

这篇关于使用0mq(ZeroMQ)同步两个简单的python3脚本时出现死锁的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆