如何在grpc中正确设计发布-订阅模式? [英] How to design publish-subscribe pattern properly in grpc?

查看:799
本文介绍了如何在grpc中正确设计发布-订阅模式?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用grpc来实现pub sub模式,但是我对如何正确执行它感到困惑.

i'm trying to implement pub sub pattern using grpc but i'm confusing a bit about how to do it properly.

我的原型:rpc call (google.protobuf.Empty) returns (stream Data);

客户端:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();

服务器服务:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

如何正确地从全球观察员中删除客户?连接断开时如何接收某种信号?
如何管理客户端-服务器重新连接?连接断开时如何强制客户端重新连接?

How to remove clients from global observers properly? How to received some sort of a signal when connection drops?
How to manage client-server reconnections? How to force client reconnect when connection drops?

提前谢谢!

推荐答案

在执行服务时:

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

您需要获取上下文

You need to get the Context of the current request, and listen for cancellation. For single-request, multi-response calls (a.k.a. Server streaming) the gRPC generated code is simplified to pass in the the request directly. This means that you con't have direct access to the underlying ServerCall.Listener, which is how you would normally listen for clients disconnecting and cancelling.

相反,每个gRPC调用都有一个与之关联的Context,其中携带取消和其他请求范围的信号.对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后将其安全地从链接的哈希集中删除响应观察器.

Instead, every gRPC call has a Context associated with it, which carries the cancellation and other request-scoped signals. For your case, you just need to listen for cancellation by adding your own listener, which then safely removes the response observer from your linked hash set.

关于重新连接:如果断开连接,gRPC客户端将自动重新连接,但除非安全,否则通常不会重试RPC.对于服务器流式RPC,通常这样做是不安全的,因此您需要直接在客户端上重试RPC.

As for reconnects: gRPC clients will automatically reconnect if the connection is broken, but usually will not retry the RPC unless it is safe to do so. In the case of server streaming RPCs, it is usually not safe to do, so you'll need to retry the RPC on your client directly.

这篇关于如何在grpc中正确设计发布-订阅模式?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆