从阅读的NetworkStream破坏了缓冲区 [英] Reading from NetworkStream corrupts the buffer
问题描述
当从使用的NetworkStream读取ReadUntilClosedObservable1数据,返回的数据被损坏像读取数据重叠的部分区块。
然而,当我与ReadUntilClosedObservable2读取数据的数据到达没有问题。
我想使用的ReadUntilClosedObservable1因为流反复阅读ReadUntilClosedObservable2是燃烧的CPU。
我怎样才能获得的消息同步顺序?
更新:
返回Observable.Timer(TimeSpan.Zero,间隔,TaskPoolScheduler.Default)
.SelectMany(_ => ReadToEnd的)
。凡(dataChunk = GT; dataChunk.Length大于0);
我只注意到 ReadToEnd的
它完成了previous作业之前一次又一次被解雇。难道不是需要同步?如果 Observable.Timer
是我怎么能达到同样的效果没有它,阅读的时间间隔,但无需等待启动的问题?
公共静态的IObservable< INT> ReadObservable(此流流,字节[]缓冲区
,诠释抵消,诠释计数)
{
返回stream.ReadAsync(缓冲区,偏移,计数)
.ToObservable();
}公共静态的IObservable<字节[]> ReadObservable(此流流,
INT缓冲区大小)
{
VAR缓冲=新的字节[缓冲区大小] 返回stream.ReadObservable(缓冲液,0,buffer.Length)
。选择(cbRead =>
{
如果(cbRead == 0)
{
返回新的[0]字节;
} 如果(cbRead == buffer.Length)
{
返回缓冲区;
} VAR dataChunk =新的字节[cbRead] Buffer.BlockCopy(缓冲液,0,dataChunk,
0,cbRead); 返回dataChunk;
});
}公共静态的IObservable<字节[]> ReadUntilClosedObservable1(这的NetworkStream
流缓冲区大小INT,时间跨度间隔)
{
VAR为ReadToEnd = Observable.Defer(()=> stream.ReadObservable(缓冲区大小))
.DoWhile(()=> stream.DataAvailable)
.ToList()
。选择(dataChunks =>
{
VAR缓冲=新的List<位>(); 的foreach(在dataChunks VAR dataChunk)
{
buffer.AddRange(dataChunk);
} 返回buffer.ToArray();
}); 返回Observable.Timer(TimeSpan.Zero,间隔,TaskPoolScheduler.Default)
.SelectMany(_ => ReadToEnd的)
。凡(dataChunk = GT; dataChunk.Length大于0);
}公共静态的IObservable<字节[]> ReadUntilClosedObservable2(此流流
,诠释缓冲区大小)
{
返回Observable.Defer(()=> stream.ReadObservable(缓冲区大小))
。重复()
。凡(dataChunk = GT; dataChunk.Length大于0);
}
哦,不,不......不要做这样的...
异步和RX是更多...的非直观的设置,以获得工作之一,但它比你尝试什么简单一些。关键位是三个不同的运营商的Rx:
-
FromAsyncPattern
:从一个异步调用签名生成一个的IObservable
工厂 -
Observable.Defer
:允许您使用上述的IObservable
工厂产生的每用户观测 -
Observable.While
:让你重新调用,直到我说,当上的的IObservable
(编辑:改变使用的NetworkStream
为例)
(doubleEDIT:改变了基于评论)
试试这个 - 除非我想念我的猜测,它或多或少什么你想要的:
无效的主要()
{
//我们会养活这对听众
VAR消息=哟妈妈说你喜欢这样的消息; VAR listenerTask =任务
。厂
.StartNew(()=>
{
VAR缓冲区大小= 1024;
VAR本地主机=新ip地址(新的byte [] {127,0,0,1});
VAR监听器=新的TcpListener(本地主机,11201);
listener.Start();
变种incomingClient = listener.AcceptTcpClient();
变种clientStream = incomingClient.GetStream();
//我们的读者缓冲
VAR观察者= clientStream.ReadObservable(缓冲区大小);
VAR compareBuffer =观察者
//虽然我们获取数据和客户端以
//仍处于连接
.TakeWhile(returnBuffer = GT; returnBuffer.Length大于0&放大器;&放大器;
incomingClient.Connected)
//在读取块之间,响应返回给客户端
//无需花哨这里,只是正常的回写异步
。做(returnBuffer => clientStream.BeginWrite(
returnBuffer,
0,
returnBuffer.Length,
AR => clientStream.EndWrite(AR),
空值))
.ToEnumerable()
.SelectMany(returnBuffer => returnBuffer)
.ToArray();
listener.Stop();
Console.WriteLine(
监听器认为它被告知... {0},
Encoding.ASCII.GetString(compareBuffer));
}); VAR clientTask = Task.Factory.StartNew(
()=>
{
VAR的客户=新的TcpClient();
client.Connect(本地主机,11201);
VAR随机=新的随机();
变种outStream = client.GetStream();
VAR bytesToSend = Encoding.ASCII.GetBytes(消息);
的foreach(在bytesToSend字节toSend)
{
//在发送一个字符
outStream.WriteByte(toSend); //监听器应该人云亦云我们...
INT打手= outStream.ReadByte();
如果(呆子!= toSend)
{
Console.WriteLine(
咦监听呼应错了,我说:{0},他们说,{1},
发送,
继续);
打破;
}
Console.WriteLine(我说:{0},他们说,{1},toSend,呆子); //以一个小盹(模拟延迟等)
Thread.sleep代码(random.Next(200));
}
client.Close();
}); Task.WaitAll(listenerTask,clientTask);
}
公共静态类分机
{
公共静态的IObservable<字节[]> ReadObservable(此流流,INT缓冲区大小)
{
//保存读取数据
VAR缓冲=新的字节[缓冲区大小]
//第1步:异步签名=>观察到厂
VAR异步读取= Observable.FromAsyncPattern<字节[],INT,INT,INT>(
stream.BeginRead,
stream.EndRead);
返回Observable.While(
//而存在要读出的数据
()=> stream.CanRead,
//反复调用观察到的工厂,这将
//重建以使其将从目前开始
//流位置 - 因此0偏移
Observable.Defer(()=>异步读取(缓冲液,0,缓冲区大小))
。选择(的ReadBytes = GT; buffer.Take(的ReadBytes).ToArray()));
}
}
When reading data from NetworkStream with ReadUntilClosedObservable1, the returned data is corrupted like some blocks of read data overlap.
However, when I read the data with ReadUntilClosedObservable2 the data arrives without problems.
I want to use the ReadUntilClosedObservable1 because repeatedly reading from stream in ReadUntilClosedObservable2 is burning the CPU.
How can I get the messages in sync order?
UPDATE:
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
I just noticed that readToEnd
to is fired again and again before it finishes the previous job. Doesn't it need to be synchronized? If Observable.Timer
is the problem how can I achieve the same effect without it, reading in intervals but starting without waiting?
public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer
,int offset, int count)
{
return stream.ReadAsync(buffer, offset, count)
.ToObservable();
}
public static IObservable<byte[]> ReadObservable(this Stream stream,
int bufferSize)
{
var buffer = new byte[bufferSize];
return stream.ReadObservable(buffer, 0, buffer.Length)
.Select(cbRead =>
{
if (cbRead == 0)
{
return new byte[0];
}
if (cbRead == buffer.Length)
{
return buffer;
}
var dataChunk = new byte[cbRead];
Buffer.BlockCopy(buffer, 0, dataChunk,
0, cbRead);
return dataChunk;
});
}
public static IObservable<byte[]> ReadUntilClosedObservable1(this NetworkStream
stream, int bufferSize, TimeSpan interval)
{
var readToEnd = Observable.Defer(() => stream.ReadObservable(bufferSize))
.DoWhile(() => stream.DataAvailable)
.ToList()
.Select(dataChunks =>
{
var buffer = new List<byte>();
foreach (var dataChunk in dataChunks)
{
buffer.AddRange(dataChunk);
}
return buffer.ToArray();
});
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
}
public static IObservable<byte[]> ReadUntilClosedObservable2(this Stream stream
,int bufferSize)
{
return Observable.Defer(() => stream.ReadObservable(bufferSize))
.Repeat()
.Where(dataChunk => dataChunk.Length > 0);
}
Oh, no, no...don't do it like that...
Async + Rx is one of the more...non-intuitive setups to get working, but it is quite a bit simpler than what you're attempting. The key bits are three different Rx operators:
FromAsyncPattern
: generates a "IObservable
Factory" from an async call signatureObservable.Defer
: allows you to use aboveIObservable
factory to generate observables per subscriberObservable.While
: allows you to "Reinvoke until I say when" on anIObservable
(EDIT: altered to use a NetworkStream
example)
(doubleEDIT: altered based on comments)
Try this - unless I miss my guess, it's more or less what you're trying for:
void Main()
{
// We'll feed this to listener
var message = "Yo mamma said you like messages like this";
var listenerTask = Task
.Factory
.StartNew(() =>
{
var bufferSize = 1024;
var localhost = new IPAddress(new byte[]{127,0,0,1});
var listener = new TcpListener(localhost, 11201);
listener.Start();
var incomingClient = listener.AcceptTcpClient();
var clientStream = incomingClient.GetStream();
// our buffered reader
var observer = clientStream.ReadObservable(bufferSize);
var compareBuffer = observer
// Take while we're getting data and the client
// is still connected
.TakeWhile(returnBuffer => returnBuffer.Length > 0 &&
incomingClient.Connected)
// In between read blocks, respond back to the client
// No need for fanciness here, just normal async writeback
.Do(returnBuffer => clientStream.BeginWrite(
returnBuffer,
0,
returnBuffer.Length,
ar => clientStream.EndWrite(ar),
null))
.ToEnumerable()
.SelectMany (returnBuffer => returnBuffer)
.ToArray();
listener.Stop();
Console.WriteLine(
"Listener thinks it was told... {0}",
Encoding.ASCII.GetString(compareBuffer));
});
var clientTask = Task.Factory.StartNew(
() =>
{
var client = new TcpClient();
client.Connect("localhost", 11201);
var random = new Random();
var outStream = client.GetStream();
var bytesToSend = Encoding.ASCII.GetBytes(message);
foreach(byte toSend in bytesToSend)
{
// send a character over
outStream.WriteByte(toSend);
// Listener should parrot us...
int goOn = outStream.ReadByte();
if(goOn != toSend)
{
Console.WriteLine(
"Huh. Listener echoed wrong. I said: {0}, they said {1}",
toSend,
goOn);
break;
}
Console.WriteLine("I said: {0}, they said {1}", toSend, goOn);
// Take a little nap (simulate latency, etc)
Thread.Sleep(random.Next(200));
}
client.Close();
});
Task.WaitAll(listenerTask, clientTask);
}
public static class Ext
{
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
{
// to hold read data
var buffer = new byte[bufferSize];
// Step 1: async signature => observable factory
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
// while there is data to be read
() => stream.CanRead,
// iteratively invoke the observable factory, which will
// "recreate" it such that it will start from the current
// stream position - hence "0" for offset
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
}
这篇关于从阅读的NetworkStream破坏了缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!