Nested JSON with duplicate keys


Problem Description

I have to process 10 billion nested JSON records per day using NiFi (version 1.9). As part of the job, I am trying to convert the nested JSON to CSV using a Groovy script. I referred to the Stack Overflow questions below on the same topic and came up with the following code.

Groovy collect from map and submap

How to convert JSON into key-value pairs completely using Groovy

But I am not sure how to retrieve the values of duplicate keys. A sample JSON is defined in the variable "json" in the code below. The key "Flag1" appears in multiple sections (i.e., "OF" & "SF"). I want to get the output as CSV.
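For reference, here is the same sample record pretty-printed (my rendering of the "json" variable below, with its missing closing brace restored) - note that "Flag1" appears under both "OF" and "SF":

    {
      "transaction": {
        "TS": "2019-10-08 22:33:29.244000",
        "CIPG": { "CIP": "10.65.5.56", "CP": "0" },
        "OF":   { "Flag1": "of", "Flag2": "-" },
        "SF":   { "Flag1": "sf", "Flag2": "-" }
      }
    }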

Below is the output if I execute the Groovy code below (the Flag1 value gets replaced by the last occurrence of that key):

    2019-10-08 22:33:29.244000,v12,-,36178,0,0/0,10.65.5.56,sf,sf
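A minimal sketch of why the last occurrence wins (my illustration, not part of the script): everything is flattened into a single map, and a Groovy map keeps only one value per key, so the "SF" section silently overwrites the "OF" section:

    def flattened = [:]
    flattened << [Flag1: 'of']     // flattened from the "OF" section
    flattened << [Flag1: 'sf']     // "SF" writes the same key and wins
    assert flattened.Flag1 == 'sf'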

I am not an expert in Groovy. Please also suggest if there is any better approach, so that I can give it a try.

    import groovy.json.*

    // Sample record; note the final closing brace, which the original post
    // dropped - without it the '}}}' replacement below never matches.
    def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}'

    // Wrap the single transaction object in an array so it can be collected.
    def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
    def jsonRecord = new JsonSlurper().parseText(jsonReplace)
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","Flag1","Flag1"]

    // Recursively merge nested maps into one flat map; duplicate keys
    // (like Flag1) collide here because the parent key is discarded.
    def flatten
    flatten = { row ->
        def flattened = [:]
        row.each { k, v ->
            if (v instanceof Map) {
                flattened << flatten(v)
            } else if (v instanceof Collection && v.every { it instanceof Map }) {
                v.each { flattened << flatten(it) }
            } else {
                flattened[k] = v
            }
        }
        flattened
    }

    print "output: " + jsonRecord.transaction.collect { row -> columns.collect { colName -> flatten(row)[colName] }.join(',') }.join('\n')

Based on the replies from @cfrick and @stck, I have tried the option and have follow-up questions below.

@cfrick and @stck - thanks for your responses.

  1. The original source JSON record will have more than 100 columns, and I am using "InvokeScriptedProcessor" in NiFi to trigger the Groovy script.
  2. Below is the original Groovy script I am using in "InvokeScriptedProcessor", in which I have used streams (inputStream, outputStream). Is this what you were referring to? Am I doing anything wrong?

import groovy.json.JsonSlurper
// Imports the original post omitted; these are the standard NiFi classes an
// InvokeScriptedProcessor script needs (assumed, adjust to your NiFi version).
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback

class customJSONtoCSV implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()

    def log

    static def flatten(row, prefix="") {
        def flattened = new HashMap<String, String>()
        row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k
            if (v instanceof Map) {
                // pass the accumulated key so deeper nesting keeps the full prefix
                flattened.putAll(flatten(v, key))
            } else {
                flattened.put(key, v.toString())
            }
        }
        return flattened
    }

    static def toCSVRow(HashMap row) {
        def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","DL","I_R","UAP","EDBL","Ca","A","RQM","RSM","FIT","CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","A_S","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]
        // Emit every configured column; empty string when the record lacks it.
        return columns.collect { column ->
            return row.containsKey(column) ? row.get(column) : ""
        }.join(',')
    }

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {
            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile, { inputStream, outputStream ->
                def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                def jsonSlurper = new JsonSlurper()
                def line
                def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,DL,I_R,UAP,EDBL,Ca,A,RQM,RSM,FIT,CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,A_S,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"
                outputStream.write("${header}\n".getBytes('UTF-8'))
                while (line = bufferedReader.readLine()) {
                    def jsonReplace = line.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
                    def jsonRecord = jsonSlurper.parseText(jsonReplace)
                    def a = jsonRecord.transaction.collect { row ->
                        return flatten(row)
                    }.collect { row ->
                        return toCSVRow(row)
                    }
                    // "a" is still a List here, so the GString interpolation below
                    // writes its toString(), which is where the surrounding [] come from.
                    outputStream.write("${a}\n".getBytes('UTF-8'))
                }
            } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return null }
}

// InvokeScriptedProcessor requires the script to expose its Processor
// instance through this "processor" variable.
processor = new customJSONtoCSV()

  1. If I should not use "collect", what else do I need to use to create the rows?
  2. In the output flow file, the record output is inside []. I tried the below, but it is not working - not sure whether I am doing it right. I want the CSV output without []:

return toCSVRow(row).toString()
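A minimal sketch of one way to drop the brackets (my illustration, reusing the join('\n') idea from the answer below): join the collected rows into a single String before writing, instead of interpolating the List itself:

def a = jsonRecord.transaction.collect { row ->
    return toCSVRow(flatten(row))
}.join('\n')          // one line per transaction, no List.toString() brackets

outputStream.write("${a}\n".getBytes('UTF-8'))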

Answer

The idea is to modify the "flatten" method - it should differentiate between identical nested keys by adding the parent key as a prefix. I've simplified the code a bit:

import groovy.json.*

// Note the restored final closing brace on the sample JSON (the original
// post dropped it); the '}}' -> '}}]' replacement relies on it.
def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}}'
def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}','}}]')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)

static def flatten(row, prefix="") {
    def flattened = new HashMap<String, String>()
    row.each { String k, Object v ->
        // parent key becomes a prefix, so OF.Flag1 and SF.Flag1 stay distinct
        def key = prefix ? prefix + "." + k : k

        if (v instanceof Map) {
            // pass the accumulated key so deeper nesting keeps the full prefix
            flattened.putAll(flatten(v, key))
        } else {
            flattened.put(key, v.toString())
        }
    }

    return flattened
}

static def toCSVRow(HashMap row) {
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","OF.Flag1","SF.Flag1"] // Last 2 keys have changed!

    return columns.collect { column ->
        return row.containsKey(column) ? row.get(column) : ""
    }.join(', ')
}

def a = jsonRecord.transaction.collect { row ->
    return flatten(row)
}.collect { row ->
    return toCSVRow(row)
}.join('\n')

println a

The output is:

2019-10-08 22:33:29.244000, , , , , , , of, sf
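Note (my addition, not from the answer): the InvokeScriptedProcessor script above builds its column names with "_" (OF_Flag1, SF_Flag1, ...), so when carrying this fix over, keep the underscore separator - a sketch of the same flatten with that separator:

static def flatten(row, prefix="") {
    def flattened = new HashMap<String, String>()
    row.each { String k, Object v ->
        // "_" matches the OF_Flag1/SF_Flag1 style names already in the header
        def key = prefix ? prefix + "_" + k : k
        if (v instanceof Map) {
            flattened.putAll(flatten(v, key))
        } else {
            flattened.put(key, v.toString())
        }
    }
    return flattened
}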
