How to properly merge multiple FlowFiles?


Problem description

I use MergeContent 1.3.0 to merge FlowFiles from two sources: 1) from ListenHTTP and 2) from QueryElasticsearchHTTP.

The problem is that the merging result is a list of JSON strings. How can I convert them into a single JSON string?

{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

I would like to obtain the following result:

{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

Is that possible?

UPDATE:

After changing the data structure in Elastic, I was able to come up with the following output from MergeContent. Now I have a common field eid in both JSON strings. I would like to merge these strings by eid in order to get a single JSON file. Which processor should I use?

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

I need to get the following output:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}

It was suggested to use ExecuteScript to merge the files. However, I cannot figure out how to do this. This is what I tried:

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "eid": obj['eid'],
            "zid": obj['zid'],
            ...
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile1 = session.get()
flowFile2 = session.get()
if flowFile1 is not None and flowFile2 is not None:
    # WHAT SHOULD I PUT HERE??
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()

Answer

This example shows how to read multiple files from the incoming queue using filtering.

Assume you have multiple pairs of flow files with the following content:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}

{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

The same value of the eid field provides the link between the members of a pair.

Before merging, we have to extract the value of the eid field and put it into an attribute of the flow file for fast filtering.

Use an EvaluateJsonPath processor with the following properties:

Destination :  flowfile-attribute 
eid         :  $.eid
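Conceptually, the $.eid path just reads the top-level key of the content; a minimal Python sketch of the value that the processor stores into the attribute:

```python
import json

content = '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'
# With Destination = flowfile-attribute, EvaluateJsonPath copies the value
# matched by $.eid into a flow-file attribute named "eid"
eid = json.loads(content)["eid"]
print(eid)
```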

After this, each flow file will have a new eid attribute.

Then use an ExecuteScript processor with Groovy as the script engine and the following code:

import org.apache.nifi.processor.FlowFileFilter
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if(!ff0)return

def eid = ff0.getAttribute('eid')

//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume each pair consists of two files, so we need one additional file with the same attribute in the queue

if( !ffList || ffList.size()<1 ){
    //if less than required
    //rollback current session with penalize retrieved files so they will go to the end of the incoming queue
    //with pre-configured penalty delay (default 30sec)
    session.rollback(true)
    return
}

//let's put all in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    //unexpected situation: you have more files than expected
    //redirect all of them to failure
    session.transfer(ffList, REL_FAILURE)
    return
}

//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
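The filter-and-merge logic of the script (accept only queued files whose eid attribute matches, then putAll each parsed object into one map) can be sketched outside NiFi as a single grouping pass over the queue (hypothetical sample data):

```python
import json
from collections import defaultdict

# hypothetical incoming queue with two pairs, linked by eid
queue = [
    '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}',
    '{"eid":"2","zid":5,"latitude":40.0,"longitude":3.1}',
    '{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}',
    '{"eid":"2","dates":{"event-date":["2017-03-01"]}}',
]

groups = defaultdict(dict)
for text in queue:
    obj = json.loads(text)
    groups[obj["eid"]].update(obj)   # same effect as json.putAll(fjson)

for eid, doc in sorted(groups.items()):
    print(json.dumps(doc))
```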

