如何处理一个平面文件,JSON字符串作为每一行的一部分,转换为CSV文件使用PIG Loader? [英] How to process a flat file with JSON string as a part of each line, into CSV file Using PIG Loader?

查看:157
本文介绍了如何处理一个平面文件,JSON字符串作为每一行的一部分,转换为CSV文件使用PIG Loader?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在HDFS中有一个档案为

I have a file in HDFS as


44,UK,{names:{name1:John name2:marry,name3:stuart},fruits:{fruit1:apple,fruit2:orange}},31-07-2016

44,UK,{"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":{"fruit1":"apple","fruit2":"orange"}},31-07-2016

91,INDIA,{names:{name1:Ram,name2:Sam},fruits:{}},31-07-2016

91,INDIA,{"names":{"name1":"Ram","name2":"Sam"},"fruits":{}},31-07-2016

并要使用PIG载入程式将其储存到SCV档案中:

and want to store this into a SCV file as below using PIG loader :


44,英国,名字,name1,约翰,31-07-2016

44,英国,名字,name2,Marry,31-07-2016

..

44,UK,fruit,fruit1,apple,31-07-2016

..

91 ,INDIA,names,name1,Ram,31-07-2016

..

91,印度,null,null,Ram,31-07-2016

44,UK,names,name1,John,31-07-2016
44,UK,names,name2,Marry,31-07-2016
..
44,UK,fruit,fruit1,apple,31-07-2016
..
91,INDIA,names,name1,Ram,31-07-2016
..
91,INDIA,null,null,Ram,31-07-2016

此PIG脚本应该是什么?

What should be the PIG script for this ?

推荐答案

因为你的记录不是一个合适的JSON字符串任何json storer / loader将不会帮助你。编写UDF将是一个更简单的方法。

Since your record is not a proper JSON string any json storer/loader will not help you. Writing a UDF will be a simpler approach.

更新方法1: -

如果您将输入转换为标签,UDF和PIG脚本分隔的文件。

Below UDF and PIG script will work if you are converting your input to tab separated file.

UDF: -

package com.test.udf;

import org.apache.commons.lang3.StringUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
*input format :-
*  {"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":    {"fruit1":"apple","fruit2":"orange"}}
*/
public class jsonToTuples extends EvalFunc<DataBag> {
ObjectMapper objectMapper = new ObjectMapper();
TypeReference typeRef = new TypeReference<HashMap<String, Object>>() {
};

@Override
public DataBag exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0) {
        return null;
    } else {
        String jsonRecord = (String) input.get(0);
        if (StringUtils.isNotBlank(jsonRecord)) {
            try {
                List<String> recordList = new ArrayList<String>();
                Map<String, Object> jsonDataMap = objectMapper.readValue(jsonRecord, typeRef);
                if(jsonDataMap.get("names") != null) {
                    Map<String, String> namesDataMap  = (Map<String, String>) jsonDataMap.get("names");
                    for(String key : namesDataMap.keySet()){
                        recordList.add("names" + "," + key + "," + namesDataMap.get(key));
                    }

                }
                if(jsonDataMap.get("fruits") != null) {
                    Map<String, String> fruitsDataMap  = (Map<String, String>) jsonDataMap.get("fruits");
                    for(String key : fruitsDataMap.keySet()){
                        recordList.add("fruits" + "," + key + "," + fruitsDataMap.get(key));
                    }

                }
                DataBag outputBag = BagFactory.getInstance().newDefaultBag();
                for( int i = 0 ; i < recordList.size() ; i++){
                    Tuple outputTuple = TupleFactory.getInstance().newTuple(1);
                    outputTuple.set(0 , recordList.get(i));
                    outputBag.add(outputTuple);
                }

                return outputBag;
            }catch(Exception e){
                System.out.println("caught exception for ");
                e.printStackTrace();
                return null;
            }
        }
    }
    return null;
    }
}

PIG SCRIPT: -

PIG SCRIPT :-

register 'testUDF.jar' ;
A = load 'data.txt' using PigStorage() as (id:chararray , country:chararray , record:chararray , date:chararray);
B = Foreach A generate id, country , FLATTEN(com.test.udf.jsonToTuples(record)) , date ;
dump B ;

旧方法: -

如果我在下面的注释中尝试使用split在UDF中的魔法,那么我将使用UDF来读取你的记录。分隔您的字段。我没有测试,但这里是我可以在我的UDF尝试: -

As mentined in my below comment try to use magic of split in UDF to separate your fields. I have not tested but here is what I may try in my UDF :-

(请注意,我不知道这是最好的选择 - 你可能想改善它)

(please note that I am not sure this is best option - you may want to improve it further.)

String[] strSplit = ((String) input.get(0)).split("," , 3);
String id = strSplit[0] ;
String country = strSplit[1] ;
String jsonWithDate = strSplit[2] ;

String[] datePart =  ((String) input.get(0)).split(",");    
String date = datePart[datePart.length-1];

/**
 * above jsonWithDate should look like -
 * {"names":{"name1":"Ram","name2":"Sam"},"fruits":{}},31-07-2016
 * 
*/
String jsonString = jsonWithDate.replace(date,"").replace(",$", "");

/**
* now use some parser or object mapper to convert jsonString to desired list of values.
*/

这篇关于如何处理一个平面文件,JSON字符串作为每一行的一部分,转换为CSV文件使用PIG Loader?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆