2 minutes for ZMQ pub/sub to connect in Kubernetes


Question

I have a Kubernetes 1.18 cluster using Weave as my CNI. I have a ZMQ-based pub/sub app, and I often (not always) see it take 2 minutes before the subscriber can receive messages from the publisher. This seems to be some sort of socket timeout unique to my Kubernetes environment.

Here is my trivial ZMQ app example:

#!/bin/env python2
import zmq, sys, time, argparse, logging, datetime, threading
from zmq.utils.monitor import recv_monitor_message

FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT)

if zmq.zmq_version_info() < (4, 0):
    raise RuntimeError("monitoring in libzmq version < 4.0 is not supported")

logging.error("libzmq-%s" % zmq.zmq_version())

EVENT_MAP = {}
logging.error("Event names:")
for name in dir(zmq):
    if name.startswith('EVENT_'):
        value = getattr(zmq, name)
        logging.error("%21s : %4i" % (name, value))
        EVENT_MAP[value] = name


def event_monitor(monitor):
    while monitor.poll():
        evt = recv_monitor_message(monitor)
        evt.update({'description': EVENT_MAP[evt['event']]})
        logging.error("Event: {}".format(evt))
        if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
            break
    monitor.close()
    logging.error("event monitor thread done!")

parser = argparse.ArgumentParser("Simple zmq pubsub example")
parser.add_argument("pub_or_sub", help="Either pub or sub")
parser.add_argument("host", help="host address to connect to if sub otherwise the address to bind to")
parser.add_argument("--port", "-p", type=int, help="The port to use", default=4567)
args = parser.parse_args()

context = zmq.Context()

if args.pub_or_sub.lower() == "sub":
    zmq_socket = context.socket(zmq.SUB)
    monitor = zmq_socket.get_monitor_socket()
    t = threading.Thread(target=event_monitor, args=(monitor,))
    t.setDaemon(True)
    t.start()

    zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
    zmq_socket.connect("tcp://{}:{}".format(args.host, args.port))
    while 1:
        if zmq_socket.poll(timeout=1000):
            logging.error("Received: {}".format(zmq_socket.recv()))
        else:
            logging.error("No message available")
elif args.pub_or_sub.lower() == "pub":
    zmq_socket = context.socket(zmq.PUB)
    monitor = zmq_socket.get_monitor_socket()
    t = threading.Thread(target=event_monitor, args=(monitor,))
    t.setDaemon(True)
    t.start()
    zmq_socket.bind("tcp://{}:{}".format(args.host, args.port))
    i = 0
    while 1:
        logging.error("Sending message: {}".format(i))
        zmq_socket.send("Message {} at {}".format(i, datetime.datetime.now()))
        i += 1
        time.sleep(1.0)
else:
    raise RuntimeError("Needs to either be sub or pub nothing else allowed")

Here is how I am deploying it within Kubernetes:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pub-deployment
  labels:
    app: pub
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pub
  template:
    metadata:
      labels:
        app: pub
    spec:
      containers:
      - name: pub
        image: bagoulla/zmq:latest
        command: ["pubsub", "pub", "0.0.0.0"]
---
apiVersion: v1
kind: Service
metadata:
  name: pub
spec:
  selector:
    app: pub
  ports:
    - protocol: TCP
      port: 4567
      targetPort: 4567
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sub-deployment
  labels:
    app: sub
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sub
  template:
    metadata:
      labels:
        app: sub
    spec:
      containers:
      - name: sub
        image: bagoulla/zmq:latest
        command: ["pubsub", "sub", "pub"]

What I would expect to see from the subscriber, and what I do see when running outside of Kubernetes on the same host (though still in Docker), is the following sequence repeated in quick succession until the pub container is ready and routed:

2020-08-16 08:12:09,141 Event: {'endpoint': 'tcp://127.0.0.1:4567', 'event': 2, 'value': 115, 'description': 'EVENT_CONNECT_DELAYED'}
2020-08-16 08:12:09,141 Event: {'endpoint': 'tcp://127.0.0.1:4567', 'event': 128, 'value': 12, 'description': 'EVENT_CLOSED'}
2020-08-16 08:12:09,142 Event: {'endpoint': 'tcp://127.0.0.1:4567', 'event': 4, 'value': 183, 'description': 'EVENT_CONNECT_RETRIED'}
2020-08-16 08:12:09,328 Event: {'endpoint': 'tcp://127.0.0.1:4567', 'event': 2, 'value': 115, 'description': 'EVENT_CONNECT_DELAYED'}

However, what I see in Kubernetes instead is:

2020-08-16 05:54:51,724 Event: {'endpoint': 'tcp://pub:4567', 'event': 2, 'value': 115, 'description': 'EVENT_CONNECT_DELAYED'}
.... 2 minutes later....
2020-08-16 05:56:59,038 No message available
2020-08-16 05:56:59,056 Event: {'endpoint': 'tcp://pub:4567', 'event': 128, 'value': 12, 'description': 'EVENT_CLOSED'}
2020-08-16 05:56:59,056 Event: {'endpoint': 'tcp://pub:4567', 'event': 4, 'value': 183, 'description': 'EVENT_CONNECT_RETRIED'}
2020-08-16 05:56:59,243 Event: {'endpoint': 'tcp://pub:4567', 'event': 2, 'value': 115, 'description': 'EVENT_CONNECT_DELAYED'}
2020-08-16 05:56:59,245 Event: {'endpoint': 'tcp://pub:4567', 'event': 1, 'value': 12, 'description': 'EVENT_CONNECTED'}
2020-08-16 05:56:59,286 Received: Message 127 at 2020-08-16 05:56:59.286036

Clearly something within Kubernetes is preventing the "EVENT_CLOSED" event from occurring in a timely manner. What could this be?

Recommended answer

The issue is that when the Service comes up, it essentially creates a TCP black hole: TCP connections can be started but never finish connecting. Clients should set a timeout on TCP connections so that they can retry the connection until the underlying Deployment or Pod is up and routed properly. For ZMQ, this can be done with the ZMQ_CONNECT_TIMEOUT socket option.
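
As a minimal sketch of that suggestion, the subscriber side of the example could set the option before calling connect. This assumes Python 3 with a pyzmq build against libzmq 4.2 or later (where ZMQ_CONNECT_TIMEOUT is exposed as zmq.CONNECT_TIMEOUT); the helper name make_subscriber and the 1000 ms value are illustrative choices, not part of the original answer.

```python
import zmq


def make_subscriber(context, host, port, connect_timeout_ms=1000):
    """Create a SUB socket that gives up on a stalled TCP connect and retries.

    make_subscriber is a hypothetical helper for illustration only.
    """
    sub = context.socket(zmq.SUB)
    # ZMQ_CONNECT_TIMEOUT is in milliseconds. The default of 0 leaves the
    # connect timeout to the operating system, which can be minutes long.
    sub.setsockopt(zmq.CONNECT_TIMEOUT, connect_timeout_ms)
    # Subscribe to every message (bytes are required on Python 3).
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    # connect() returns immediately; ZMQ attempts and retries the TCP
    # connection in the background.
    sub.connect("tcp://{}:{}".format(host, port))
    return sub
```

In the question's script this would replace the three lines that create, subscribe, and connect zmq_socket, for example zmq_socket = make_subscriber(context, args.host, args.port). With the timeout in place, the monitor should report EVENT_CLOSED and EVENT_CONNECT_RETRIED after roughly the configured interval instead of after the OS-level timeout, though the exact timing depends on the reconnect interval settings.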
