How does pubsub know how many messages I published at a point in time?


Question

Here is the code for publishing the messages:

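import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID is assumed to be defined elsewhere (e.g. loaded from configuration).
async function publishMessage(topicName: string): Promise<void> {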
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}

As you can see, I publish 5 messages. From the user's point of view, the messages are sent at await Promise.all(...), but inside the pubsub library that may not be the case. Because I set maxMessages to 10 and only 5 messages are queued, pubsub waits for 10 seconds (maxMilliseconds) and then publishes these 5 messages.

The execution result meets my expectations:

[2020-05-05T09:53:32.078Z] publishing messages
[2020-05-05T09:53:42.209Z] Message 36854 published. index: 0
[2020-05-05T09:53:42.209Z] Message 36855 published. index: 1
[2020-05-05T09:53:42.209Z] Message 36856 published. index: 2
[2020-05-05T09:53:42.209Z] Message 36857 published. index: 3
[2020-05-05T09:53:42.209Z] Message 36858 published. index: 4
results: 36854,36855,36856,36857,36858

In fact, I think topic.publish does not call the remote pubsub service directly, but pushes the message into an in-memory queue. I also suspect there is a time window in which the library counts the queued messages, maybe once per tick, or something like this:

// my guess at the internal logic of the @google-cloud/pubsub library (pseudocode)
setTimeout(() => {
  // if the number of queued messages is >= maxMessages, publish them immediately
  if (getLength(messageQueue) >= maxMessages) {
    callRemotePubsubService(messageQueue);
  }
}, /* window time = */ 100);

Or does it use setImmediate() or process.nextTick()?

Recommended answer

Note that the conditions for sending messages to the service are combined with OR, not AND. In other words, if either maxMessages messages are waiting to be sent OR maxMilliseconds has passed since the library received the first outstanding message, it will send the outstanding messages to the server.

The source code for the client library is available, so you can see exactly what it does. The library has a queue that tracks messages that haven't been sent yet. When a message is added and the queue is now full (based on the batching settings), the library publishes the batch immediately. When the first message is added to an empty queue, it uses setTimeout to schedule a call that ultimately publishes to the service. The publisher client holds an instance of this queue and adds messages to it whenever publish is called.
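
The behaviour described above can be illustrated with a small stand-alone queue. This is only a sketch of the "size OR time" idea, not the actual @google-cloud/pubsub source: the names BatchQueue, BatchOptions, add, send and the flush callback are all hypothetical and chosen for illustration.

```typescript
// Hypothetical sketch of "flush when full OR when the timer fires".
// This is NOT the real @google-cloud/pubsub implementation.

interface BatchOptions {
  maxMessages: number;      // flush as soon as this many messages are queued
  maxMilliseconds: number;  // or this long after the first queued message
}

class BatchQueue<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly options: BatchOptions;
  private readonly flush: (batch: T[]) => void;

  constructor(options: BatchOptions, flush: (batch: T[]) => void) {
    this.options = options;
    this.flush = flush;
  }

  add(message: T): void {
    this.pending.push(message);
    if (this.pending.length >= this.options.maxMessages) {
      // Condition 1: the batch is full, so send it right away.
      this.send();
    } else if (this.timer === undefined) {
      // Condition 2: the first outstanding message starts the timer.
      this.timer = setTimeout(() => this.send(), this.options.maxMilliseconds);
    }
  }

  send(): void {
    // Cancel any scheduled flush so the same batch is never sent twice.
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) {
      return;
    }
    const batch = this.pending;
    this.pending = [];
    this.flush(batch);
  }
}
```

With the settings from the question (maxMessages: 10, maxMilliseconds: 10000) and five calls to add, the size condition is never met, so only the timer started by the first message triggers the flush. That is consistent with the roughly 10-second gap in the log above. No polling window, setImmediate() or process.nextTick() is needed in this model, because the size check runs synchronously each time a message is added.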
