How to properly merge multiple FlowFiles?
Question
I use MergeContent 1.3.0 in order to merge FlowFiles from 2 sources: 1) from ListenHTTP and 2) from QueryElasticsearchHTTP.
The problem is that the merging result is a list of JSON strings. How can I convert them into a single JSON string?
{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
I would like to get the following result:
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
Is it possible?
UPDATE:
After changing the data structure in Elastic, I was able to come up with the following output of MergeContent. Now I have a common field eid in both JSON strings. I would like to merge these strings by eid in order to get a single JSON file. Which operator should I use?
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
I need to get the following output:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}
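For reference, the desired split-and-merge can be sketched outside NiFi in plain Python. This is only an illustration of the transformation itself, not part of the flow; the raw_decode-based splitting of the concatenated records is an assumption about how they could be separated:

```python
import json

# Two records as produced by MergeContent: JSON objects concatenated back to back
merged_content = ('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'
                  '{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')

def split_json_objects(text):
    """Split a string of concatenated JSON objects into a list of dicts."""
    decoder = json.JSONDecoder()
    objects, pos = [], 0
    while pos < len(text):
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
    return objects

# Group the records by their common "eid" and shallow-merge each group
result = {}
for record in split_json_objects(merged_content):
    result.setdefault(record["eid"], {}).update(record)

print(json.dumps(result["1"]))
```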
It was suggested to use ExecuteScript to merge the files. However, I cannot figure out how to do this. This is what I tried:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "eid": obj['eid'],
            "zid": obj['zid'],
            ...
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile1 = session.get()
flowFile2 = session.get()
if (flowFile1 != None and flowFile2 != None):
    # WHAT SHOULD I PUT HERE??
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename",
                                    flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
Answer
This example shows how to read multiple files from the incoming queue using filtering.
Assume you have multiple pairs of flow files with the following content:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
and
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
The same value of the eid field provides the link between the files of a pair.
Before merging, we have to extract the value of the eid field and put it into an attribute of the flow file for fast filtering.
Use the EvaluateJsonPath processor with the following properties:
Destination : flowfile-attribute
eid : $.eid
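With this configuration, EvaluateJsonPath evaluates the JSONPath $.eid against the flow file content and writes the value into an eid attribute. The equivalent extraction in plain Python, purely as a sketch of what the processor does:

```python
import json

# Example flow file content
payload = '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'

# EvaluateJsonPath with Destination = flowfile-attribute and eid = $.eid
# copies the value at $.eid into an "eid" flow-file attribute
attributes = {"eid": json.loads(payload)["eid"]}
```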
After this you'll have a new eid attribute on the flow file.
Then use an ExecuteScript processor with the Groovy language and the following code:
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.FlowFileFilter
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if( !ff0 ) return

def eid = ff0.getAttribute('eid')

//try to find files with the same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( eid == ff.getAttribute('eid') ) return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume you require one additional file in the queue with the same attribute
if( !ffList || ffList.size()<1 ){
    //fewer files than required:
    //rollback the current session and penalize the retrieved files
    //so they go to the end of the incoming queue
    //with the pre-configured penalty delay (default 30 sec)
    session.rollback(true)
    return
}

//put all files in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    //unexpected situation: you have more files than expected
    //redirect all of them to failure
    session.transfer(ffList, REL_FAILURE)
    return
}

//create an empty map (aka json object)
def json = [:]

//iterate through the files, parse and merge their contents
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}

//create a new flow file and write the merged json as its content
def ffOut = session.create()
ffOut = session.write(ffOut, {rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )

//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
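One caveat: json.putAll(fjson) is a shallow merge, so a later file overwrites any top-level keys already set by an earlier one. With the example payloads the only shared key is eid, with equal values, so nothing is lost. A plain-Python sketch of the same merge semantics:

```python
import json

# The two example payloads linked by the same eid
a = json.loads('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}')
b = json.loads('{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')

# Shallow merge, like json.putAll(fjson) in the Groovy script:
# keys from b overwrite keys from a; here only "eid" overlaps
merged = dict(a)
merged.update(b)
```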