如何读取基于 Tokio 的 Hyper 请求的整个正文? [英] How do I read the entire body of a Tokio-based Hyper request?

查看:35
本文介绍了如何读取基于 Tokio 的 Hyper 请求的整个正文?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 Hyper 的当前主分支编写一个服务器,该服务器保存由 POST 请求传递的消息,并将此消息发送到每个传入的 GET 请求.

I want to write a server using the current master branch of Hyper that saves a message that is delivered by a POST request and sends this message to every incoming GET request.

我有这个,主要是从 Hyper examples 目录复制的:

I have this, mostly copied from the Hyper examples directory:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}

如何将 req.body()(似乎是 ChunksStream)变成 Vec<u8>?我假设我必须以某种方式返回一个使用 StreamFuture 并将其转换为单个 Vec,也许带有 fold().但我不知道该怎么做.

How do I turn the req.body() (which seems to be a Stream of Chunks) into a Vec<u8>? I assume I must somehow return a Future that consumes the Stream and turns it into a single Vec<u8>, maybe with fold(). But I have no clue how to do that.

推荐答案

我将把问题简化为只返回总字节数,而不是回显整个流.

I'm going to simplify the problem to just return the total number of bytes, instead of echoing the entire stream.

请参阅euclio 的回答关于hyper::body::to_bytes 如果您只想将所有数据作为一个巨大的 blob.

See euclio's answer about hyper::body::to_bytes if you just want all the data as one giant blob.

访问流允许更细粒度的控制:

Accessing the stream allows for more fine-grained control:

use futures::TryStreamExt; // 0.3.7
use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.13.9
use std::convert::Infallible;
use tokio; // 0.2.22

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(service::make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service::service_fn(echo))
    }));

    println!("Listening on http://{}.", server.local_addr());

    if let Err(e) = server.await {
        eprintln!("Error: {}", e);
    }
}

async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let (parts, body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body
                .try_fold(Vec::new(), |mut data, chunk| async move {
                    data.extend_from_slice(&chunk);
                    Ok(data)
                })
                .await;

            entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            })
        }
        _ => {
            let body = Body::from("Can only POST to /");
            Ok(Response::new(body))
        }
    }
}

遗憾的是,Bytes 的当前实现不再与 TryStreamExt::try_concat,所以我们必须切换回折叠.

Unfortunately, the current implementation of Bytes is no longer compatible with TryStreamExt::try_concat, so we have to switch back to a fold.

从期货 0.1.14 开始,您可以使用 Stream::concat2 将所有数据粘在一起:

Since futures 0.1.14, you can use Stream::concat2 to stick together all the data into one:

fn concat2(self) -> Concat2<Self>
where
    Self: Sized,
    Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default, 

use futures::{
    future::{self, Either},
    Future, Stream,
}; // 0.1.25

use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20

use tokio; // 0.1.14

fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(|| service::service_fn(echo));

    println!("Listening on http://{}.", server.local_addr());

    let server = server.map_err(|e| eprintln!("Error: {}", e));
    tokio::run(server);
}

fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    let (parts, body) = req.into_parts();

    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body.concat2();
            let resp = entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            });
            Either::A(resp)
        }
        _ => {
            let body = Body::from("Can only POST to /");
            let resp = future::ok(Response::new(body));
            Either::B(resp)
        }
    }
}

您也可以通过 entire_body.to_vec()Bytes 转换为 Vec,然后将其转换为 字符串.

You could also convert the Bytes into a Vec<u8> via entire_body.to_vec() and then convert that to a String.

另见:

类似于Iterator::foldStream::fold 接受一个 accumulator(称为 init)和一个对累加器进行操作的函数和来自流的项目.函数的结果必须是另一个与原始错误类型相同的未来.总的结果本身就是一个未来.

Similar to Iterator::fold, Stream::fold takes an accumulator (called init) and a function that operates on the accumulator and an item from the stream. The result of the function must be another future with the same error type as the original. The total result is itself a future.

fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
where
    F: FnMut(T, Self::Item) -> Fut,
    Fut: IntoFuture<Item = T>,
    Self::Error: From<Fut::Error>,
    Self: Sized,

我们可以使用 Vec 作为累加器.BodyStream 实现返回一个 Chunk.这实现了Deref<[u8]>,因此我们可以使用它来将每个块的数据附加到Vec.

We can use a Vec as the accumulator. Body's Stream implementation returns a Chunk. This implements Deref<[u8]>, so we can use that to append each chunk's data to the Vec.

extern crate futures; // 0.1.23
extern crate hyper;   // 0.11.27

use futures::{Future, Stream};
use hyper::{
    server::{Http, Request, Response, Service}, Post,
};

fn main() {
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    println!(
        "Listening on http://{} with 1 thread.",
        server.local_addr().unwrap()
    );
    server.run().unwrap();
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        match (req.method(), req.path()) {
            (&Post, "/") => {
                let f = req.body()
                    .fold(Vec::new(), |mut acc, chunk| {
                        acc.extend_from_slice(&*chunk);
                        futures::future::ok::<_, Self::Error>(acc)
                    })
                    .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));

                Box::new(f)
            }
            _ => panic!("Nope"),
        }
    }
}

您也可以将 Vec body 转换为 String.

You could also convert the Vec<u8> body to a String.

另见:

从命令行调用时,我们可以看到结果:

When called from the command line, we can see the result:

$ curl -X POST --data hello http://127.0.0.1:12346/
Read 5 bytes

警告

所有这些解决方案都允许恶意最终用户发布无限大小的文件,这会导致机器内存不足.根据预期用途,您可能希望对读取的字节数设置某种上限,可能会在某个断点写入文件系统.

Warning

All of these solutions allow a malicious end user to POST an infinitely sized file, which would cause the machine to run out of memory. Depending on the intended use, you may wish to establish some kind of cap on the number of bytes read, potentially writing to the filesystem at some breakpoint.

另见:

这篇关于如何读取基于 Tokio 的 Hyper 请求的整个正文?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆