Google Cloud Pub/Sub data loss


Problem description

I'm experiencing a problem with GCP Pub/Sub where a small percentage of data is lost when publishing thousands of messages within a couple of seconds.

I'm logging both the message_id from Pub/Sub and a session_id unique to each message, on the publishing end as well as the receiving end. The result I'm seeing is that some messages on the receiving end have the same session_id but different message_ids. Also, some messages are missing.

For example, in one test I sent 5,000 messages to Pub/Sub and exactly 5,000 messages were received, yet 8 of the sent messages were missing. The log entries for a lost message look like this:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)

The duplicates look like this:

======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)

All of the missing data and duplicates follow this pattern.

From the above example, it is clear that some messages have taken the message_id of another message, and that some messages have been sent twice with two different message_ids.
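The comparison described above can be sketched in a few lines of Python. This is only an illustration, not the script used for the test: the `reconcile` helper and the list-of-tuples log format are assumptions, and the sample values are the ones from the log excerpt above.

```python
from collections import defaultdict


def reconcile(published, received):
    """Compare publisher-side and subscriber-side (session_id, message_id) logs.

    Hypothetical helper. Returns (missing, duplicates):
      missing    - session ids that were published but never received
      duplicates - session ids received under more than one message id
    """
    received_ids = defaultdict(set)
    for session_id, message_id in received:
        received_ids[session_id].add(message_id)

    published_sessions = {session_id for session_id, _ in published}
    missing = sorted(published_sessions - set(received_ids))
    duplicates = {
        session_id: sorted(ids)
        for session_id, ids in received_ids.items()
        if len(ids) > 1
    }
    return missing, duplicates


# Values taken from the log excerpt above.
published = [("731", "108562396466545")]  # log from the Flask API
received = [                               # log from the pull request
    ("730", "108562396466545"),
    ("730", "108561339282318"),
]

missing, duplicates = reconcile(published, received)
print(missing)
print(duplicates)
```

Session 731 shows up as missing while session 730 shows up with two message ids, one of which is the id the API reported for 731.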

Could anyone help me figure out what is going on? Thanks in advance.

Code

I have an API that sends messages to Pub/Sub, which looks like this:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)
# Module-level client, shared by every request handled by this process
ps = pubsub.Client()

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"

And this is the code I used to read from Pub/Sub:

from google.cloud import pubsub
import re
import json

ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')

max_messages = 1
stop = False

messages = []

class Message(object):
    """docstring for Message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.sessionId = sessionId
        self.messageId = messageId


def pull_all():
    while stop == False:

        m = sub.pull(max_messages = max_messages, return_immediately = False)

        for data in m:
            ack_id = data[0]
            message = data[1]
            messageId = message.message_id
            data = message.data
            event = json.loads(data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId = sessionId, messageId = messageId))

            print('200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******")

            sub.acknowledge(ack_ids = [ack_id])

pull_all()

The code for generating session_id, sending requests, and logging responses from the API:

// generate trackable sessionId
var sessionId = 0

var increment_session_id = function () {
  sessionId++;
  return sessionId;
}

var generate_data = function () {
  var data = {};
  // data.sessionId = faker.random.uuid();
  data.sessionId = increment_session_id();
  data.user = get_rand(userList);
  data.device = get_rand(deviceList);
  data.visitTime = new Date;
  data.location = get_rand(locationList);
  data.content = get_rand(contentList);

  return data;
}

var sendData = function (url, payload) {
  var request = $.ajax({
    url: url,
    contentType: 'application/json',
    method: 'POST',
    data: JSON.stringify(payload),
    error: function (xhr, status, errorThrown) {
      console.log(xhr, status, errorThrown);
      $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
      $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
      $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
    }
  }).done(function (xhr) {
    console.log(xhr);
    $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
  })
}

$(submit_button).click(function () {
  var request_num = get_request_num();
  var request_url = get_url();
  for (var i = 0; i < request_num; i++) {
    var data = generate_data();
    var loadData = changeVerb(data, 'load');
    sendData(request_url, loadData);
  }
}) 

Update

I made a change to the API, and the issue seems to have gone away. Instead of using one pubsub.Client() for all requests, I now initialize a client for every incoming request. The new API looks like this:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():

    ps = pubsub.Client()


    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"

Recommended answer

I talked with someone from Google, and it seems to be an issue with the Python client:

The consensus on our side is that there is a thread-safety problem in the current Python client. The client library is being rewritten almost from scratch as we speak, so I don't want to pursue any fixes in the current version. We expect the new version to become available by end of June.

Running the current code with thread_safe: false in app.yaml or, better yet, just instantiating the client in every call should be the workaround, which is the solution you found.

For the detailed solution, please see the Update in the question.
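If creating a client on every call turns out to be too costly, one possible middle ground is to keep one client per thread. The sketch below only illustrates that pattern and is not what the answer proposed: `FakeClient` is a hypothetical stand-in for pubsub.Client(), `get_client` and `collect_clients` are made-up helper names, and it rests on the untested assumption that the problem comes only from sharing a client between threads.

```python
import threading

# Each thread sees its own attributes on this object.
_local = threading.local()


class FakeClient(object):
    """Hypothetical stand-in for pubsub.Client(); not part of any library."""


def get_client(factory=FakeClient):
    """Return the calling thread's own client, creating it on first use."""
    client = getattr(_local, "client", None)
    if client is None:
        client = factory()
        _local.client = client
    return client


def collect_clients(n_threads=4):
    """Start n_threads workers and record the clients each one obtained."""
    seen = []
    lock = threading.Lock()

    def worker():
        first = get_client()
        second = get_client()  # same thread, so the same client comes back
        with lock:
            seen.append((first, second))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

In the Flask handler, `ps = pubsub.Client()` would then become something like `ps = get_client(pubsub.Client)`. Whether that is enough for the old client library has not been verified here, so the per-request client from the Update remains the confirmed workaround.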
