How to intercept the headers from a call to one service and insert them into another request in gRPC-java?

Question

I have two servers - HelloServer and WorldServer.

Both implement the same proto file:

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc GreetWithHelloOrWorld (GreeterRequest) returns (GreeterReply) {}
    rpc GreetWithHelloWorld (GreeterRequest) returns (GreeterReply) {}
}

message GreeterRequest {
    string id = 1;
}

// The response message containing the greetings
message GreeterReply {
    string message = 1;
    string id = 2;
}

I want to add traceIds to the requests. As far as I understand, this is achieved through adding the traceId in the Metadata object.

Here is the test I am using to check that the traceIds are passed along. The request is made to the HelloServer, which in turn calls the WorldServer and then returns the final response.

@Test
public void greetHelloWorld() {
    String traceId = UUID.randomUUID().toString();
    Metadata metadata = new Metadata();
    metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);

    Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(traceId).build();

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", 8080)
        .usePlaintext(true)
        .build();

    AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
    AtomicReference<Metadata> headersCapture = new AtomicReference<>();
    ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
    ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);

    GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    GreeterServiceStub asyncStub = GreeterServiceGrpc.newStub(channel);

    try {
        Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloWorld(greeterRequest);
        String idInResponse = greeterReply.getId();
        String idInHeaders = headersCapture.get().get(MetadataKeys.TRACE_ID_METADATA_KEY);
        // Three arguments need three placeholders; the original message had only two.
        logger.info("Response from HelloService and WorldService  -- {}, id = {}, headers = {}", greeterReply.getMessage(), idInResponse, idInHeaders);
        assertEquals("Ids in response and header did not match", idInResponse, idInHeaders);
    } catch (StatusRuntimeException e) {
        logger.warn("Exception when calling HelloService and WorldService\n" +  e);
        fail();
    } finally {
        channel.shutdown();
    }
}

Implementation of ServerInterceptor:

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

    String traceId = headers.get(MetadataKeys.TRACE_ID_METADATA_KEY);
    logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 1=" + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
    return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata headers) {
            headers.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 2  " + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
            super.sendHeaders(headers);
        }

        @Override
        public void sendMessage(RespT message) {
           logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " message=" + message.toString());
            super.sendMessage(message);
        }
    }, headers);
}

Here is the implementation of the greetWithHelloWorld() method:

public void greetWithHelloWorld(com.comcast.manitoba.world.hello.Greeter.GreeterRequest request,
                                        io.grpc.stub.StreamObserver<com.comcast.manitoba.world.hello.Greeter.GreeterReply> responseObserver) {
        Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
    
        Metadata metadata = new Metadata();
        metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, "");
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081)
                .usePlaintext(true).build();
    
        AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
        AtomicReference<Metadata> headersCapture = new AtomicReference<>();
        ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
        ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);
    
        GreeterServiceGrpc.GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    
        String messageFromWorldService = "";
        String replyIdFromWorldService = "";
        try {
            Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloOrWorld(greeterRequest);
            messageFromWorldService = greeterReply.getMessage();
            replyIdFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, replyIdFromWorldService);
        } catch (StatusRuntimeException e) {
            logger.warn("Exception when calling HelloService\n" +  e);
        }
    
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService).setId(replyIdFromWorldService).build();
        responseObserver.onNext(greeterReply);
        responseObserver.onCompleted();
    }

The problem is that inside the greetWithHelloWorld() method I don't have access to the Metadata, so I cannot extract the traceId from the headers and attach it to the request to the WorldServer. However, if I put a breakpoint in that method, I can see that the request object does contain the traceId, but it is private and inaccessible.

Any ideas on how I can achieve this? Also, is this the best way to pass traceIds around? I found some references to using Context. What is the difference between Context and Metadata?

Recommended answer

The expected approach is to use a ClientInterceptor and a ServerInterceptor. The client interceptor copies the value from Context into Metadata. The server interceptor copies it from Metadata into Context. Use Contexts.interceptCall in the server interceptor to apply the Context to all callbacks.
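As a rough illustration of that pairing, here is a minimal sketch. The class name TraceIdPropagation, its nested ServerSide and ClientSide classes, and the "trace-id" header name are all made up for this example; in the question's code, MetadataKeys.TRACE_ID_METADATA_KEY would presumably take the place of TRACE_ID_HEADER.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

/** Hypothetical helper: moves a trace id between Metadata (wire) and Context (in-process). */
public final class TraceIdPropagation {
    // Assumed header name, chosen for this sketch only.
    public static final Metadata.Key<String> TRACE_ID_HEADER =
            Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);

    // In-process carrier for the same value.
    public static final Context.Key<String> TRACE_ID_CTX = Context.key("trace-id");

    private TraceIdPropagation() {}

    /** Server side: copy the incoming header into the Context seen by every callback. */
    public static final class ServerSide implements ServerInterceptor {
        @Override
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call, Metadata headers,
                ServerCallHandler<ReqT, RespT> next) {
            String traceId = headers.get(TRACE_ID_HEADER);
            Context ctx = Context.current();
            if (traceId != null) {
                ctx = ctx.withValue(TRACE_ID_CTX, traceId);
            }
            // Contexts.interceptCall attaches ctx around startCall and all listener callbacks.
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    }

    /** Client side: copy the trace id from the current Context into the outgoing headers. */
    public static final class ClientSide implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    // Runs on the thread that starts the call, e.g. the server handler thread.
                    String traceId = TRACE_ID_CTX.get();
                    if (traceId != null) {
                        headers.put(TRACE_ID_HEADER, traceId);
                    }
                    super.start(responseListener, headers);
                }
            };
        }
    }
}
```

With something like this in place, the HelloServer service could be registered through ServerInterceptors.intercept(serviceImpl, new TraceIdPropagation.ServerSide()), and the stub that calls the WorldServer could be created with stub.withInterceptors(new TraceIdPropagation.ClientSide()). The greetWithHelloWorld() handler would then no longer need to build a Metadata object itself.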

Metadata is for wire-level propagation. Context is for in-process propagation. Generally the application should not need to interact directly with Metadata (in Java).
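To make the in-process half of that distinction concrete, the following small sketch uses only io.grpc.Context, with no server or channel involved. ContextDemo, TRACE_ID, and currentTraceId() are names invented for this illustration.

```java
import io.grpc.Context;

/** Hypothetical demo: a value travels through Context instead of method parameters. */
public final class ContextDemo {
    public static final Context.Key<String> TRACE_ID = Context.key("trace-id");

    private ContextDemo() {}

    // Stands in for code deep in the call stack that receives no trace id argument.
    public static String currentTraceId() {
        return TRACE_ID.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Context ctx = Context.current().withValue(TRACE_ID, "abc-123");

        // run() attaches ctx for the duration of the Runnable, then restores the previous Context.
        ctx.run(() -> System.out.println("same thread: " + currentTraceId()));

        // A Context is bound to a thread; wrap() carries it into work done on another thread.
        Thread worker = new Thread(
                ctx.wrap(() -> System.out.println("other thread: " + currentTraceId())));
        worker.start();
        worker.join();

        // Outside the attached scope the key has no value again.
        System.out.println("outside: " + currentTraceId());
    }
}
```

The value is visible only while the Context is attached, which is why a server interceptor that applies the Context to all callbacks is what lets handler code read it without touching Metadata.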
