TBB管道输出不正确 [英] incorrect output with TBB pipeline

查看:228
本文介绍了TBB管道输出不正确的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经在文本文件(例如 1.txt,2.txt ... 100.txt )中编写了不同值(100次)的C结构。

I have written a C structure with different values (100 times) in text files such as 1.txt, 2.txt ... 100.txt

我在Linux上使用英特尔TBB。我已创建:

I am using Intel TBB on Linux. I have created:


  1. InputFilter(serial_in_order MODE)

  2. TransformFIlter(serial_in_order MODE)

  3. OutputFilter(Serial_in_order MODE)

InputFilter从文件读取结构并将其传递给TransformFilter。
TrasnformFilter更新结构值并将其传递给OutputFilter。
OutputFilter将新结构写入光盘。

The InputFilter reads structure from a file and passes it to TransformFilter. The TrasnformFilter updates the structure values and passes it to OutputFilter. The OutputFilter writes the new structure on the disc.

基本上,这是一个结构的简单读写应用程序。 em>

class InputFilter: public tbb::filter {
public:
    InputFilter( int );
    ~InputFilter();
private:
    int total_streams;
    int count;
    struct video_process_object input_obj;
    void* operator()( void* );
};

InputFilter::InputFilter( int x )
        : filter( serial_in_order ) {
    total_streams = x;
    count = 1;
}

InputFilter::~InputFilter() {
    total_streams = 0;
}

void* InputFilter::operator()( void* ) {
    char path[50] = { };
    sprintf( path, "input//%d.txt", count );
    printf( "Path : %s\n", path );
    FILE *fp;
    fp = fopen( path, "r" );

    if( fp == NULL || count > total_streams ) {
        fclose( fp );
        printf( "\n*******Cannot find more data.Terminating********\n\n\n" );
        return NULL;
    }

    fscanf( fp, "%d", &input_obj.video_id );
    fscanf( fp, "%s", &input_obj.storage_url );
    fscanf( fp, "%s", &input_obj.storage_type );
    fscanf( fp, "%d", &input_obj.face_detect );
    fscanf( fp, "%d", &input_obj.face_recognise );
    fscanf( fp, "%d", &input_obj.scene_recognise );
    fscanf( fp, "%d", &input_obj.activity_recognise );
    fscanf( fp, "%d", &input_obj.speech_recognise );
    fclose( fp );

    count++;
    return &input_obj;
}

class TransformFilter: public tbb::filter {
public:
    TransformFilter();
    ~TransformFilter();
private:
    struct video_process_object input_transform;
    void* operator()( void* );
};

TransformFilter::TransformFilter()
        : filter( serial_in_order ) {
}

TransformFilter::~TransformFilter() {
}

void* TransformFilter::operator()( void *item ) {

    input_transform = *static_cast<struct video_process_object*>( item );

    input_transform.video_id += 1000;
    strcat( input_transform.storage_url, "  nabeel" );
    strcat( input_transform.storage_type, " N" );
    input_transform.face_detect += 1000;
    input_transform.face_recognise += 1000;

    return &input_transform;
}

class OutputFilter: public tbb::filter {
public:
    OutputFilter();
    ~OutputFilter();
private:
    struct video_process_object output_obj;
    void* operator()( void* );
};

OutputFilter::OutputFilter()
        : filter( serial_in_order ) {
    int status = mkdir( "output", S_IRWXU | S_IRWXG | S_IRWXO );
    if( status == -1 )
        printf( "\nOutput directory already exists\n\n" );
}

OutputFilter::~OutputFilter() {
}

void* OutputFilter::operator()( void *item ) {

    output_obj = *static_cast<struct video_process_object*>( item );

    FILE *fp;

    char path[50] = { };
    sprintf( path, "output//%d.txt", output_obj.video_id - 1000 );
    printf( "Output Path : %s\t\t %d\n\n", path, output_obj.video_id );

    if( (fp = fopen( path, "w" )) == NULL ) {
        fprintf( stderr, "Cannot open output file.\n" );
        return NULL;
    }

    fprintf( fp, "%d\n", output_obj.video_id );
    fprintf( fp, "%s\n", output_obj.storage_url );
    fprintf( fp, "%s\n", output_obj.storage_type );
    fprintf( fp, "%d\n", output_obj.face_detect );
    fprintf( fp, "%d\n", output_obj.face_recognise );
    fprintf( fp, "%d\n", output_obj.scene_recognise );
    fprintf( fp, "%d\n", output_obj.activity_recognise );
    fprintf( fp, "%d\n", output_obj.speech_recognise );

    fclose( fp );
    return NULL;
}

int main() {
    tbb::pipeline pipeline;

    InputFilter input_filter( 100 );
    pipeline.add_filter( input_filter );

    TransformFilter transform_filter;
    pipeline.add_filter( transform_filter );

    OutputFilter output_filter;
    pipeline.add_filter( output_filter );

    tbb::tick_count t0 = tbb::tick_count::now();

    tbb::task_scheduler_init init_parallel;
    pipeline.run( 1 );
    tbb::tick_count t1 = tbb::tick_count::now();

    return 0;
}

一切都适用于少量文件,如5或10.问题开始问题是:

Everything works fine with small number of files such 5 or 10. The problem starts when I read large number of files such as 50 or 100. The problem is:


有时InputFilter会读取10.txt文件,而TransformFilter会处理它。但是立即InputFilter读取11.txt。 OutputFIlter跳过10.txt并处理11.txt。

Sometimes InputFilter reads 10.txt file and TransformFilter processes it. But immediately InputFilter reads 11.txt. OutputFIlter skips 10.txt and processes 11.txt.

如何确保不会发生这种情况?

How can I make sure that this will not happen?

推荐答案

有一个数据竞赛,因为video_process_objects放置在过滤器结构中,并通过过滤器之间的引用传递(当然并行运行)。所以,你有这样的情况,当 InputFilter 开始处理下一个令牌读取新数据到其video_process_object,而第一个令牌只是开始读取数据 TransformFilter

There is a data-race, because the video_process_objects are placed inside filter structure and passed by reference between filters (which run in parallel of course). So, you have the situation when InputFilter starts processing the next token reading new data into its video_process_object while the first token just starts reading the data by the same address in TransformFilter:

     Token 1                ||         Token 2
input_filter.operator()     ||
transform_filter.operator() ||  input_filter.operator()
...

要解决这个问题,例如:

To fix it, allocate the data dynamically, e.g.:

struct video_process_object *input_obj_ptr = new video_process_object;
fscanf( fp, "%d", &input_obj_ptr->video_id );
...
return input_obj_ptr;

并在最后一个过滤器中取消分配,因为它的返回值被忽略。
这个旧的演示文稿描绘了类似的代码。

And deallocate it in the last filter since its return value is ignored anyway. The slides 49-50 in this old presentation sketches the similar code.

最后,让我挑战你对tbb :: pipelene和serial_in_order过滤器类型的选择。 TBB参考说:

And finally, let me challenge your choice of tbb::pipelene and serial_in_order filter type. The TBB Reference says:


并行过滤器在实际中是首选,因为它们允许并行加速。如果过滤器必须是串行的,那么在实际应用中优选次序不同的变体,因为它对处理顺序的约束较小。

Parallel filters are preferred when practical because they permit parallel speedup. If a filter must be serial, the out of order variant is preferred when practical because it puts less contraints on processing order.

没有理由把这个附加的有序限制,因为处理和文件是独立的。为了更好的结构化代码,另一个引用要考虑:

I see no reasons to put this additional 'in order' limitation since the processing and files are independent. And another quote to consider for sake of better-structured code:


函数parallel_pipeline提供一个强类型的lambda友好的方法来构建和运行管道。

Function parallel_pipeline provides a strongly typed lambda-friendly way to build and run pipelines.

这篇关于TBB管道输出不正确的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆