How can I perform parallel asynchronous HTTP GET requests with reqwest?


Question


The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.

How could this be done?

Solution

Concurrent requests

As of reqwest 0.10:

use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}


stream::iter(urls)

stream::iter

Take a collection of strings and convert it into a Stream.

.map(|url| {

StreamExt::map

Run an asynchronous function on every element in the stream and transform the element to a new type.
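As a std-only analogy (no reqwest or futures involved; `lengths` is a made-up helper), `Iterator::map` performs the same element-to-new-type transformation for synchronous iterators that `StreamExt::map` performs for streams:

```rust
fn lengths(urls: Vec<&str>) -> Vec<usize> {
    // Map each URL to a value of a new type (its byte length),
    // just as StreamExt::map maps each stream element to a future.
    urls.into_iter().map(|u| u.len()).collect()
}

fn main() {
    let urls = vec!["https://api.ipify.org"; 2];
    println!("{:?}", lengths(urls));
}
```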

let client = &client;
async move {

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.
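The same shadowing trick can be shown with a plain closure (a std-only sketch; `demo` is a hypothetical helper): `move` captures whatever the name currently refers to, so after shadowing, only the reference is moved.

```rust
fn demo() -> (usize, String) {
    let client = String::from("shared client");
    let client = &client; // shadow the owned value with a reference
    // `move` captures `client` by value, but `client` is now a `&String`,
    // so only the reference moves; the original String stays usable here.
    let task = move || client.len();
    (task(), client.to_string())
}

fn main() {
    let (len, owner) = demo();
    println!("task saw {} bytes; original still owns {:?}", len, owner);
}
```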

let resp = client.get(url).send().await?;

Start an asynchronous GET request using the Client's connection pool and wait for the request.

resp.bytes().await

Request and wait for the bytes of the response.

.buffer_unordered(N);

StreamExt::buffer_unordered

Convert a stream of futures into a stream of those futures' values, executing the futures concurrently.

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.


Without bounded execution

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.
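To see why a concurrency limit matters, here is a std-only sketch using threads and channels instead of futures (`run_pool` and the fake URLs are made up for illustration): it caps in-flight work at `limit`, the way `buffer_unordered(limit)` caps in-flight futures.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Process `urls` with at most `limit` workers running at once.
fn run_pool(limit: usize, urls: Vec<String>) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let (res_tx, res_rx) = mpsc::channel::<String>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    let workers: Vec<_> = (0..limit)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            thread::spawn(move || loop {
                // Take the next job; the guard is dropped at statement end.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    Ok(url) => res_tx.send(format!("fetched {}", url)).unwrap(),
                    Err(_) => break, // channel closed: no more jobs
                }
            })
        })
        .collect();
    drop(res_tx); // only workers hold result senders now

    for url in urls {
        job_tx.send(url).unwrap();
    }
    drop(job_tx); // signal "no more jobs"

    let mut out: Vec<String> = res_rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    out.sort(); // completion order is nondeterministic
    out
}

fn main() {
    let urls = (0..5).map(|i| format!("url-{}", i)).collect();
    for line in run_pool(2, urls) {
        println!("{}", line);
    }
}
```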

Parallel requests

Concurrent requests are generally good enough, but there are times where you need parallel requests. In that case, you need to spawn a task.
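A std-only sketch of the same shape using OS threads (`run_tasks` is a made-up helper, and the `Arc<String>` stands in for a `Client`, whose clone is likewise a cheap handle to a shared connection pool):

```rust
use std::sync::Arc;
use std::thread;

fn run_tasks(n: usize) -> Vec<String> {
    // Each spawned task needs an owned handle, so clone a cheap shared
    // one, just as each tokio::spawn task gets its own Client clone.
    let client = Arc::new(String::from("shared pool"));
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let client = Arc::clone(&client);
            thread::spawn(move || format!("task {} used {}", i, client))
        })
        .collect();
    // Joining in spawn order keeps the results in index order.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for line in run_tasks(2) {
        println!("{}", line);
    }
}
```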

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

The primary differences are:

  • We use tokio::spawn to perform work in separate tasks.
  • We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
  • There's an additional error case when the task cannot be joined.
