如何制作 IObservable<string>从控制台输入 [英] How to make an IObservable&lt;string&gt; from console input

查看:48
本文介绍了如何制作 IObservable<string>从控制台输入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试编写控制台可观察对象,如下例所示,但它不起作用.订阅存在一些问题.如何解决这些问题?

静态类程序{静态异步任务 Main(string[] args){//var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount();//有效//var observable = FromConsole().Publish().RefCount();//不起作用var observable = FromConsole();//不起作用observable.Subscribe(Console.WriteLine);等待 Task.Delay(1500);observable.Subscribe(Console.WriteLine);等待新的 TaskCompletionSource().Task;}静态 IObservable从控制台(){return Observable.Create(异步观察者=>{而(真){观察者.OnNext(Console.ReadLine());}});}}

如果我使用 Observable.Interval,它会订阅两次,并且我有一个输入有两个输出.如果我使用了任何版本的 FromConsole,我就有一个订阅和一个被阻止的线程.

解决方案

首先,通常最好避免使用 Observable.Create 来创建 observables - 它当然是为了这个目的,但是它可以创建 observables,由于它们的阻塞性质,它们的行为不像你认为的那样.正如你所发现的!

相反,在可能的情况下,使用内置运算符来创建可观察对象.在这种情况下可以做到这一点.

我的 FromConsole 版本是这样的:

static IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();

Observable.Start 实际上就像可观察对象的 Task.Run .它为我们调用 Console.ReadLine() 而不会阻塞.

Observable.Defer/Repeat 对重复调用 Observable.Start(() => Console.ReadLine()).如果没有 Defer,它只会调用 Observable.Start 并永远重复返回一个字符串.

这就解决了.

现在,第二个问题是您希望看到 Console.ReadLine() 两个订阅对 FromConsole() observable 输出的值.>

由于 Console.ReadLine 的工作方式,您从每个订阅中获取值,但一次只能获取一个.试试这个代码:

static async Task Main(string[] args){var observable = FromConsole();observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);await new TaskCompletionSource().Task;}静态 IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();

当我运行时,我得到这样的输出:

1:ddfd2:dff1:dfsdfs2:sdffdfd1:sdfsdfsdf

这样做的原因是每个订阅都会启动对 FromConsole 的新订阅.因此,您有两次对 Console.ReadLine() 的调用,它们有效地排队,并且每个调用只获取每个备用输入.因此 1 & 之间的交替2.

所以,要解决这个问题,您只需要 .Publish().RefCount() 运算符对.

试试这个:

static async Task Main(string[] args){var observable = FromConsole().Publish().RefCount();observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);await new TaskCompletionSource().Task;}静态 IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();

我现在得到:

1:你好2:你好1:世界2:世界

简而言之,它结合了非阻塞 FromConsole observable 和 .Publish().RefCount() 的使用,这使得它以您的方式工作期待.

I have tried to write console observable as in the example below, but it doesn't work. There are some issues with subscriptions. How to solve these issues?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

If I used Observable.Interval, it subscribes two times and I have two outputs for one input. If I used any version of FromConsole, I have one subscription and a blocked thread.

解决方案

To start with, it is usually best to avoid using Observable.Create to create observables - it's certainly there for that purpose, but it can create observables that don't behave like you think they should because of their blocking nature. As you've discovered!

Instead, when possible, use the built-in operators to create observables. And that can be done in this case.

My version of FromConsole is this:

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();

Observable.Start effectively is like Task.Run for observables. It calls Console.ReadLine() for us without blocking.

The Observable.Defer/Repeat pair repeatedly calls Observable.Start(() => Console.ReadLine()). Without the Defer it would just call Observable.Start and repeatedly return the one string forever.

That solves that.

Now, the second issue is that you want to see the value from the Console.ReadLine() output by both subscriptions to the FromConsole() observable.

Due to the way Console.ReadLine works, you are getting values from each subscription, but only one at a time. Try this code:

static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

When I run that I get this kind of output:

1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf

The reason for this is that each subscription starts up a fresh subscription to FromConsole. So you have two calls to Console.ReadLine() they effectively queue and each one only gets each alternate input. Hence the alternation between 1 & 2.

So, to solve this you simply need the .Publish().RefCount() operator pair.

Try this:

static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

I now get:

1:Hello
2:Hello
1:World
2:World

In a nutshell, it's the combination of the non-blocking FromConsole observable and the use of .Publish().RefCount() that makes this work the way you expect.

这篇关于如何制作 IObservable<string>从控制台输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆