数据损坏C ++和Python之间的管道 [英] Data corruption Piping between C++ and Python

查看:174
本文介绍了数据损坏C ++和Python之间的管道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我编写一些代码,从Python获取二进制数据,将其管道到C ++,对数据进行一些处理(在这种情况下计算交互信息度量),然后将结果管道传回到python。虽然测试我发现,如果我发送的数据是一组2个阵列的维度小于1500 X 1500,但如果我发送2个数组是2K X 2K一切正常,我得到了很多腐败的废话。

I am writing some code that takes binary data from Python, Pipes it to C++, does some processing on the data, (in this case calculating a mutual information metric) and then pipes the results back to python. While testing I have found that everything works fine if the data I send is a set of 2 arrays with dimensions less than 1500 X 1500, but if I send 2 arrays that are 2K X 2K I get back a lot of corrupted nonsense.

我目前认为代码的算法部分很好,因为它在使用小(<= 1500 X1500)数组的测试期间提供了预期的答案。这使我相信这是stdin或stdout管道的问题。也许我在某处传递了一些内在的限制。

I currently believe the algorithmic portion of the code is fine because it provides the expected answers during testing with small (<=1500 X1500) arrays. That leads me to believe that this is an issue with either the stdin or stdout piping. That maybe I’m passing some intrinsic limit somewhere.

Python代码和C ++代码如下。

The Python Code and C++ code are below.

Python代码:

import subprocess
import struct
import sys
import numpy as np

#set up the variables needed 
bytesPerDouble = 8
sizeX = 2000
sizeY = 2000
offset = sizeX*sizeY
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array 
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')      

strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")     
command =   "C:\Python27\PythonPipes.exe"
proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    b = struct.unpack_from(str(sizeY)+'d', a)
    print str(b) + " " + str(i)

print('done')

C ++代码:
主函数:

C++ Code: Main function:

int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;
    //*
    //write out data to pipe
    columnindex = 0;
    while (count<totalbytes/2) {

        fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
        fflush(stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    //*/
    delete [] data;

    return 0;
}

并且如果需要,实际的处理代码:

and in case you need it the actual processing code:

double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){
        //write out processing status
        //index = (double)ioutter;
        //fwrite(&index, 8, 1, stdout);
        //fflush(stdout);
        //*
        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;

            }

        }



    }

    return rvalue;
}

用数组返回有意义的输出像这样:

with arrays that return something that makes sense I get output bounded between 0 and 1 like this:

(0.0,0.0,0.0,0.7160627908692593,0.6376472316395495,0.5728801401524277,...

(0.0, 0.0, 0.0, 0.7160627908692593, 0.6376472316395495, 0.5728801401524277,...

2Kx2K或更高的数组我得到nonesense像这样(即使代码夹在0和1之间的值):

with the 2Kx2K or higher arrays I get nonesense like this (even though the code clamps the values between 0 and 1):

( - 2.2491400820412374e + 228,-2.2491400820412374e + 228,-2.2491400820412374e + 228,-2.2491400820412374e + 228,-2.2491400820412374e + 228,...

(-2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228,...

我想知道为什么这段代码会破坏数据设置它在0.0和1之间分配,以及它是否是管道问题,stdin / stdout问题,某种缓冲问题,或编码问题,我根本看不到。

I would like to know why this code is corrupting the data set after it is assigned between 0.0 and 1, and whether or not it is a piping issue, a stdin/stdout issue, a buffer issue of some sort, or a coding issue I am simply not seeing.

更新 我尝试使用Chris建议的没有运气的代码传递小数据,还注意到我添加了一个catch对于ferror在stdout上,它从来没有跳闸,所以我很确定的字节是至少使它stdout。有可能是别的东西写到stdout不知何故吗?也许一个额外的字节进入stdout,而我的程序运行?我发现这个疑问,因为错误在第10个条目的第4个fwrite读取中一致出现。

Update I tried passing the data in smaller chunks using the code that Chris suggested with no luck. also of note is that I added a catch for ferror on stdout and it never got tripped so I am pretty sure that the bytes are at least making it to stdout. Is it possible that something else is writing to stdout somehow? maybe an extra byte making its way into stdout while my program is running? I find this doubtful as the errors are appearing consistently on the 4th fwrite read in the 10th entry.

Per Craig的请求是完整的C ++代码已发布):它位于3个文件中:

Per Craig's request here is the full C++ code (the full Python Code is already posted): it is sitting in 3 files:

main.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <iostream>
#include "./MutualInformation.h"

double * data;
using namespace std;

void
xxwrite(unsigned char *buf, size_t wlen, FILE *fo)
{
    size_t xlen;

    for (; wlen > 0; wlen -= xlen, buf += xlen) {
        xlen = wlen;
        if (xlen > 1024)
            xlen = 1024;
        xlen = fwrite(buf, 1, xlen, fo);
        fflush(fo);
    }
}

int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;

    columnindex = 0;
    while (count<totalbytes/2) {

        xxwrite((unsigned char*)&(MI.getOutput()[columnindex]),  bytechunk, stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    delete [] data;

    return 0;
}

MutualInformation.h

MutualInformation.h

#include <map>

using namespace std;

class MutualInformation
{
private:
    double * mData;
    double * mOutput;
    long mHeight;
    long mWidth;
    long mOffset;

public:
    MutualInformation();
    ~MutualInformation();
    bool Initialize(double * data, long Height, long Width, long Offset);
    const double * getOutput();

    double calcMI();

};

MutualInformation.cpp

MutualInformation.cpp

#include "MutualInformation.h"


MutualInformation::MutualInformation()
{
    this->mData = nullptr;
    this->mOutput = nullptr;
    this->mHeight = 0;
    this->mWidth = 0;

}


MutualInformation::~MutualInformation()
{
    delete[] this->mOutput;
}

bool MutualInformation::Initialize(double * data, long Height, long Width, long Offset){
    bool rvalue = false;
    this->mData = data;
    this->mHeight = Height;
    this->mWidth = Width;
    this->mOffset = Offset;


    //allocate output data
    this->mOutput = new double[this->mHeight*this->mWidth];

    return rvalue;
}

const double * MutualInformation::getOutput(){
    return this->mOutput;
}


double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){

        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;
                //cout << "problem with output";
            }

        }



    }



    //*/
    return rvalue;
}

由6502解决 >

6502的回答下面解决了我的问题。我需要明确告诉Windows使用stdin / stdout的二进制模式。为此,我不得不在我的主cpp文件中包括2个新的头文件。

6502's answer below solved my problem. I needed to explicitly tell Windows to use a binary mode for stdin / stdout. to do that I had to include 2 new header files in my main cpp file.

#include <fcntl.h>
#include <io.h>

添加以下代码行(从6502的POSIX版本修改,因为Visual Studio抱怨)我的主要功能

add the following lines of code (modified away from 6502's POSIX versions because Visual Studio complained) to the beginning of my main function

_setmode(_fileno(stdout), O_BINARY);
_setmode(_fileno(stdin), O_BINARY);

,然后将这些行添加到我的Python代码:

and then add these lines to my Python code:

import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)


推荐答案

问题是,在窗口中的 stdin / 在文本模式下打开,而不是以二进制模式打开,因此当字符13 ( \r )。

The problem is that stdin/stdout in windows are opened in text mode, not in binary mode and therefore will mess up when the character 13 (\r) is sent.



You can set for example binary mode in Python with

import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)

和在C ++中使用

_setmode(fileno(stdout), O_BINARY);
_setmode(fileno(stdin), O_BINARY);

请参阅https://msdn.microsoft.com/en-us/library/tw4k6df8.aspx

这篇关于数据损坏C ++和Python之间的管道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆