Google Cloud Pubsub Data Lost


Question


I'm experiencing a problem with GCP Pub/Sub where a small percentage of data is lost when I publish thousands of messages within a couple of seconds.

I log both the message_id from Pub/Sub and a session_id that is unique to each message, on both the publishing end and the receiving end. On the receiving end, some messages have the same session_id but different message_ids, and some messages are missing entirely.

For example, in one test I sent 5,000 messages to Pub/Sub and received exactly 5,000 messages, but 8 of the original messages were lost. The total still matched because of the duplicates shown below. The logs for the lost messages look like this:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)

And the duplicates look like this:

======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)

Every missing message and every duplicate follows this pattern.

The example above suggests that some messages have taken the message_id of another message and have been sent twice, under two different message_ids.
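You can script the comparison described above, which lines up the Flask API log against the pull log by sessionId and messageId. The following is a minimal sketch. It assumes that every relevant log line contains text of the form "sessionId: N, messageId:M", as in the excerpts above. The names parse and cross_check are hypothetical helpers. The script reports three things:

- sessionIds that were published but never pulled
- sessionIds that were pulled more than once
- messageIds whose sessionId differs between the two logs, such as 108562396466545, which the API logged as 731 but which was pulled as 730

```python
import re
from collections import Counter

# Matches text such as "sessionId: 730, messageId:108562396466545"
LINE_RE = re.compile(r"sessionId:\s*(\d+),\s*messageId:\s*(\d+)")


def parse(lines):
    """Return the (sessionId, messageId) pairs found in the given log lines."""
    pairs = []
    for line in lines:
        match = LINE_RE.search(line)
        if match:
            pairs.append((match.group(1), match.group(2)))
    return pairs


def cross_check(api_lines, pull_lines):
    """Compare what the API says it published with what the subscriber pulled."""
    published = parse(api_lines)
    received = parse(pull_lines)

    sent_sessions = {s for s, _ in published}
    counts = Counter(s for s, _ in received)

    # Published but never pulled
    missing = sorted(sent_sessions - set(counts), key=int)
    # Pulled more than once
    duplicates = sorted((s for s, n in counts.items() if n > 1), key=int)

    # Same messageId, different sessionId: the payload that went out under this
    # messageId is not the one the API believed it was publishing.
    api_by_mid = {m: s for s, m in published}
    mismatched = sorted(
        (m, api_by_mid[m], s)
        for s, m in received
        if m in api_by_mid and api_by_mid[m] != s
    )
    return missing, duplicates, mismatched
```

Each duplicate fills the slot of one missing message. That is why a raw count of 5,000 received messages can hide 8 losses, and why comparing IDs rather than totals is necessary.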

Can anyone help me figure out what is going on? Thanks in advance.

Code

I have an API that sends messages to Pub/Sub. It looks like this:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)
ps = pubsub.Client()

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"

And this is the code I used to read from pubsub:

from google.cloud import pubsub
import re
import json

ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')

max_messages = 1
stop = False

messages = []

class Message(object):
    """Records the sessionId and messageId of one received message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.sessionId = sessionId
        self.messageId = messageId


def pull_all():
    while not stop:
        # Wait for messages instead of returning immediately when none are available
        m = sub.pull(max_messages=max_messages, return_immediately=False)

        # Each item is an (ack_id, message) pair
        for ack_id, message in m:
            messageId = message.message_id
            event = json.loads(message.data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId=sessionId, messageId=messageId))

            print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"

            # Acknowledge so Pub/Sub does not redeliver this message
            sub.acknowledge(ack_ids=[ack_id])

pull_all()

For generating session_id, sending request & logging response from API:

// generate trackable sessionId
var sessionId = 0

var increment_session_id = function () {
  sessionId++;
  return sessionId;
}

var generate_data = function () {
  var data = {};
  // data.sessionId = faker.random.uuid();
  data.sessionId = increment_session_id();
  data.user = get_rand(userList);
  data.device = get_rand(deviceList);
  data.visitTime = new Date;
  data.location = get_rand(locationList);
  data.content = get_rand(contentList);

  return data;
}

var sendData = function (url, payload) {
  var request = $.ajax({
    url: url,
    contentType: 'application/json',
    method: 'POST',
    data: JSON.stringify(payload),
    error: function (xhr, status, errorThrown) {
      console.log(xhr, status, errorThrown);
      $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
      $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
      $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
    }
  }).done(function (xhr) {
    console.log(xhr);
    $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
  })
}

$(submit_button).click(function () {
  var request_num = get_request_num();
  var request_url = get_url();
  for (var i = 0; i < request_num; i++) {
    var data = generate_data();
    var loadData = changeVerb(data, 'load');
    sendData(request_url, loadData);
  }
}) 
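The JavaScript harness fires request_num AJAX calls in a loop without waiting for responses. As a result, the Flask API handles many publishes concurrently. To script the same reproduction outside the browser, you could use a minimal Python sketch like the following. It assumes that only the sessionId field matters for tracking, so user, device, and the other fields are omitted. The names generate_data, fire_requests, and make_http_sender, and the worker count, are hypothetical and are not part of the original code.

```python
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor


def generate_data(session_id):
    """Build one test event; only the trackable sessionId is included here."""
    return {"sessionId": session_id}


def make_http_sender(url):
    """Return a send(payload) function that POSTs JSON to url (hypothetical endpoint)."""
    def send(payload):
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req) as resp:
            return resp.read().decode("utf-8")
    return send


def fire_requests(send, request_num, workers=50):
    """Send request_num payloads concurrently; sessionIds run from 1 to request_num."""
    payloads = [generate_data(i) for i in range(1, request_num + 1)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in the same order as payloads
        responses = list(pool.map(send, payloads))
    return payloads, responses
```

Against the real API, a call might look like fire_requests(make_http_sender("http://localhost:5000/publish"), 5000). The URL is an assumption that combines Flask's default development port with the /publish route above. The responses can then be fed into the log comparison as the "API" side.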

UPDATE

I changed the API, and the issue seems to have gone away. Instead of sharing one pubsub.Client() across all requests, I now initialize a new client for every incoming request. The new API looks like this:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    # Create a new client for each request instead of sharing one module-level client
    ps = pubsub.Client()

    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
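Creating a client on every request works, but it may add setup overhead to each call. A possible middle ground is to keep one client per thread with threading.local. This approach assumes that the bug only appears when a single client is shared between threads, which is what the threadsafe suggestion below implies. The Google engineer did not propose this pattern, and it is untested against the library. The names get_client and factory are hypothetical; with the old library, factory would be pubsub.Client.

```python
import threading

# Storage whose attributes are private to each thread
_local = threading.local()


def get_client(factory):
    """Return this thread's client, creating it with factory() on first use.

    factory is a zero-argument callable that builds a client (assumed here to
    be pubsub.Client from the code above). The first client created in a
    thread is reused for every later call in that same thread.
    """
    client = getattr(_local, "client", None)
    if client is None:
        client = factory()
        _local.client = client
    return client
```

Inside publish_test_topic, you would replace ps = pubsub.Client() with ps = get_client(pubsub.Client). If the server starts a new thread for every request, as Werkzeug's threaded development server does, this behaves the same as the per-request client. The saving only appears on servers that reuse worker threads.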

Solution

I talked with someone from Google, and it seems to be an issue with the Python client:

The consensus on our side is that there is a thread-safety problem in the current python client. The client library is being rewritten almost from scratch as we speak, so I don't want to pursue any fixes in the current version. We expect the new version to become available by end of June.

Running the current code with threadsafe: false in app.yaml, or better yet, just instantiating the client in every call, should be the workaround. That is the solution you found.
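A thread-safety problem can produce exactly the symptom seen in the logs. The following toy model is purely illustrative and is NOT how google-cloud-pubsub is implemented. It is a publisher that stashes the payload on the shared instance before sending it. If two requests interleave, both sends read whichever payload was written last. One sessionId then goes out twice under two messageIds, and the other never goes out at all, which matches what happened to sessionIds 730 and 731. UnsafeToyClient and its barrier test hook are hypothetical.

```python
import itertools
import threading


class UnsafeToyClient:
    """Toy publisher that keeps per-call state on a shared instance (NOT thread-safe)."""

    def __init__(self):
        self._payload = None                 # shared by all callers: the bug
        self._ids = itertools.count(1)       # fake message ids
        self._lock = threading.Lock()        # protects only id generation and delivery
        self.delivered = []                  # (message_id, payload) the "server" received

    def publish(self, payload, barrier=None):
        self._payload = payload              # step 1: stash the payload on the instance
        if barrier is not None:
            barrier.wait()                   # test hook: force the unlucky interleaving
        with self._lock:
            message_id = next(self._ids)
            # step 2: send whatever is stashed *now*, which may be another caller's payload
            self.delivered.append((message_id, self._payload))
        return message_id
```

In this model, keeping per-call data in local variables removes the shared state, and so does giving each request its own client instance as in the UPDATE. That is why the workaround makes the duplicates and gaps disappear.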

For the detailed solution, see the UPDATE in the question.
