How do I apply a limit to the number of bytes read by futures::Stream::concat2?


Problem description


An answer to How do I read the entire body of a Tokio-based Hyper request? suggests:

you may wish to establish some kind of cap on the number of bytes read [when using futures::Stream::concat2]

How can I actually achieve this? For example, here's some code that mimics a malicious user who is sending my service an infinite amount of data:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
    stream::repeat(b"0123456789ABCDEF".to_vec())
}

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().concat2()
}

fn main() {
    let v = limited().wait().unwrap();
    println!("{}", v.len());
}

Solution

One solution is to create a stream combinator that ends the stream once some threshold of bytes has passed. Here's one possible implementation:

// Stream combinator that ends the inner stream once `limit` bytes have been seen.
struct TakeBytes<S> {
    inner: S,
    seen: usize,  // bytes yielded so far
    limit: usize, // threshold after which the stream ends
}

impl<S> Stream for TakeBytes<S>
where
    S: Stream<Item = Vec<u8>>,
{
    type Item = Vec<u8>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.seen >= self.limit {
            return Ok(Async::Ready(None)); // Stream is over
        }

        let inner = self.inner.poll();
        if let Ok(Async::Ready(Some(ref v))) = inner {
            self.seen += v.len();
        }
        inner
    }
}

// Extension trait so `take_bytes` can be chained onto any matching stream.
trait TakeBytesExt: Sized {
    fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}

impl<S> TakeBytesExt for S
where
    S: Stream<Item = Vec<u8>>,
{
    fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
        TakeBytes {
            inner: self,
            limit,
            seen: 0,
        }
    }
}

This can then be chained onto the stream before concat2:

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().take_bytes(999).concat2()
}
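
With the example input of 16-byte chunks, main now prints 1008: the combinator keeps yielding chunks until the running total reaches the 999-byte limit (63 chunks × 16 bytes = 1008) and then ends the stream, so the output can slightly overshoot the limit, as the second caveat below notes.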

This implementation has caveats:

  • it only works for Vec<u8>. You can introduce generics to make it more broadly applicable, of course; a rough sketch follows this list.
  • it allows more bytes than the limit to come in; it just stops the stream after that point. Decisions like these are application-dependent.
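
To illustrate the first caveat, here is a minimal sketch of a more generic combinator, assuming the stream's items expose their bytes via AsRef<[u8]> (which Vec<u8> does); TakeBytesGeneric is just an illustrative name:

extern crate futures; // 0.1.25, same as above

use futures::{Async, Poll, Stream};

// Like TakeBytes, but generic over any item that can be viewed as bytes.
struct TakeBytesGeneric<S> {
    inner: S,
    seen: usize,
    limit: usize,
}

impl<S> Stream for TakeBytesGeneric<S>
where
    S: Stream,
    S::Item: AsRef<[u8]>,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.seen >= self.limit {
            return Ok(Async::Ready(None)); // limit reached: end the stream
        }

        let inner = self.inner.poll();
        if let Ok(Async::Ready(Some(ref v))) = inner {
            self.seen += v.as_ref().len(); // measure the chunk through AsRef
        }
        inner
    }
}

For the Vec<u8> version, the second caveat could likewise be addressed by truncating the final chunk to the remaining budget (limit - seen) before yielding it; whether a hard cap is worth that extra bookkeeping is again application-dependent.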

Another thing to keep in mind is that you want to tackle this problem as low in the stack as you can: if the source of the data has already allocated a gigabyte of memory, placing a limit at this point won't help as much.
