Make an IObservable subscription concurrent

Question

I have the following code:

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
    .Subscribe(PointCloudMergerCompleted);

where _solverManagementService is a SolverManagementService:

public class SolverManagementService : ISolverManagementService
{
    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
        CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
                            pairCollection, token).Result);
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }
    ... // Other methods. 
}

But here _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token) is expensive, and although it returns a Task<IPointCloud>, I do not run it on a separate thread, so this call blocks. As RecursivelyMergeAsync returns a Task<IPointCloud>, it can be awaited, so I have amended the code to use async/await:

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
    CancellationToken token)
{
    return Observable.Create<IPointCloud>(
        observer =>
        {
            PairCollectionProducer(dataDirectory, token)
                .Subscribe(async (pairCollection) =>
                {
                    observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
                        pairCollection, token));
                },
                onCompleted: () =>
                {
                    observer.OnCompleted();
                });
            return () => { };
        });
}

But now it returns immediately and the console app shuts down. I am sure this can be done without semaphores, but I am new to Rx. How can I configure RecursivelyMergeAsync to run concurrently for each returned pairCollection without blocking, and get a notification when all the recursive merges have completed?

Note: in a unit test, I do the following:

public class IcpBatchSolverServiceTests
{
    private Mock<ISettingsProvider> _mockSettingsProvider; 
    private IIcpBatchSolverService _icpBatchSolverService;

    [OneTimeSetUp]
    public void Setup()
    {
        _mockSettingsProvider = new Mock<ISettingsProvider>();

        _mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
        _mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;

        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();

        var serviceProvider = new ServiceCollection()
            .AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
                builder.AddSerilog(Log.Logger);
            })
            .BuildServiceProvider();

        ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
            .GetService<ILoggerFactory>()
            .CreateLogger<IcpBatchSolverServiceTests>();

        _icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
    }

    [Test]
    public async Task CanSolveBatchAsync()
    {
        IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
        List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);

        IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
        IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);

        Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
    }
}

This works perfectly and runs concurrently.

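EDIT. Here is an overview of the processing I need to do. I am given a folder of files for different geometries (depth maps of different geometries taken from different angles) with the naming convention .NNNN.exr, where NNNN is some numeric value. For a batch of files: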

  1. Batch these files into collections using the file names of the different geometries.

For each file batch:

  1. [*Serial*] Call C++ API to extract DepthMaps from image files.
  2. [*Parallel*] Convert DepthMaps to PointClouds. This can be done all at once.
  3. [*Parallel*] Merge PointClouds using ICP algorithm (expensive) but limit concurrency with TaskScheduler to two threads (chosen depending on machine architecture/memory etc.)
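For illustration, the per-batch steps above could be expressed in Rx along the following lines. This is a minimal sketch, not the asker's code: it assumes the System.Reactive NuGet package, uses ints in place of depth maps and point clouds, and ExtractDepthMap, ToPointCloud, MergeAsync and RunAllAsync are hypothetical placeholders for the C++ extraction call, the point-cloud adapter and RecursivelyMergeAsync. In this sketch the limit on concurrent merges comes from Rx's Merge(maxConcurrent) rather than from a custom TaskScheduler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the per-batch pipeline (assumes the System.Reactive package).
// ints stand in for depth maps and point clouds; every stage function here
// is a hypothetical placeholder, not the real C++ API or ICP service.
public static class PipelineSketch
{
    // Stage 1 placeholder for the C++ depth-map extraction.
    static int ExtractDepthMap(string filePath) => filePath.Length;

    // Stage 2 placeholder for the depth map -> point cloud conversion.
    static int ToPointCloud(int depthMap) => depthMap * 10;

    // Stage 3 placeholder for the expensive RecursivelyMergeAsync call.
    static async Task<int> MergeAsync(List<int> clouds, CancellationToken token)
    {
        await Task.Delay(20, token);
        return clouds.Sum();
    }

    public static IObservable<int> Run(IEnumerable<IList<string>> fileBatches, int maxConcurrentMerges)
    {
        return fileBatches.ToObservable()
            // Stage 1 (serial): batches are extracted one after another on the
            // thread that iterates the source.
            .Select(batch => batch.Select(f => ExtractDepthMap(f)).ToList())
            // Stages 2 and 3 for one batch, as a deferred observable that starts
            // only when Merge subscribes to it.
            .Select(depthMaps => Observable.FromAsync(ct => Task.Run(async () =>
            {
                // Stage 2 (parallel): convert all depth maps of the batch at once.
                List<int> clouds = depthMaps.AsParallel().AsOrdered()
                    .Select(d => ToPointCloud(d)).ToList();
                // Stage 3 (expensive): merge the batch's point clouds.
                return await MergeAsync(clouds, ct);
            }, ct)))
            // At most maxConcurrentMerges batches are in stages 2-3 at once;
            // results are emitted in completion order.
            .Merge(maxConcurrentMerges);
    }

    // Join: completes once every batch has been merged (sorted, because
    // completion order is not deterministic).
    public static async Task<int[]> RunAllAsync(IEnumerable<IList<string>> fileBatches, int maxConcurrentMerges)
    {
        IList<int> results = await Run(fileBatches, maxConcurrentMerges).ToList();
        return results.OrderBy(x => x).ToArray();
    }
}
```

For example, the three batches {"a", "bb"}, {"ccc"} and {"dddd", "e"} run with maxConcurrentMerges = 2 give the sorted results 30, 30, 50.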

Finally, I call the C++ API again with the point clouds merged in step 3. So in Rx, my current full pipeline looks like this:

public class SolverManagementService : ISolverManagementService
{
    private readonly IIcpBatchSolverService _icpBatchSolverService;
    private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
    private readonly ILogger<SolverManagementService> _logger;

    public SolverManagementService(
        IIcpBatchSolverService icpBatchSolverService,
        IDepthMapToPointCloudAdapter pointCloudAdapter,
        ILogger<SolverManagementService> logger)
    {
        _icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
        _pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
        _logger = logger; 
    }

    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }

    public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<PairCollection<IPointCloud>>(
            observer =>
            {
                Parallel.ForEach(
                    Utils.GetFileBatches(dataDirectory), 
                    (fileBatch) =>
                {
                    var producer = RawDepthMapProducer(fileBatch, token);
                    ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();

                    producer.Subscribe(rawDepthMap =>
                    {
                        bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
                    }, 
                    onCompleted: () =>
                    {
                        PointCloudPartitioningService ps = new PointCloudPartitioningService();
                        observer.OnNext(ps.Partition(bag.ToList()));

                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
                            $"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
                    });
                });
                observer.OnCompleted();
                return () => { };
            });
    }

    public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
    {
        return Observable.Create<RawDepthMap>(
            observer =>
            {
                int index = 0;
                foreach(var filePath in filePaths)
                {
                    token.ThrowIfCancellationRequested();
                    var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);

                    observer.OnNext(extractor.GetDepthMap(filePath, index++));
                    _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
                }
                observer.OnCompleted();
                return () => { };
            });
    }
}

I am seeking:

  1. What is wrong with my code above? Note that _icpBatchSolverService.RecursivelyMergeAsync returns a Task<IPointCloud> and is concurrent, and I would like it to run concurrently.
  2. What else is wrong with my code?

Recommended answer

I'm going to leave a generic answer, because the code up above is too extensive to boil it down.

There are two syntaxes which may be used to define asynchronous behavior. The first is the async/await pattern and the second, and older, is the Subscribe() pattern (reactive).

Is asynchrony the same thing as concurrency?

No, it is definitely not. For those reading this who don't know, asynchronous means "it happens later," not "it happens concurrently." By using either of these syntaxes, you're defining behavior that runs as soon as some condition has been met, typically that an operation has completed. A very common use case is handling a response coming back from a web server: you make the request, then do something when the response comes back.
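A minimal sketch of the difference, using only the .NET base library: WorkAsync is a hypothetical asynchronous operation (a Task.Delay standing in for something like a web request) that records how many calls are in flight at once. Awaiting the calls one after another is asynchronous but never concurrent; starting them all and then awaiting Task.WhenAll is both.

```csharp
using System;
using System.Threading.Tasks;

// Counts how many calls of a (hypothetical) asynchronous operation are in
// flight at the same time, to show that async alone does not mean concurrent.
public static class AsyncVsConcurrent
{
    static readonly object Gate = new object();
    static int _running;
    static int _maxRunning;

    static async Task WorkAsync()
    {
        lock (Gate) { _running++; _maxRunning = Math.Max(_maxRunning, _running); }
        await Task.Delay(50); // stands in for I/O such as a web request
        lock (Gate) { _running--; }
    }

    // Asynchronous, not concurrent: each call finishes before the next starts.
    public static async Task<int> SequentialAsync()
    {
        lock (Gate) { _running = 0; _maxRunning = 0; }
        await WorkAsync();
        await WorkAsync();
        await WorkAsync();
        lock (Gate) { return _maxRunning; }
    }

    // Asynchronous and concurrent: start all three (fork), then wait for all (join).
    public static async Task<int> ConcurrentAsync()
    {
        lock (Gate) { _running = 0; _maxRunning = 0; }
        await Task.WhenAll(WorkAsync(), WorkAsync(), WorkAsync());
        lock (Gate) { return _maxRunning; }
    }
}
```

SequentialAsync() reports at most 1 call in flight, while ConcurrentAsync() reports 3.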

Concurrency is different. You might introduce concurrency by using Task.Run() or Parallel.ForEach(), for example. In both cases, you're defining a fork. In the case of Task.Run, you might later join with Task.WaitAll (or await Task.WhenAll). In the case of Parallel.ForEach, the fork/join is done for you. Of course, Rx has its own set of fork/join operations.
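As a sketch of what a reactive fork/join could look like for the question's scenario (assuming the System.Reactive NuGet package; MergeAsync is a hypothetical stand-in for RecursivelyMergeAsync, with int[] in place of a pair collection): SelectMany with Observable.FromAsync starts one asynchronous call per incoming item without blocking the producer, and awaiting ToList() is the "all merges have completed" notification. Unlike a Subscribe(async ...) lambda, which becomes an async void Action that Rx cannot wait for, this keeps every merge inside the observable sequence, so the sequence does not complete until the merges have.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RxForkJoin
{
    // Hypothetical stand-in for an expensive call such as RecursivelyMergeAsync.
    static async Task<int> MergeAsync(int[] pair, CancellationToken token)
    {
        await Task.Delay(50, token);
        return pair.Sum();
    }

    // Fork: SelectMany subscribes to one FromAsync per item as it arrives, so
    // the merges run concurrently and the producer is never blocked.
    // Use .Select(...).Merge(n) instead to cap the number of concurrent merges.
    public static IObservable<int> MergedProducer(IObservable<int[]> pairs) =>
        pairs.SelectMany(pair => Observable.FromAsync(ct => MergeAsync(pair, ct)));

    // Join: ToList() emits once, after the source has completed and every
    // merge has finished; awaiting it is the "all merges done" notification.
    public static async Task<int[]> MergeAllAsync(IEnumerable<int[]> pairs)
    {
        IList<int> results = await MergedProducer(pairs.ToObservable()).ToList();
        return results.OrderBy(x => x).ToArray(); // completion order varies, so sort
    }
}
```

In a console app, awaiting MergeAllAsync (or the observable itself) from an async Main keeps the process alive until every merge has finished.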

What happens when I await or subscribe?

The following two lines of code both have the same behavior, and that behavior confuses a good number of programmers:

var result = await myAsync();

myObservable.Subscribe(result => { ... });

In both cases, the control flow of the program moves in a predictable but potentially confusing fashion. In the first case, control flow returns to the calling method while the awaited operation is still pending. In the second, control flow moves on to the next line of code, and the lambda expression is called later, each time a result arrives.
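A small sketch of both control flows, assuming the System.Reactive NuGet package, with Observable.Timer as a stand-in for any asynchronous source. Subscribe returns before its callback runs (which is why a console app whose Main ends after Subscribe can exit early), and await hands control back to the caller until the value arrives. The class and method names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ControlFlowDemo
{
    // Subscribe registers a callback and returns immediately; the lambda
    // runs later, when the timer fires on a thread-pool thread.
    public static List<string> WithSubscribe()
    {
        var log = new List<string>();
        Observable.Timer(TimeSpan.FromMilliseconds(100))
            .Subscribe(_ => { lock (log) log.Add("callback"); });
        lock (log) log.Add("after Subscribe");
        return log; // a console Main that returns here lets the process exit
    }

    static async Task AwaitTimerAsync(List<string> log)
    {
        // Control returns to the caller here until the timer produces its value.
        await Observable.Timer(TimeSpan.FromMilliseconds(100));
        lock (log) log.Add("after await");
    }

    // await suspends AwaitTimerAsync, so the caller keeps running until it
    // reaches code that actually waits for the result.
    public static async Task<List<string>> WithAwait()
    {
        var log = new List<string>();
        Task pending = AwaitTimerAsync(log); // runs up to its first await, then returns
        lock (log) log.Add("caller continues");
        await pending;                       // wait for the rest of AwaitTimerAsync
        return log;
    }
}
```

WithSubscribe() returns with "after Subscribe" logged before the callback, and WithAwait() logs "caller continues" before "after await".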

A common thing that I've seen among people learning how to use these is to assign, from within the lambda, to a variable in the parent scope and then read that variable in the parent scope. This isn't going to work: C# keeps the captured variable alive, but the parent code has normally read it (and moved on) long before the lambda is executed. You're less likely to make this kind of mistake with async/await, but you also have to remember that at each await, control flow goes back up the call stack to the caller, which keeps running until it reaches code that actually waits for the result. This article explains it in a little more depth, and this article is a little easier to understand.
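A sketch of that pitfall and one way around it, assuming the System.Reactive NuGet package; SlowNumbers is a hypothetical asynchronous source (1 to 4 delivered after a 100 ms delay). The broken version reads the captured variable before any callback has run; the fixed version makes the result part of the sequence and awaits it.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class CaptureDemo
{
    // A hypothetical asynchronous source: 1, 2, 3, 4, delivered after 100 ms.
    public static IObservable<int> SlowNumbers() =>
        Observable.Range(1, 4).Delay(TimeSpan.FromMilliseconds(100));

    // Anti-pattern: the lambda captures `total` (so the variable stays alive),
    // but this method reads it before any callback has run.
    public static int Broken(IObservable<int> source)
    {
        int total = 0;
        source.Subscribe(x => total += x);
        return total; // 0 for SlowNumbers(): the callbacks run about 100 ms later
    }

    // Fix: make the result part of the sequence and await it.
    public static async Task<int> FixedAsync(IObservable<int> source) =>
        await source.Sum();
}
```

Broken(SlowNumbers()) returns 0, while FixedAsync(SlowNumbers()) completes with 10.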
