带有消费者和生产者线程的循环缓冲区:它在某些执行中陷入困境 [英] Circular Buffer with Threads Consumer and Producer: it get stucks some executions

查看:94
本文介绍了带有消费者和生产者线程的循环缓冲区:它在某些执行中陷入困境的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发带有两个线程的循环缓冲区:消费者和生产者. 我正在使用Thread.yield进行主动等待. 我知道可以使用信号量来做到这一点,但是我想要没有信号量的缓冲区.

I'm developing a circular buffer with two Threads: Consumer and Producer. I'm using active waiting with Thread.yield. I know that it is possible to do that with semaphores, but I wanted the buffer without semaphores.

两者都有一个共享变量:bufferCircular.

Both have a shared variable: bufferCircular.

虽然缓冲区中未充满有用的信息,但producer在数组的位置p写入数据,而有些有用的信息consumer在数组的位置c读取数据. BufferCircular中的变量nElem是尚未读取的值数据的数量.

While the buffer is not full of useful information, producer write data in the position pof array, and while there are some useful information consumer read data in the position c of array. The variable nElem from BufferCircular is the number of value datas that haven't been read yet.

该程序可以运行9/10次,效果很好.然后,有时,它会陷入无限循环,然后才在屏幕上显示最后一个元素(循环的数字为500),或者只是不显示任何元素.

The program works quite good 9/10 times that runs. Then, sometimes, it get stucks in a infinite loop before show the last element on screen (number 500 of loop for), or just dont' show any element.

我认为可能是一个liveLock,但是我找不到错误.

I think is probably a liveLock, but I can't find the mistake.

共享变量:

public class BufferCircular {
    volatile int[] array;
    volatile int p;
    volatile int c;
    volatile int nElem;

    public BufferCircular(int[] array) {
       this.array = array;
       this.p = 0;
       this.c = 0;
       this.nElem = 0;
    }

    public void writeData (int data) {
       this.array[p] = data;
       this.p = (p + 1) % array.length;
       this.nElem++;
    }

    public int readData() {
       int data = array[c];
       this.c = (c + 1) % array.length;
       this.nElem--;
       return data;
    }

}

生产者线程:

public class Producer extends Thread {
    BufferCircular buffer;
    int bufferTam;
    int contData;

    public Productor(BufferCircular buff) {
       this.buffer = buff;
       this.bufferTam = buffer.array.length;
       this.contData = 0;

    }

    public void produceData() {
       this.contData++;
       this.buffer.writeData(contData);
    }

    public void run() {
       for (int i = 0; i < 500; i++) {
          while (this.buffer.nElem == this.bufferTam) {
          Thread.yield();
        }
          this.produceData();
       }
    }
}

消费者主题:

    public class Consumer extends Thread {
        BufferCircular buffer;
        int cont;

        public Consumer(BufferCircular buff) {
           this.buffer = buff;
           this.cont = 0;
        }

        public void consumeData() {
           int data = buffer.readData();
           cont++;
           System.out.println("data  " + cont + ": " + data);
        }

        public void run() {
           for (int i = 0; i < 500; i++) {
              while (this.buffer.nElem == 0) {
                 Thread.yield();
              }
               this.consumeData();
           }
        }
   }

主要:

public class Main {

    public static void main(String[] args) {
       Random ran = new Random();
       int tamArray = ran.nextInt(21) + 1;
       int[] array = new int[tamArray];

       BufferCircular buffer = new BufferCircular(array);

       Producer producer = new Producer (buffer);
       Consumer consumer = new Consumer (buffer);

       producer.start();
       consumer.start();

       try {
           producer.join();
           consumer.join();
       } catch (InterruptedException e) {
           System.err.println("Error with Threads");
           e.printStackTrace();
    }

    }

}

任何帮助都将受到欢迎.

Any help will be welcome.

推荐答案

您的问题是您的BufferCircular方法对竞争条件敏感.以writeData()为例.它分3个步骤执行,其中一些步骤也不是原子的:

Your problem here is that your BufferCircular methods are sensitive to race conditions. Take for example writeData(). It executes in 3 steps, some of which are also not atomic:

this.array[p] = data;             // 1
this.p = (p + 1) % array.length;  // 2  not atomic
this.nElem++;                     // 3  not atomic

假设有2个线程同时进入writeData().在步骤1,它们都具有相同的p值,并且都重写了array[p]值.现在,array[p]被重写了两次,并且第一个线程必须写入的数据丢失了,因为第二个线程之后写入了相同的索引.然后它们执行步骤2-结果是不可预测的,因为p可以增加1或2(p = (p + 1) % array.length由3个操作组成,线程可以在其中进行交互).然后,执行步骤3.++运算符也不是原子的:它在幕后使用了2个运算.因此nElem也将增加1或2.

Suppose that 2 threads entered writeData() at the same time. At step 1, they both have the same p value, and both rewrite array[p] value. Now, array[p] is rewritten twice and data that first thread had to write, is lost, because second thread wrote to the same index after. Then they execute step 2--and result is unpredictable since p can be incremented by 1 or 2 (p = (p + 1) % array.length consists of 3 operations, where threads can interact). Then, step 3. ++ operator is also not atomic: it uses 2 operations behind the scenes. So nElem becomes also incremented by 1 or 2.

因此,我们得到了完全不可预测的结果.这会导致您的程序执行不佳.

So we have fully unpredictable result. Which leads to poor execution of your program.

最简单的解决方案是将readData()writeData()方法序列化.为此,将它们声明为synchronized:

The simplest solution is to make readData() and writeData() methods serialized. For this, declare them synchronized:

public synchronized void writeData (int data) { //...
public synchronized void readData () { //...


如果只有一个生产者线程和一个消费者线程,则涉及nElem的操作可能会出现竞争条件.解决方案是使用 AtomicInteger 代替int:


If you have only one producer and one consumer threads, race conditions may occur on operations involving nElem. Solution is to use AtomicInteger instead of int:

final AtomicInteger nElem = new AtomicInteger();

并使用其incrementAndGet()decrementAndGet()方法.

这篇关于带有消费者和生产者线程的循环缓冲区:它在某些执行中陷入困境的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆