如何对futures :: Stream :: concat2读取的字节数施加限制? [英] How do I apply a limit to the number of bytes read by futures::Stream::concat2?
问题描述
您可能希望对[使用
futures::Stream::concat2
]时读取的字节数设置某种上限
我该如何实际实现呢?例如,下面的代码模仿了向我的服务发送无限数量数据的恶意用户:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
stream::repeat(b"0123456789ABCDEF".to_vec())
}
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().concat2()
}
fn main() {
let v = limited().wait().unwrap();
println!("{}", v.len());
}
一种解决方案是创建一个流组合器,一旦超过某个字节的阈值,该组合器就会终止该流.这是一种可能的实现方式:
struct TakeBytes<S> {
inner: S,
seen: usize,
limit: usize,
}
impl<S> Stream for TakeBytes<S>
where
S: Stream<Item = Vec<u8>>,
{
type Item = Vec<u8>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.seen >= self.limit {
return Ok(Async::Ready(None)); // Stream is over
}
let inner = self.inner.poll();
if let Ok(Async::Ready(Some(ref v))) = inner {
self.seen += v.len();
}
inner
}
}
trait TakeBytesExt: Sized {
fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}
impl<S> TakeBytesExt for S
where
S: Stream<Item = Vec<u8>>,
{
fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
TakeBytes {
inner: self,
limit,
seen: 0,
}
}
}
然后可以将其链接到concat2
之前的流:
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().take_bytes(999).concat2()
}
此实现有一些警告:
- 它仅适用于
Vec<u8>
.当然,您可以引入泛型以使其更广泛地适用. - 它允许输入的字节数超过了限制,它只是在该点之后停止流.这些类型的决定取决于应用程序.
要记住的另一件事是,您希望尝试解决尽可能低的问题—如果数据源已经分配了1 GB的内存,则设置限制将无济于事. /p>
An answer to How do I read the entire body of a Tokio-based Hyper request? suggests:
you may wish to establish some kind of cap on the number of bytes read [when using
futures::Stream::concat2
]
How can I actually achieve this? For example, here's some code that mimics a malicious user who is sending my service an infinite amount of data:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
stream::repeat(b"0123456789ABCDEF".to_vec())
}
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().concat2()
}
fn main() {
let v = limited().wait().unwrap();
println!("{}", v.len());
}
One solution is to create a stream combinator that ends the stream once some threshold of bytes has passed. Here's one possible implementation:
struct TakeBytes<S> {
inner: S,
seen: usize,
limit: usize,
}
impl<S> Stream for TakeBytes<S>
where
S: Stream<Item = Vec<u8>>,
{
type Item = Vec<u8>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.seen >= self.limit {
return Ok(Async::Ready(None)); // Stream is over
}
let inner = self.inner.poll();
if let Ok(Async::Ready(Some(ref v))) = inner {
self.seen += v.len();
}
inner
}
}
trait TakeBytesExt: Sized {
fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}
impl<S> TakeBytesExt for S
where
S: Stream<Item = Vec<u8>>,
{
fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
TakeBytes {
inner: self,
limit,
seen: 0,
}
}
}
This can then be chained onto the stream before concat2
:
fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
some_bytes().take_bytes(999).concat2()
}
This implementation has caveats:
- it only works for
Vec<u8>
. You can introduce generics to make it more broadly applicable, of course. - it allows for more bytes than the limit to come in, it just stops the stream after that point. Those types of decisions are application-dependent.
Another thing to keep in mind is that you want to attempt to tackle this problem as low as you can — if the source of the data has already allocated a gigabyte of memory, placing a limit won't help as much.
这篇关于如何对futures :: Stream :: concat2读取的字节数施加限制?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!