How do you read and print a chunked HTTP response using java.net.http as chunks arrive?
Question
Java 11 introduces a new package, java.net.http, for making HTTP requests. For general usage, it's pretty straightforward.
My question is: how do I use java.net.http to handle chunked responses as each chunk is received by the client?
java.net.http contains a reactive BodySubscriber, which appears to be what I want, but I can't find an example of how it's used.
Below is a Python implementation that prints chunks as they arrive. I'd like to do the same thing with java.net.http:
import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()
    main(args.url)
HttpGetDemo.java
Just for completeness, here's a simple example of making a blocking request using java.net.http:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

    public static void main(String[] args) throws Exception {
        var request = HttpRequest.newBuilder()
                .uri(URI.create(args[0]))
                .build();
        var bodyHandler = HttpResponse.BodyHandlers.ofString();
        var client = HttpClient.newHttpClient();
        // send() blocks until the whole body has been received
        var response = client.send(request, bodyHandler);
        System.out.println(response.body());
    }
}
HttpAsyncGetDemo.java
And here's an example of making a non-blocking/async request:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * Sends the request asynchronously and prints the body once the
 * whole response has arrived.
 */
public class HttpAsyncGetDemo {

    public static void main(String[] args) throws Exception {
        var request = HttpRequest.newBuilder()
                .uri(URI.create(args[0]))
                .build();
        var bodyHandler = HttpResponse.BodyHandlers.ofString();
        var client = HttpClient.newHttpClient();
        client.sendAsync(request, bodyHandler)
                .thenApply(HttpResponse::body)
                .thenAccept(System.out::println)
                .join(); // wait here so the JVM does not exit early
    }
}
Recommended answer
Thanks to @pavel and @chegar999 for their partial answers. They led me to my solution.
The solution I came up with is below. Basically, the solution is to use a custom java.net.http.HttpResponse.BodySubscriber. A BodySubscriber contains reactive methods (onSubscribe, onNext, onError, and onComplete) and a getBody method that returns a CompletionStage (in my case a CompletableFuture) that will eventually produce the body of the HTTP response. Once you have your BodySubscriber in hand, you can use it like this:
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();
return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
        .thenApply(HttpResponse::body);
Note this line:
client.sendAsync(request, responseInfo -> new StringSubscriber())
That's where we register our custom BodySubscriber; in this case, my custom class is named StringSubscriber.
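To make the callback order concrete without a network connection, here is a minimal sketch that drives a BodySubscriber by hand, roughly the way the HTTP client would: onSubscribe once, onNext per chunk, then onComplete, after which getBody yields the joined text. The names SubscriberLifecycleSketch, CollectingSubscriber, and feed are hypothetical and only serve this illustration; the no-op Flow.Subscription stands in for the one the client would supply.

```java
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

public class SubscriberLifecycleSketch {

    /** Hypothetical minimal subscriber: appends each chunk as text, completes with the joined text. */
    static class CollectingSubscriber implements BodySubscriber<String> {
        private final CompletableFuture<String> body = new CompletableFuture<>();
        private final StringBuilder text = new StringBuilder();
        private Flow.Subscription subscription;

        @Override
        public CompletionStage<String> getBody() {
            return body;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first chunk
        }

        @Override
        public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer buffer : buffers) {
                // Assumption: no multi-byte character is split across chunks.
                // duplicate() leaves the original buffer's position untouched.
                text.append(StandardCharsets.UTF_8.decode(buffer.duplicate()));
            }
            subscription.request(1); // ask for the next chunk
        }

        @Override
        public void onError(Throwable throwable) {
            body.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            body.complete(text.toString());
        }
    }

    /** Feeds the given chunks to a fresh subscriber in order and returns the completed body. */
    static String feed(String... chunks) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // Stand-in subscription: a real one would come from the HTTP client.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (String chunk : chunks) {
            subscriber.onNext(List.of(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8))));
        }
        subscriber.onComplete();
        return subscriber.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(feed("Hello, ", "chunked ", "world"));
    }
}
```

Anything you want to happen per chunk (printing, parsing, forwarding) belongs in onNext; the future returned by getBody only matters once the whole response is done.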
Here is a complete working example. Using Java 11, you can run it without compiling it first. Just paste it into a file named CustomSubscriber.java, then run the command java CustomSubscriber.java <some url>. It prints the contents of each chunk as it arrives. It also collects the chunks and returns them as the body when the response has completed.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

public class CustomSubscriber {

    public static void main(String[] args) {
        CustomSubscriber cs = new CustomSubscriber();
        String body = cs.get(args[0]).join();
        System.out.println("--- Response body:\n: ..." + body + "...");
    }

    public CompletableFuture<String> get(String uri) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(uri))
                .build();
        // The BodyHandler lambda is where the custom subscriber is registered
        return client.sendAsync(request, responseInfo -> new StringSubscriber())
                .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
                .thenApply(HttpResponse::body);
    }

    static class StringSubscriber implements BodySubscriber<String> {

        final CompletableFuture<String> bodyCF = new CompletableFuture<>();
        Flow.Subscription subscription;
        List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

        @Override
        public CompletionStage<String> getBody() {
            return bodyCF;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // Request first item
        }

        @Override
        public void onNext(List<ByteBuffer> buffers) {
            System.out.println("-- onNext " + buffers);
            try {
                System.out.println("\tBuffer Content:\n" + asString(buffers));
            }
            catch (Exception e) {
                System.out.println("\tUnable to print buffer content");
            }
            // asString reads through duplicates, so the buffers are still unread here
            responseData.addAll(buffers);
            subscription.request(1); // Request next item
        }

        @Override
        public void onError(Throwable throwable) {
            bodyCF.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            bodyCF.complete(asString(responseData));
        }

        // Note: decoding a single chunk can garble a multi-byte character
        // that is split across two chunks; the final body is unaffected.
        private String asString(List<ByteBuffer> buffers) {
            return new String(toBytes(buffers), StandardCharsets.UTF_8);
        }

        private byte[] toBytes(List<ByteBuffer> buffers) {
            int size = buffers.stream()
                    .mapToInt(ByteBuffer::remaining)
                    .sum();
            byte[] bs = new byte[size];
            int offset = 0;
            for (ByteBuffer buffer : buffers) {
                // Read through a duplicate so the original position is left untouched
                ByteBuffer view = buffer.duplicate();
                int remaining = view.remaining();
                view.get(bs, offset, remaining);
                offset += remaining;
            }
            return bs;
        }
    }
}
Trying it out
To test this solution, you'll need a server that sends a response using Transfer-Encoding: chunked and sends it slowly enough to watch the chunks arrive. I've created one at https://github.com/hohonuuli/demo-chunk-server, and you can spin it up using Docker like so:
docker run -p 8080:8080 hohonuuli/demo-chunk-server
Then run java CustomSubscriber.java http://localhost:8080/chunk/10
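If Docker isn't an option, a local stand-in can be sketched with the JDK's built-in com.sun.net.httpserver.HttpServer, which switches to chunked transfer encoding when the response length passed to sendResponseHeaders is 0. This is not the demo-chunk-server above: the class name LocalChunkServerSketch, the /chunk path, the "chunk N" payload, and the helper methods are all assumptions made for illustration. To keep the block self-contained, the client side uses the built-in BodyHandlers.ofLines() rather than the custom StringSubscriber; it still hands over each line as soon as it arrives.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LocalChunkServerSketch {

    /** Starts a server on a free local port that writes `count` lines, pausing after each. */
    static HttpServer startServer(int count, long pauseMillis) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/chunk", exchange -> {
            // A response length of 0 makes the JDK server use chunked transfer encoding
            exchange.sendResponseHeaders(200, 0);
            try (OutputStream out = exchange.getResponseBody()) {
                for (int i = 0; i < count; i++) {
                    out.write(("chunk " + i + "\n").getBytes(StandardCharsets.UTF_8));
                    out.flush(); // send what has been written so far as its own chunk
                    Thread.sleep(pauseMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();
        return server;
    }

    /** Reads the response line by line, printing each line as it arrives. */
    static List<String> readLines(URI uri) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
        // With ofLines(), send() returns once the headers are in; the body streams lazily
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        List<String> lines = new ArrayList<>();
        try (Stream<String> body = response.body()) {
            body.forEach(line -> {
                System.out.println(line);
                lines.add(line);
            });
        }
        return lines;
    }

    /** Starts the server, fetches /chunk once, stops the server, and returns the lines read. */
    static List<String> demo(int count, long pauseMillis) {
        try {
            HttpServer server = startServer(count, pauseMillis);
            try {
                int port = server.getAddress().getPort();
                return readLines(URI.create("http://127.0.0.1:" + port + "/chunk"));
            } finally {
                server.stop(0);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        demo(5, 500);
    }
}
```

To try CustomSubscriber against it instead, keep the server running (drop the stop call in demo) and point java CustomSubscriber.java at the printed port and the /chunk path.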