Implementing a buffer to write data from multiple threads?

Problem description

My program uses an iterator to traverse through a map, and spawns off a number of worker threads to process the points from the read iterator, which is all good. Now, I'd like to write the output for each point, and for that I'm using a memory buffer to ensure data is collected from the threads in the correct order before it is written to the file (via another iterator for writing):

public class MapMain
{
    // Multiple threads are used here; each thread starts in Run(),
    // then requests and processes map points

    public void Run()
    {
        // Get point from somewhere and process point
        int pointIndex = ...

        bufferWriter.StartPoint(pointIndex);

        // Perform a number of computations.
        // For simplicity, numberOfComputations = 1 in this example   
        bufferWriter.BufferValue(pointIndex, value);

        bufferWriter.EndPoint(pointIndex); 
    }
}

My attempt at implementing a buffer:

public class BufferWriter
{
  private const int BufferSize = 4;

  private readonly IIterator iterator;
  private readonly float?[] bufferArray;
  private readonly bool[] bufferingCompleted;
  private readonly SortedDictionary<long, int> pointIndexToBufferIndexMap;
  private readonly object syncObject = new object();  

  private int bufferCount = 0;
  private int endBufferCount = 0;

  public BufferWriter(....)
  {
      iterator = ...
      bufferArray = new float?[BufferSize];
      bufferingCompleted = new bool[BufferSize];
      pointIndexToBufferIndexMap = new SortedDictionary<long, int>();
  }

  public void StartPoint(long pointIndex)
  {
    lock (syncObject)
    {
        if (bufferCount == BufferSize)
        {
            Monitor.Wait(syncObject);
        }

        pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);   
        bufferCount++;
    }
  }

  public void BufferValue(long pointIndex, float value)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferArray[bufferIndex] = value;          
      }
  }

  public void EndPoint(long pointIndex)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferingCompleted[bufferIndex] = true;

          endBufferCount++;
          if (endBufferCount == BufferSize)
          {
              FlushBuffer();
              Monitor.PulseAll(syncObject);
          }
      }
  }

  private void FlushBuffer()
  {
      // Iterate in order of points
      foreach (long pointIndex in pointIndexToBufferIndexMap.Keys)
      {
          // Move iterator 
          iterator.MoveNext();

          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];

          if (bufferArray[bufferIndex].HasValue)
          {                  
              iterator.Current = bufferArray[bufferIndex];

              // Clear to null
              bufferArray[bufferIndex] = null;                  
          }
      }

      bufferCount = 0;
      endBufferCount = 0;
      pointIndexToBufferIndexMap.Clear();
  }        
}

I'm looking for feedback to fix and correct the bugs in my code and resolve any performance issues:

[1] In short: I have a fixed-size buffer that collects data from multiple threads processing points in somewhat random order. When the buffer is completely filled with data, it has to be flushed. But what if I have collected points 0 to 9 and point 8 is missing? My buffer is already full, and any point trying to use the buffer will block until a flush is performed, which needs point 8.
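
One way around the deadlock in [1] is to admit points by index rather than by how full the buffer is. The sketch below is not from the question: PointWindow, Enter and Advance are hypothetical names, and it assumes point indices are handed out as 0, 1, 2, ... with no gaps. A thread may proceed only while its index is less than capacity ahead of the next point still to be written, so the lowest outstanding point (point 8 in the example) always gets in.

```csharp
using System.Threading;

// Hypothetical helper, not part of the question's code: a point may proceed only
// while its index lies inside the window [nextToWrite, nextToWrite + capacity).
public sealed class PointWindow
{
    private readonly object sync = new object();
    private readonly int capacity;
    private long nextToWrite; // smallest point index not yet written out

    public PointWindow(int capacity)
    {
        this.capacity = capacity;
    }

    // Blocks while pointIndex is too far ahead of the writer. The lowest pending
    // index always satisfies the condition, so it can never be locked out.
    public void Enter(long pointIndex)
    {
        lock (sync)
        {
            // 'while' rather than 'if': re-check the window after every wake-up.
            while (pointIndex >= nextToWrite + capacity)
            {
                Monitor.Wait(sync);
            }
        }
    }

    // Called once every index below newNextToWrite has been written out.
    public void Advance(long newNextToWrite)
    {
        lock (sync)
        {
            nextToWrite = newNextToWrite;
            Monitor.PulseAll(sync); // wake all waiters so each re-checks the window
        }
    }
}
```

With capacity 4 and nothing written yet, Enter(3) returns at once while Enter(5) waits until Advance(2) or later has been called.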

[2] The order of values in the buffer does not correspond to the order of the map points they refer to. If it did, I think flushing would be easier (array access is faster than a SortedDictionary lookup?). In addition, this might allow us to reuse flushed slots for incoming data (a circular buffer?).

But I can't think of a working model to achieve this.
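
A circular buffer for [2] can work if a point's slot is derived from its index, slot = pointIndex % capacity, instead of from arrival order. Buffer order then follows map-point order, the in-order walk is plain array access with no SortedDictionary, and a slot is free again as soon as its point has been written. A minimal single-threaded sketch; RingReorderBuffer, Store and DrainReady are hypothetical names, and the caller must keep every stored index inside [nextToWrite, nextToWrite + capacity), which Store checks:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded core: each value lives in slot (pointIndex % capacity),
// so buffer order follows map-point order and written-out slots are reused.
public sealed class RingReorderBuffer
{
    private readonly float?[] values;
    private readonly bool[] filled; // separate flag: a finished point may have no value
    private long nextToWrite;       // index of the next point the writer expects

    public RingReorderBuffer(int capacity)
    {
        values = new float?[capacity];
        filled = new bool[capacity];
    }

    public long NextToWrite => nextToWrite;

    private int Slot(long pointIndex) => (int)(pointIndex % values.Length);

    // Stores a finished point. Indices outside the window would share a slot
    // with a point that has not been written yet, so they are rejected.
    public void Store(long pointIndex, float? value)
    {
        if (pointIndex < nextToWrite || pointIndex >= nextToWrite + values.Length)
            throw new ArgumentOutOfRangeException(nameof(pointIndex));
        int slot = Slot(pointIndex);
        values[slot] = value;
        filled[slot] = true;
    }

    // Removes and returns, in index order, every finished point from nextToWrite
    // up to the first gap; the drained slots become free for later points.
    public List<float?> DrainReady()
    {
        var ready = new List<float?>();
        while (filled[Slot(nextToWrite)])
        {
            int slot = Slot(nextToWrite);
            ready.Add(values[slot]);
            values[slot] = null;
            filled[slot] = false;
            nextToWrite++;
        }
        return ready;
    }
}
```

DrainReady returns null for a point that produced no value, so the writer can still advance its iterator past that point, mirroring the HasValue check in FlushBuffer.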

[3] The buffer waits until it is completely filled before flushing. There are many instances where a thread invokes EndPoint() for exactly the point that iterator.Current refers to. It might make more sense to "write" that point instantly (i.e. set iterator.Current and advance the iterator once), but how can this be done?
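
Instant writing as in [3] becomes straightforward once slots are keyed by index: when EndPoint() finishes the point the writer is waiting for, that point and any already-finished successors can be written on the spot, with no "buffer full" condition at all. A thread-safe sketch under stated assumptions: indices 0, 1, 2, ... are each processed exactly once, OrderedBufferWriter is a hypothetical name, and the writeNext delegate is a hypothetical stand-in for advancing the real iterator and setting its Current property (the IIterator API is not shown in the question).

```csharp
using System;
using System.Threading;

// Hypothetical replacement for BufferWriter: index-based admission, ring slots
// keyed by pointIndex % capacity, and immediate in-order writing in EndPoint().
public sealed class OrderedBufferWriter
{
    private readonly object sync = new object();
    private readonly float?[] values;
    private readonly bool[] completed;
    private readonly Action<float?> writeNext; // stand-in for the writing iterator
    private long nextToWrite;                  // smallest point index not yet written

    public OrderedBufferWriter(int capacity, Action<float?> writeNext)
    {
        values = new float?[capacity];
        completed = new bool[capacity];
        this.writeNext = writeNext;
    }

    private int Slot(long pointIndex) => (int)(pointIndex % values.Length);

    public void StartPoint(long pointIndex)
    {
        lock (sync)
        {
            // Wait only while the point is too far ahead; the lowest pending
            // point always fits, so no point can be locked out.
            while (pointIndex >= nextToWrite + values.Length)
                Monitor.Wait(sync);
        }
    }

    public void BufferValue(long pointIndex, float value)
    {
        // The slot belongs to this point until it is written, so no lookup is needed.
        lock (sync) { values[Slot(pointIndex)] = value; }
    }

    public void EndPoint(long pointIndex)
    {
        lock (sync)
        {
            completed[Slot(pointIndex)] = true;
            if (pointIndex != nextToWrite) return; // an earlier point is still running

            // Write this point and every already-completed successor right away.
            while (completed[Slot(nextToWrite)])
            {
                int slot = Slot(nextToWrite);
                writeNext(values[slot]);
                values[slot] = null;
                completed[slot] = false;
                nextToWrite++;
            }
            Monitor.PulseAll(sync); // slots were freed: blocked StartPoint calls re-check
        }
    }
}
```

writeNext runs while the lock is held, as FlushBuffer already does in the original code, so a slow writer would stall the worker threads.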

Just to be clear, the writing iterator in BufferWriter has its own internal buffer that caches the values assigned to its Current property before writing them to output, but I don't have to worry about that.

I feel like the whole thing needs to be rewritten from scratch!

Any help appreciated, thank you.

Solution

That's my solution that should work, although I haven't tested it. Add a new field:

private readonly Queue<AutoResetEvent> waitHandles = new Queue<AutoResetEvent>();

The two if blocks (in StartPoint and EndPoint) need to change to:

Start:

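if (bufferCount == BufferSize)
{
    // Caution: this block runs inside lock (syncObject). WaitOne() does not release
    // that lock, so EndPoint() cannot acquire it while this thread is waiting.
    AutoResetEvent ev = new AutoResetEvent( false );
    waitHandles.Enqueue( ev );
    ev.WaitOne();
}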

End:

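if (endBufferCount == BufferSize)
{
   FlushBuffer();
   // Capture the count first: Dequeue() shrinks waitHandles.Count, so re-evaluating
   // Math.Min(...) on every iteration would release too few waiting threads.
   int toRelease = Math.Min( waitHandles.Count, BufferSize );
   for ( int i = 0; i < toRelease; ++i )
   {
      waitHandles.Dequeue().Set();
   }
}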
