Nested JSON with duplicate keys


Problem description


I will have to process 10 billion nested JSON records per day using NiFi (version 1.9). As part of the job, I am trying to convert the nested JSON to CSV using a Groovy script. I referred to the Stack Overflow questions below on the same topic and came up with the code that follows.

Groovy collect from map and submap

How to convert JSON into key-value pairs completely using Groovy


But I am not sure how to retrieve the values of duplicate keys. A sample JSON is defined in the variable "json" in the code below. The key "Flag1" appears in multiple sections (i.e., "OF" and "SF"). I want to get the output as CSV.


Below is the output when I execute the Groovy code that follows: 2019-10-08 22:33:29.244000,v12,-,36178,0,0/0,10.65.5.56,sf,sf (the Flag1 value is replaced by the last occurrence of that key).
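The overwrite behaviour can be seen in isolation. A Groovy Map keeps exactly one value per key, so merging the "OF" and "SF" sections into a single flat map silently drops the earlier Flag1. A minimal sketch (not the script itself, just the map semantics it relies on):

```groovy
// Duplicate keys in a flat map: the last write wins.
def of = [Flag1: 'of', Flag2: '-']
def sf = [Flag1: 'sf', Flag2: '-']

def flat = [:]
flat << of   // Map << Map is putAll
flat << sf   // overwrites Flag1 ('of' -> 'sf')

assert flat.Flag1 == 'sf'
assert flat.size() == 2   // only Flag1 and Flag2, not four entries
```

This is exactly why the last occurrence ("sf") shows up in both Flag1 columns of the output above.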


I am not an expert in Groovy. Please also suggest any better approach, so that I can give it a try.

    import groovy.json.*

    // note: a closing brace was added at the end of the literal so it is valid JSON
    def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}'

    def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
    def jsonRecord = new JsonSlurper().parseText(jsonReplace)
    // "Flag1" appears twice on purpose - that is the duplicate-key problem being asked about
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","Flag1","Flag1"]

    def flatten
    flatten = { row ->
        def flattened = [:]
        row.each { k, v ->
            if (v instanceof Map) {
                flattened << flatten(v)
            } else if (v instanceof Collection && v.every { it instanceof Map }) {
                v.each { flattened << flatten(it) }
            } else {
                flattened[k] = v
            }
        }
        flattened
    }

    print "output: " + jsonRecord.transaction.collect { row -> columns.collect { colName -> flatten(row)[colName] }.join(',') }.join('\n')


Based on the replies from @cfrick and @stck, I have tried the option and have a follow-up question below.


@cfrick and @stck - thanks for your responses.

  1. The original source JSON record will have 100+ columns, and I am using "InvokeScriptedProcessor" in NiFi to trigger the Groovy script.
  2. Below is the original Groovy script I use in "InvokeScriptedProcessor", where I work with streams (inputStream, outputStream). Is this what you were referring to? Am I doing anything wrong?

import groovy.json.JsonSlurper
// imports below are required for an InvokeScriptedProcessor script
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult

class customJSONtoCSV implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();

    def log
    static def flatten(row, prefix="") {
            def flattened = new HashMap<String, String>()
                            row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k;

            if (v instanceof Map) {
                flattened.putAll(flatten(v, key))   // pass the accumulated prefix, not just the immediate key
            } else {
                flattened.put(key, v.toString())
            }
        }
        return flattened
    }
        static def toCSVRow(HashMap row) {
        def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","DL","I_R","UAP","EDBL","Ca","A","RQM","RSM","FIT","CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","A_S","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]

            return columns.collect { column ->
            return row.containsKey(column) ? row.get(column) : ""
        }.join(',')
    }
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }
    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

        def jsonSlurper = new JsonSlurper()
        def line
        def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,DL,I_R,UAP,EDBL,Ca,A,RQM,RSM,FIT,CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,A_S,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"

        outputStream.write("${header}\n".getBytes('UTF-8'))
        while (line = bufferedReader.readLine()) {

        def jsonReplace = line.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
        def jsonRecord = new JsonSlurper().parseText(jsonReplace)
        def a = jsonRecord.transaction.collect { row ->
                return flatten(row)
                }.collect { row ->
                return toCSVRow(row)
                }

        // NOTE: `a` is a List<String> here, so "${a}" renders it with "[...]" brackets
        outputStream.write("${a}\n".getBytes('UTF-8'))

        }
 } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        return [] as List
    }

    @Override
    String getIdentifier() { return null }
}
processor = new customJSONtoCSV() 

  1. If I should not use "collect", what else do I need to use to create the rows?
  2. In the output flow file, the record output is wrapped in []. I tried the below but it did not work. I am not sure whether I am doing the right thing. I want the CSV output without [].

return toCSVRow(row).toString()
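One way to get rid of the brackets (a sketch, assuming `a` is the List of CSV strings built by the two `collect` calls in the script above): `"${a}"` interpolates the List via `List.toString()`, which is where the `[...]` comes from, so join the rows into one String before writing:

```groovy
// `a` stands in for the List of CSV rows produced by collect {}
def a = ['r1c1,r1c2', 'r2c1,r2c2']

def out = new ByteArrayOutputStream()
// join the rows first, then write a single String - no List.toString() brackets
out.write((a.join('\n') + '\n').getBytes('UTF-8'))

assert out.toString('UTF-8') == 'r1c1,r1c2\nr2c1,r2c2\n'
```

In the NiFi script, the same change would mean writing `a.join('\n')` to `outputStream` instead of `"${a}"`.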

Answer


The idea is to modify the "flatten" method: it should differentiate between identical nested keys by adding the parent key as a prefix. I've simplified the code a bit:

import groovy.json.*

def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}' // closing brace added so the string is valid JSON
def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}','}}]')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)

static def flatten(row, prefix="") {
    def flattened = new HashMap<String, String>()
    row.each { String k, Object v ->
        def key = prefix ? prefix + "." + k : k;

        if (v instanceof Map) {
            flattened.putAll(flatten(v, key))   // pass the full prefix so deeper nesting keeps parent keys
        } else {
            flattened.put(key, v.toString())
        }
    }

    return flattened
}

static def toCSVRow(HashMap row) {
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","OF.Flag1","SF.Flag1"] // Last 2 keys have changed!

    return columns.collect { column ->
        return row.containsKey(column) ? row.get(column) : ""
    }.join(', ')
}

def a = jsonRecord.transaction.collect { row ->
    return flatten(row)
}.collect { row ->
    return toCSVRow(row)
}.join('\n')

println a

The output will be:

2019-10-08 22:33:29.244000, , , , , , , of, sf
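As an aside, the string `replace()` surgery is fragile: it depends on `}}` and `{"transaction":{` appearing only where expected in the payload. One alternative (a sketch, not the accepted answer's code) is to parse first and wrap the single transaction object in a list in code, which gives `collect {}` the same shape without editing raw JSON:

```groovy
import groovy.json.JsonSlurper

// Sketch: avoid editing the raw JSON string; wrap after parsing instead.
def json = '{"transaction":{"TS":"t1","OF":{"Flag1":"of"},"SF":{"Flag1":"sf"}}}'
def parsed = new JsonSlurper().parseText(json)

// one transaction object -> one-element list, same shape collect {} expects
def records = [parsed.transaction]

assert records[0].OF.Flag1 == 'of'
assert records[0].SF.Flag1 == 'sf'
```

From here, `records.collect { flatten(it) }` would work unchanged, with no brittle string rewriting.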
