Google PubSub Python client returning StatusCode.UNAVAILABLE

Question

I am trying to establish a long-running pull subscription to a Google Cloud PubSub topic. I am using code very similar to the example given in the documentation, i.e.:

import time

from google.cloud import pubsub_v1


def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

The problem is that I sometimes receive the following traceback:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
    raise exception
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/path/to/grpc/_channel.py", line 348, in __next__
    return self._next()
  File "/path/to/grpc/_channel.py", line 342, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

I saw that this was referenced in another question, but here I am asking how to handle it properly in Python. I have tried wrapping the request in a try/except block, but the consumer seems to run in a background thread, so the exception never reaches my code and I cannot retry when the error occurs.
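
The behaviour described above, where a try/except around the call never sees the error, can be reproduced with the standard library alone. The following is a minimal sketch, not the PubSub client itself: `consume` and `demo` are hypothetical names, the `RuntimeError` only stands in for the gRPC error, and `threading.excepthook` assumes Python 3.8 or newer.

```python
import threading


def consume():
    """Stand-in for a library's background consumer thread failing."""
    raise RuntimeError("simulated UNAVAILABLE")


def demo():
    """Start the consumer in a thread and report who saw the exception."""
    seen_by_hook = []
    previous_hook = threading.excepthook
    # Record uncaught thread exceptions instead of printing a traceback.
    threading.excepthook = lambda args: seen_by_hook.append(args.exc_value)
    caught_by_caller = False
    try:
        worker = threading.Thread(target=consume)
        worker.start()
        worker.join()
    except RuntimeError:
        # Not reached: the exception is raised in the worker thread,
        # not in the thread that called start() and join().
        caught_by_caller = True
    finally:
        threading.excepthook = previous_hook
    return caught_by_caller, seen_by_hook
```

Calling `demo()` shows that the caller's except clause is skipped while the hook receives the exception, which is why the handling has to happen inside the code that runs in the consumer thread.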

Recommended answer

A somewhat hacky approach that is working for me is a custom policy_class. The default one has an on_exception function that ignores DEADLINE_EXCEEDED. You can make a class that inherits the default and also ignores UNAVAILABLE. Mine looks like this:

from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc

class AvailablePolicy(thread.Policy):
    def on_exception(self, exception):
        """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

        I'm not sure what triggers that error, but if you ignore it, your
        subscriber seems to work just fine. It's probably an intermittent
        thing and it reconnects later if you just give it a chance.
        """
        # If this is UNAVAILABLE, then we want to retry.
        # That entails just returning None.
        unavailable = grpc.StatusCode.UNAVAILABLE
        if getattr(exception, 'code', lambda: None)() == unavailable:
            return
        # For anything else, fall back on super.
        super(AvailablePolicy, self).on_exception(exception)

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)
# Continue to set up as normal.

It looks a lot like the original on_exception; it just ignores a different error. If you want, you can add some logging whenever the exception is raised and verify that everything still works. Future messages will still come through.
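
The logging suggestion could look roughly like the sketch below. So that it runs without grpc or the PubSub client installed, it uses stand-ins: `StatusCode` and `FakeRpcError` here are hypothetical replacements for `grpc.StatusCode` and the gRPC error object, and `should_ignore` and the logger name are illustrative, not part of any library.

```python
import enum
import logging

logger = logging.getLogger("subscriber.policy")  # hypothetical logger name


class StatusCode(enum.Enum):
    """Stand-in for grpc.StatusCode so the sketch runs without grpc."""
    UNAVAILABLE = "unavailable"
    INTERNAL = "internal"


class FakeRpcError(Exception):
    """Hypothetical error exposing code() the way gRPC call errors do."""

    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


def should_ignore(exception, ignored=(StatusCode.UNAVAILABLE,)):
    """Return True, and log a warning, if the exception has an ignored status."""
    # Exceptions without a code() method yield None and are never ignored.
    status = getattr(exception, "code", lambda: None)()
    if status in ignored:
        logger.warning("Ignoring %s from the stream: %r", status.name, exception)
        return True
    return False
```

Inside the custom on_exception, the check would then become `if should_ignore(exception, ignored=(grpc.StatusCode.UNAVAILABLE,)): return`, so every ignored error leaves a log line you can compare against the messages that keep arriving.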
