什么IProducerConsumerCollection&LT型; T>用我的任务是什么? [英] What type of IProducerConsumerCollection<T> to use for my task?

查看:83
本文介绍了什么IProducerConsumerCollection&LT型; T>用我的任务是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有整整100个传感器每个测量自己的数据。我有一个确切的 DataSender 的应该从传感器发送信息。最新的信息应该被发送

I have exactly 100 Sensors each "measuring" own data. I have exactly one DataSender which should send information from "sensors". The most recent information should be sent.

信道的带宽可以小于由100个传感器产生的数据。在这种情况下,一些数据可以略过 - 但我们应该是公平粗略。例如,我们可以跳过从各传感器每秒测量。

Bandwidth of the channel may be less than data produced by 100 sensors. In this case some data can be skipped - but we should be "roughly fair". For example, we could skip every second measurement from each sensor.

我不知道每个传感器的频率产生数据,但一般来说,他们往往漂亮生成数据。

I don't know how often each sensor generates data, but in general they generate data pretty often.

我的其他职位后:

  • how to create singleton which always running in separate thread?
  • Modified Producer/Consumer example, any problems with it?

我已经决定,我有经典的生产者/消费者问题,用:

I have decided that I have classical Producer/Consumer problem, with:


  • 100生产者和

  • 1消费者

我一直建议使用 BlockingCollection 这一点。与 BlockingCollection 唯一的问题 - 一旦你添加的项目,你不能的替换它。但是,在我的应用程序,如果传感器产生新的价值,而不是由消费类加工前值,其值应替换

I've been suggested to use BlockingCollection for this. The only problem with BlockingCollection - once you have added item, you cannot replace it. But in my application, if sensor produces a new value, and previous value was not processed by the Consumer, the value should be replaced.

我应该使用使用的 ConcurentDictionary 或的 ConcurentBag 该任务?

Should I use use a ConcurentDictionary or ConcurentBag for that task?

从概念上讲,所有我需要是100个元素的数组。

Conceptually, all I need is an array of 100 elements.

传感器#33应的替换它的值到数组[33]:

Sensor #33 should replace it's value into array[33]:

| Sensor | Value |
|--------|-------|
|      1 |       |
|      2 |       |
|      3 |       |
/......../......./
|     32 |       |
|     33 | 101.9 |
|     34 |       |
/......../......./
|     98 |       |
|     99 |       |
|    100 |       |



消费者应<$ C取值$ C>数组[33] ,如果不为空,然后把它和设置阵列[33]为null。 消费者应在阵列尽快任何不为空值反应。

Consumer should take value from array[33] and if not null, then send it and set array[33] to null. Consumer should react on any not null values in array asap.

推荐答案

我想你应该实现自己的 IProducerConsumerCollection< T> 。这就是为什么它是一个接口,这样你可以很容易地制作自己的

I think you should implement your own IProducerConsumerCollection<T>. That's why it's an interface: so that you could easily make your own.

您可以用做词典< K,V> 队列< T> ,以确保接收数据是公平的,也就是说,如果你有一个产生的数据非常快的只需一台设备,你就不会发送数据。单从这个

You could do it using Dictionary<K,V> and Queue<T> to make sure receiving the data is fair, i.e. if you have just one device that produces data very fast, you won't send data just from this one.

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data
        = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    //some obviously implemented methods elided, just make sure they are thread-safe

    public int Count { get { return m_queue.Count; } }

    public object SyncRoot { get { return m_lockObject; } }

    public bool IsSynchronized { get { return true; } }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;

        lock (m_lockObject)
        {
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);

            m_data[device] = data;
        }

        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }

            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}

在沿着这些线路使用:

When used along these lines:

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());

Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue);
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue);

while (true)
{
    var tuple = Queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;

    Console.WriteLine("{0}: Device {1} produced data at {2}.",
        DateTime.Now, device.Id, data.Created);

    Thread.Sleep(TimeSpan.FromSeconds(2));
}



它产生的输出如下:

it produces the following output:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43.
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44.
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47.
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49.
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51.
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54.

这篇关于什么IProducerConsumerCollection&LT型; T&GT;用我的任务是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆