Why does Observable.Generate() throw System.StackOverflowException?


Question

I'm writing a C# (.NET 4.5) application that aggregates time-based events for reporting purposes. To make my query logic reusable for both real-time and historical data, I use the Reactive Extensions (Rx 2.0) and their IScheduler infrastructure (HistoricalScheduler and friends).

For example, assume we create a list of events (sorted chronologically, though some may coincide) whose only payload is their timestamp, and we want to know their distribution across buffers of a fixed duration:

const int num = 100000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();

var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

var stream = Observable.Generate<int, DateTimeOffset>(
    0,
    s => s < events.Count,
    s => s + 1,
    s => events[s],
    s => events[s],
    time);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

Running this code results in a System.StackOverflowException with the following stack trace (the last three lines repeat all the way down):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes    
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes    
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes    
...

OK, the problem seems to come from my use of Observable.Generate(); it depends on the list size (num) and occurs regardless of the choice of scheduler.

What am I doing wrong? Or, more generally, what would be the preferred way to create an IObservable from an IEnumerable of events that provide their own timestamps?

Recommended answer

(Update: I realized I didn't provide an alternative; see the bottom of this answer.)

The problem lies in how Observable.Generate works: it unfolds a corecursive generator (think recursion turned inside out) based on the arguments. If those arguments end up producing a very deeply nested corecursive generator, you'll blow your stack.

From this point on I was originally speculating (I didn't have the Rx source in front of me; see the confirmation below), but I'm willing to bet your definition ends up expanding into something like:

initial_state =>
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) =>
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ...

And so on, until your call stack gets big enough to overflow. As a rough back-of-the-envelope estimate: at, say, a method signature plus your int counter, that would be something like 8 to 16 bytes per recursive call (more, depending on how the state machine generator is implemented), so a limit of around 60,000 sounds about right (1 MB / 16 bytes ≈ 62,500 maximum depth).
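The repeating Dispose/Cancel frames in the stack trace are consistent with one nested call per generated element. The following is only a toy model of that shape, not Rx source code: the names ChainedDisposable and NestingDemo are hypothetical, and the sketch assumes nothing about how Rx actually links its disposables. It simply shows that when each disposable disposes the one it wraps, the call depth grows linearly with the chain length.

```csharp
using System;

// Toy model only: NOT Rx source code. Each link disposes the link it wraps,
// so disposing the outermost link recurses once per link in the chain.
sealed class ChainedDisposable : IDisposable
{
    public static int CurrentDepth;
    public static int MaxDepth;

    private readonly IDisposable _inner;

    public ChainedDisposable(IDisposable inner) { _inner = inner; }

    public void Dispose()
    {
        CurrentDepth++;
        MaxDepth = Math.Max(MaxDepth, CurrentDepth);
        if (_inner != null) _inner.Dispose(); // one more stack frame per link
        CurrentDepth--;
    }
}

static class NestingDemo
{
    // Builds a chain of `links` disposables and returns the deepest
    // Dispose nesting reached when the outermost one is disposed.
    public static int MeasureDepth(int links)
    {
        ChainedDisposable.CurrentDepth = 0;
        ChainedDisposable.MaxDepth = 0;

        IDisposable chain = null;
        for (int i = 0; i < links; i++)
            chain = new ChainedDisposable(chain);

        if (chain != null) chain.Dispose();
        return ChainedDisposable.MaxDepth;
    }

    static void Main()
    {
        // Kept small on purpose; a large enough chain would overflow the stack.
        Console.WriteLine(MeasureDepth(1000)); // prints 1000
    }
}
```

Because the depth equals the number of links, any structure of this kind has a hard ceiling set by the thread's stack size, whatever the per-frame cost turns out to be.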

Edit: I pulled up the source, which confirms it. The Run method of Generate looks like this (take note of the nested calls to Generate):

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink)
{
    if (this._timeSelectorA != null)
    {
        Generate<TState, TResult>.α α = 
                new Generate<TState, TResult>.α(
                     (Generate<TState, TResult>) this, 
                     observer, 
                     cancel);
        setSink(α);
        return α.Run();
    }
    if (this._timeSelectorR != null)
    {
        Generate<TState, TResult>.δ δ = 
               new Generate<TState, TResult>.δ(
                   (Generate<TState, TResult>) this, 
                   observer, 
                   cancel);
        setSink(δ);
        return δ.Run();
    }
    Generate<TState, TResult>._ _ = 
             new Generate<TState, TResult>._(
                  (Generate<TState, TResult>) this, 
                  observer, 
                  cancel);
    setSink(_);
    return _.Run();
}

Edit: Derp, I didn't offer any alternatives. Here's one that might work:

(Edit: fixed Enumerable.Range so that the stream size won't be multiplied by chunkSize.)

const int num = 160000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

// Size too big? Fine, we'll chunk it up!
const int chunkSize = 10000;
// Ceiling division, so a final partial chunk is not dropped
var numberOfChunks = (events.Count + chunkSize - 1) / chunkSize;

// Generate a whole mess of streams based on start/end indices.
// Enumerable.Range takes (start, count), so pass the chunk count itself.
var streams = 
    from chunkIndex in Enumerable.Range(0, numberOfChunks)
    let startIdx = chunkIndex * chunkSize
    let endIdx = Math.Min(events.Count, startIdx + chunkSize)
    select Observable.Generate<int, DateTimeOffset>(
        startIdx,
        s => s < endIdx,
        s => s + 1,
        s => events[s],
        s => events[s],
        time);

// E pluribus streamum
var stream = Observable.Concat(streams);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
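
The chunk boundaries used above can be checked without Rx. This is a minimal sketch, assuming a hypothetical helper named ChunkMath.Ranges that is not part of the original answer; it uses ceiling division so that a final partial chunk is included, and returns half-open [start, end) index pairs matching the startIdx/endIdx values fed to Observable.Generate.

```csharp
using System;
using System.Collections.Generic;

static class ChunkMath
{
    // Returns [start, end) index pairs covering 0..count-1
    // in chunks of at most chunkSize elements.
    public static List<Tuple<int, int>> Ranges(int count, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        var ranges = new List<Tuple<int, int>>();
        // Ceiling division: a trailing partial chunk still gets its own range.
        int numberOfChunks = (count + chunkSize - 1) / chunkSize;

        for (int chunkIndex = 0; chunkIndex < numberOfChunks; chunkIndex++)
        {
            int startIdx = chunkIndex * chunkSize;
            int endIdx = Math.Min(count, startIdx + chunkSize);
            ranges.Add(Tuple.Create(startIdx, endIdx));
        }
        return ranges;
    }

    static void Main()
    {
        // Prints 0..10, 10..20 and 20..25, one range per line.
        foreach (var r in Ranges(25, 10))
            Console.WriteLine(r.Item1 + ".." + r.Item2);
    }
}
```

With num = 160000 and chunkSize = 10000 this yields 16 ranges, the last one ending at 160000, so every event index is covered exactly once.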
