How to properly merge multiple FlowFiles?


Problem description

I use MergeContent 1.3.0 to merge FlowFiles from two sources: 1) ListenHTTP and 2) QueryElasticsearchHTTP.

The problem is that the merging result is a list of JSON strings. How can I convert them into a single JSON string?

{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

I would like to get this result:

{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 
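Outside NiFi, the desired transformation can be sketched in plain Python: split the concatenated JSON objects with `json.JSONDecoder.raw_decode`, then collect repeated keys into a list. This is only an illustration of the target output, not something MergeContent does by itself:

```python
import json

def split_concatenated(text):
    """Yield each JSON object from a string of back-to-back objects."""
    decoder = json.JSONDecoder()
    idx = 0
    while idx < len(text):
        obj, end = decoder.raw_decode(text, idx)
        yield obj
        idx = end

def merge_objects(objs):
    """Merge objects; a key seen more than once is collected into a list."""
    merged = {}
    for obj in objs:
        for key, value in obj.items():
            if key in merged:
                if not isinstance(merged[key], list):
                    merged[key] = [merged[key]]
                merged[key].append(value)
            else:
                merged[key] = value
    return merged

text = ('{"event-date":"2017-08-08T00:00:00"}'
        '{"event-date":"2017-02-23T00:00:00"}'
        '{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}')
print(json.dumps(merge_objects(split_concatenated(text))))
```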

Is it possible?

Update:

After changing the data structure in Elastic, I was able to come up with the following output from MergeContent. Now I have a common field eid in both JSON strings. I would like to merge these strings by eid in order to get a single JSON file. Which processor should I use?

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

I need to get the following output:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}
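Since both records carry the same eid, the required result is effectively a union of the two JSON objects. A minimal Python sketch of that check-and-merge step, outside NiFi:

```python
import json

a = json.loads('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}')
b = json.loads('{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')

# Only merge records that actually share the same eid.
assert a["eid"] == b["eid"]

merged = dict(a)
merged.update(b)  # keys from b win on collision; here only "eid" overlaps
print(json.dumps(merged))
```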

It was suggested to use ExecuteScript to merge the files. However, I cannot figure out how to do this. This is what I tried:

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "eid": obj['eid'],
            "zid": obj['zid'],
            ...
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile1 = session.get()
flowFile2 = session.get()
if flowFile1 is not None and flowFile2 is not None:
    # WHAT SHOULD I PUT HERE??
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()

Recommended answer

An example of how to read multiple files from the incoming queue using filtering.

Assume you have multiple pairs of flow files with the following content:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}

{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

The same value of the eid field links the members of a pair.

Before merging, we have to extract the value of the eid field and put it into an attribute of the flow file for fast filtering.

Use the EvaluateJsonPath processor with properties:

Destination :  flowfile-attribute 
eid         :  $.eid

After this, each flow file will have a new eid attribute.
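The JSONPath expression `$.eid` simply selects the top-level eid field; in plain Python, the equivalent extraction is:

```python
import json

# A flow-file payload; "$.eid" selects the top-level eid field.
record = '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'
eid = json.loads(record)["eid"]  # the value EvaluateJsonPath copies into the eid attribute
print(eid)
```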

Then use an ExecuteScript processor with the Groovy language and the following code:

import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.FlowFileFilter
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if(!ff0)return

def eid = ff0.getAttribute('eid')

//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume you require one additional file in the queue with the same attribute

if( !ffList || ffList.size()<1 ){
    //if less than required
    //rollback current session with penalize retrieved files so they will go to the end of the incoming queue
    //with pre-configured penalty delay (default 30sec)
    session.rollback(true)
    return
}

//let's put all in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    //unexpected situation: you have more files than expected
    //redirect all of them to failure
    session.transfer(ffList, REL_FAILURE)
    return
}

//create empty map (aka json object)
def json = [:]
//iterate through files parse and merge attributes
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}
//create new flow file and write merged json as a content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
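The filter-and-merge logic of the script can be sanity-checked outside NiFi. A minimal Python sketch, with the incoming queue simulated as a list of strings and all NiFi session mechanics omitted:

```python
import json

# Simulated incoming queue of flow-file contents (illustrative data only).
queue = [
    '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}',
    '{"eid":"2","zid":5,"latitude":40.1,"longitude":3.1}',
    '{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}',
]

def merge_by_eid(queue, eid):
    """Merge every JSON object in the queue carrying the given eid."""
    merged = {}
    for text in queue:
        obj = json.loads(text)
        if obj.get("eid") == eid:  # the FlowFileFilter step
            merged.update(obj)     # the json.putAll(fjson) step
    return merged

print(json.dumps(merge_by_eid(queue, "1")))
```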

