How to process a flat file with a JSON string as part of each line into a CSV file using a Pig loader?

Question

I have a file in HDFS as:

44,UK,{"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":{"fruit1":"apple","fruit2":"orange"}},31-07-2016
91,INDIA,{"names":{"name1":"Ram","name2":"Sam"},"fruits":{}},31-07-2016

and want to store this into a CSV file as below, using a Pig loader:

44,UK,names,name1,John,31-07-2016
44,UK,names,name2,Marry,31-07-2016
..
44,UK,fruit,fruit1,apple,31-07-2016
..
91,INDIA,names,name1,Ram,31-07-2016
..
91,INDIA,null,null,Ram,31-07-2016

What should the Pig script be for this?

Answer

Since your record is not a proper JSON string, no JSON storer/loader will help you here. Writing a UDF is a simpler approach.

Update approach 1 :-

The UDF and Pig script below will work if you convert your input to a tab-separated file.
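For example, the first input record would then look like the following, with a tab character between each of the four top-level fields so that the commas inside the JSON no longer collide with the field delimiter:

44	UK	{"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":{"fruit1":"apple","fruit2":"orange"}}	31-07-2016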

UDF :-

package com.test.udf;

import org.apache.commons.lang3.StringUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Input format:
 * {"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":{"fruit1":"apple","fruit2":"orange"}}
 *
 * Emits a bag of single-field tuples, one per name/fruit entry, each holding
 * a comma-joined string such as "names,name1,John".
 */
public class jsonToTuples extends EvalFunc<DataBag> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final TypeReference<HashMap<String, Object>> typeRef =
            new TypeReference<HashMap<String, Object>>() {};

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        String jsonRecord = (String) input.get(0);
        if (StringUtils.isBlank(jsonRecord)) {
            return null;
        }
        try {
            // Parse the JSON record into a map of sub-maps.
            Map<String, Object> jsonDataMap = objectMapper.readValue(jsonRecord, typeRef);
            List<String> recordList = new ArrayList<String>();

            // One "names,<key>,<value>" entry per name.
            if (jsonDataMap.get("names") != null) {
                Map<String, String> namesDataMap = (Map<String, String>) jsonDataMap.get("names");
                for (String key : namesDataMap.keySet()) {
                    recordList.add("names" + "," + key + "," + namesDataMap.get(key));
                }
            }
            // One "fruits,<key>,<value>" entry per fruit.
            if (jsonDataMap.get("fruits") != null) {
                Map<String, String> fruitsDataMap = (Map<String, String>) jsonDataMap.get("fruits");
                for (String key : fruitsDataMap.keySet()) {
                    recordList.add("fruits" + "," + key + "," + fruitsDataMap.get(key));
                }
            }

            // Wrap each string in a single-field tuple so that FLATTEN in the
            // Pig script turns every entry into its own output row.
            DataBag outputBag = BagFactory.getInstance().newDefaultBag();
            for (String record : recordList) {
                Tuple outputTuple = TupleFactory.getInstance().newTuple(1);
                outputTuple.set(0, record);
                outputBag.add(outputTuple);
            }
            return outputBag;
        } catch (Exception e) {
            System.out.println("caught exception for record : " + jsonRecord);
            e.printStackTrace();
            return null;
        }
    }
}

Pig script :-

register 'testUDF.jar' ;
A = load 'data.txt' using PigStorage() as (id:chararray , country:chararray , record:chararray , date:chararray);
B = Foreach A generate id, country , FLATTEN(com.test.udf.jsonToTuples(record)) , date ;
dump B ;
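
Since the question asks for a CSV file rather than a console dump, the final relation can be stored with a comma delimiter. Because the UDF already emits its three middle fields as one comma-joined string, this should produce the requested layout; 'output' below is a hypothetical HDFS path:

STORE B INTO 'output' USING PigStorage(',');

Each output line then looks like 44,UK,names,name1,John,31-07-2016.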

Old approach :-

Below is the way I would read your record in my UDF if it is comma-separated.

As mentioned in my comment below, try using the magic of split() in the UDF to separate your fields. I have not tested this, but here is what I might try in my UDF :-

(Please note that I am not sure this is the best option - you may want to improve it further.)

// Split on the first two commas only - the JSON itself contains commas.
String[] strSplit = ((String) input.get(0)).split(",", 3);
String id = strSplit[0];
String country = strSplit[1];
String jsonWithDate = strSplit[2];

// The date is the last comma-separated token of the whole record.
String[] datePart = ((String) input.get(0)).split(",");
String date = datePart[datePart.length - 1];

/*
 * At this point jsonWithDate looks like:
 * {"names":{"name1":"Ram","name2":"Sam"},"fruits":{}},31-07-2016
 */

// Strip the date and the trailing comma to leave only the JSON string.
// Note: replaceAll (not replace) is required for the second call, because
// ",$" is a regular expression anchored at the end of the string.
String jsonString = jsonWithDate.replace(date, "").replaceAll(",$", "");

// Now use a parser or object mapper (as in the UDF above) to convert
// jsonString into the desired list of values.
