Point stream join stops at 100,000 consumed events


Problem description

I have an observer that joins 2 point streams together. In the join, 1 stream is receiving high frequency data, the other stream receives data very slowly (i.e. no data, or data quite rarely). Using EventFlowDebugger (see screenshot) I see that first the Total Consumed Events Count stops at 100,000 while the Total Incoming Events Count continues to increment.

Then, when the Total Incoming Events Count reaches 300,000, this too stops. After this limit is reached, no more incoming events are produced or consumed until this observer is stopped and restarted.

    // Create the point streams for timing and car data
    var streamTiming = from s in inputTimingSubject.ToPointStreamable(
                              e => PointEvent<TimingModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e),
                              AdvanceTimeSettings.IncreasingStartTime)
                       select s;

    var streamData = from s in inputDataSubject.ToPointStreamable(
                            e => PointEvent<CarAggregateModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e),
                            AdvanceTimeSettings.IncreasingStartTime)
                     select s;

    var signal = streamTiming.AlterEventDuration(e => TimeSpan.FromSeconds(maxDuration))
                             .ClipEventDuration(streamTiming, (e1, e2) => (e1.Driver.RacingNumber == e2.Driver.RacingNumber));

    // Join the timing events to the data events
    var query = from t in signal
                from d in streamData
                where t.Driver.RacingNumber == d.RacingNumber
                select new DriverGearModel
                {
                    SessionId = t.SessionId,
                    StartLoop = t.StartLoop,
                    // ...
                };

Any idea how I can prevent StreamInsight from stopping at the limit of 100,000 in the Total Consumed Events Count? I can't see any reason why this should be happening.

Due to the nature of my data, with one side of the join being high frequency and the other low frequency, this means the system can potentially stop processing data after 6 minutes.

Thanks,

Jeremy

Answer

When you join/union two different streams, StreamInsight will sync to the slowest stream and that's what you're seeing here. The slow-moving data stream is preventing your resulting stream from moving forward and, because of that, the input queue is filling up.

What you need to do is introduce CTIs into your data stream. You can do this either by importing CTIs from the data stream or by adding, say, a counter to artificially produce CTIs in the stream regardless of whether there are events being queued. IncreasingStartTime - and, in fact, any CTI generation option - only adds CTIs to the stream when you have events being enqueued.
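
Following the second suggestion, one way to keep the slow side's application time moving is to enqueue periodic heartbeat events into the slow subject, so that IncreasingStartTime keeps generating CTIs even when no real car data arrives. The sketch below is only an illustration of that idea, not code from the answer: inputDataSubject, CarAggregateModel, MessageDateTime and RacingNumber are taken from the question, while the one-second timer, the HeartbeatRacingNumber sentinel and the extra filter are assumptions.

    // Assumes inputDataSubject is a subject of CarAggregateModel (as implied by the
    // question's ToPointStreamable call) and that Rx's Observable.Interval is available.
    const int HeartbeatRacingNumber = -1;   // hypothetical sentinel, filtered out below

    // Push a dummy payload into the slow subject once per second. Because
    // AdvanceTimeSettings.IncreasingStartTime emits a CTI whenever an event is
    // enqueued, these heartbeats keep application time advancing on the slow side.
    var heartbeat = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Subscribe(_ => inputDataSubject.OnNext(new CarAggregateModel
        {
            MessageDateTime = DateTime.UtcNow,
            RacingNumber = HeartbeatRacingNumber
        }));

    // The slow point stream, with the heartbeats filtered back out so they can
    // never appear in the join results.
    var streamData = from s in inputDataSubject.ToPointStreamable(
                            e => PointEvent<CarAggregateModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e),
                            AdvanceTimeSettings.IncreasingStartTime)
                     where s.RacingNumber != HeartbeatRacingNumber
                     select s;

One caveat with this approach: the heartbeat timestamps must stay consistent with the application time of the real data, otherwise genuine events arriving behind a heartbeat's CTI will be dropped or adjusted according to the stream's AdvanceTimePolicy. Importing CTIs instead, as the answer also mentions, avoids having to invent dummy payloads at all.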

