Apache NiFi ExecuteScript: Groovy script to replace Json values via a mapping file


Problem description


I am working with Apache NiFi 0.5.1 on a Groovy script to replace incoming Json values with the ones contained in a mapping file. The mapping file looks like this (it is a simple .txt):

Header1;Header2;Header3
A;some text;A2

I have started with the following:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
        { inputStream, outputStream -> 

            def content = """
{
  "field1": "A",
  "field2": "A",
  "field3": "A"
}"""

            def slurped = new JsonSlurper().parseText(content) 
            def builder = new JsonBuilder(slurped) 
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
        } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

This first step works just fine, although it is hardcoded and far from ideal. My initial thought was to use the ReplaceTextWithMapping processor to perform the substitutions, but it does not work well with complex mapping files (e.g. multiple columns). I would like to take this further, but I am not sure how to go about it. First of all, instead of passing in the entire hardcoded JSON, I would like to read the incoming flow file. How is that possible in NiFi? Before running the script as part of ExecuteScript, I output a .json file with content via UpdateAttribute, where filename = myResultingJSON.json. Furthermore, I know how to load a .txt file with Groovy (String mappingContent = new File('/path/to/file').getText('UTF-8')), but how do I use the loaded file to perform the substitutions so that my resulting JSON looks like this:

{ 
  "field1": "A",
  "field2": "some text", 
  "field3": "A2" 
}

Thank you for your help,

I.

EDIT:

My first modification to the script lets me read from the InputStream:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

I then moved on to testing the approach with ConfigSlurper and wrote a standalone class before injecting the logic into the Groovy ExecuteScript:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3": "A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)

        println "This is the content of my builder " + builder

        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()

        println "This is the content of my config " + config

        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }

}

I get a groovy.lang.MissingPropertyException, and this is because the mapping is not that straightforward. All fields/properties (field1 to field3) come into the InputStream with the same value (e.g. "A"). This means that whenever field2, for example, has that value, you can be certain that it is valid for the other two properties. However, I cannot have a mapping that maps "field2" : "someText", because the actual mapping is driven by the first value in the mapping file. Here is an example:

{
  "field1": "A",
  "field2": "A",
  "field3": "A"
}

In my mapping file I have:

A;some text;A2

However, field1 needs to map to A (the first value in the file), or simply stay the same, if you wish. Field2 needs to map to the value in the last column (A2), and finally Field3 needs to map to 'some text' in the middle column.

Can you help with this? Is this something I can achieve with Groovy and ExecuteScript? If needed, I can split the config file into two.

Also, I had a quick look at the other option (PutDistributedMapCache), and I am not sure I have understood how to load key-value pairs into a distributed map cache. It looks like you would need a DistributedMapCacheClient, and I am not sure whether that is easy to set up.

Thank you!

EDIT 2:

Some more progress: I now have the mapping working, but I am not sure why it fails when reading the second line of the properties file:

"A" someText
"A2" anotherText

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3":"A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"

        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()

        config.each { k, v ->
            if (builder.content.field2) {

                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {

                builder.content.field3 = config[k]
            }

            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}

I am returned with: This is my builder {"field2":"someText","field3":"someText"}

Any idea why?

Thank you so much

EDIT 3 (Moved from below)

I have written the following:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
            {"field2":"A",
             "field3":"A"
            }
            '''
        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        def propertiesFile = new File('D:\\properties.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def conf = new ConfigSlurper().parse(props).flatten()

        conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}

However, I had to change the structure of the mapping file as follows:

"field1"="substitutionText"
"field2"="substitutionText2"
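The quotes above may only be formatting in this post. If they are literally in the file, though, `java.util.Properties` does not treat double quotes specially. The key for the first line would then be `"field1"`, quotes included, and it would not match a JSON field named `field1`. The following is a small illustration, with the file content inlined as a string for the sketch:

```groovy
// Properties.load() keeps quote characters as part of keys and values
def props = new Properties()
props.load(new StringReader('"field1"="substitutionText"\nfield2=substitutionText2\n'))

// The first key contains literal quote characters; the second does not
println props.keySet()
```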

I have then 'incorporated' the ConfigSlurper into the ExecuteScript script, as follows:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)

            def propertiesFile = new File('D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten()

            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
            }
            // Write the JSON once, after the substitutions have been applied
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

The problem seems to be that I cannot replicate the logic of the original mapping file with something like the approach in my TestLoadingMappings class. As mentioned in my previous comments/edits, the mapping should work like this:

field2 = if A then substitute with "some text"

field3 = if A then substitute with A2

...

field2 = if B then substitute with "some other text"

field3 = if B then substitute with B2

and so on.

In a nutshell, the mappings are driven by the incoming value in the InputStream (which varies), which conditionally maps to different values depending on the JSON attribute. Can you please recommend a better way to achieve this mapping via a Groovy/ExecuteScript? I have flexibility in modifying the mapping file, can you see a way where I can change it in order to achieve the desired mappings?

Thanks

Solution

I have some examples on how to read in a flow file containing JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html
http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

You've got the right structure above; basically you can use that "inputStream" variable in the closure to read the incoming flow file contents. If you want to read it in all at once (which you will likely need to do for JSON), you can use IOUtils.toString() followed by a JsonSlurper, as is done in the examples in the links above.
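A minimal sketch of that read, parse, modify and write cycle, run outside NiFi so it can be tried standalone. In ExecuteScript the closure would be passed to `session.write(flowFile, ... as StreamCallback)`. Here it is called directly with in-memory streams. Groovy's `InputStream.getText(charset)` stands in for `IOUtils.toString()` so that commons-io is not needed. The hard-coded `field2` change is only a placeholder:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Stand-in for the body of the StreamCallback closure
def transform = { InputStream inputStream, OutputStream outputStream ->
    // Read the whole flow file content at once (JSON needs the full document)
    def content = inputStream.getText(StandardCharsets.UTF_8.name())
    def json = new JsonSlurper().parseText(content)
    json.field2 = 'some text'   // placeholder change to show the round trip
    // Serialize back to JSON text before writing
    outputStream.write(JsonOutput.toJson(json).getBytes(StandardCharsets.UTF_8))
}

// Exercise the closure with in-memory streams instead of a real flow file
def input = new ByteArrayInputStream('{"field1":"A","field2":"A"}'.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
transform(input, output)
def result = output.toString(StandardCharsets.UTF_8.name())
println result
```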

For your mapping file, especially if your JSON is "flat", you could have a Java Properties file, mapping the name of the field to the new value:

field2=some text

field3=A2

Check out ConfigSlurper for reading in properties files.
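As a sketch of that step, the example below loads the two-line properties mapping above and flattens it with `ConfigSlurper`. The mapping is inlined through a `StringReader` rather than read from a file, which is an assumption made to keep the example self-contained:

```groovy
// The properties mapping from above, inlined instead of read from disk
def mappingText = '''
field2=some text
field3=A2
'''

def props = new Properties()
props.load(new StringReader(mappingText))

// parse(Properties) builds a ConfigObject; flatten() turns it into a flat
// map keyed by the (possibly dotted) property names
def config = new ConfigSlurper().parse(props).flatten()
config.each { k, v -> println "$k -> $v" }
```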

Once you have slurped the incoming JSON file and read in the mapping file, you can get at the individual fields of the JSON using array notation instead of direct member notation. So let's say I read the properties into a ConfigSlurper, and I want to overwrite any existing property in my input JSON (called "json" for the example) with the one from the properties file. That might look like the following:

config.parse(props).flatten().each { k,v ->
  if(json[k]) {
    json[k] = v
  }
}

You can then continue on with your outputStream.write().
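Before calling `outputStream.write()`, the modified map has to be turned back into JSON text. A minimal standalone sketch follows, with the flattened mapping written as a map literal (assumed to match what ConfigSlurper produces for the properties file above). It makes two deliberate changes from the snippet above. It uses `containsKey()` instead of the truthiness test `json[k]`, because the truthiness test would skip fields whose current value is `""`, `0` or `false`. It also uses `JsonOutput.toJson()`, because `toString()` on the map would produce Groovy map syntax rather than JSON:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Incoming flow file content, inlined for the sketch
def json = new JsonSlurper().parseText('{"field1":"A","field2":"A","field3":"A"}')

// Flattened mapping (assumed shape of the ConfigSlurper output)
def mapping = [field2: 'some text', field3: 'A2']

mapping.each { k, v ->
    // Overwrite only fields that exist in the input JSON
    if (json.containsKey(k)) {
        json[k] = v
    }
}

// JSON text that would be passed to outputStream.write()
def result = JsonOutput.toJson(json)
println result
```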

Instead of reading your mappings from a file, you could also load them into a distributed cache via the PutDistributedMapCache processor. You can read from a DistributedMapCacheServer in your ExecuteScript; I have an example here:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

If your mapping is complex, you may want to use the TransformJSON processor, which will be available in the next release of NiFi (0.7.0). The associated Jira case is here:

https://issues.apache.org/jira/browse/NIFI-361

EDIT:

In response to your edits, I didn't realize you had multiple rules for various values. In this case a properties file is probably not the best way to represent the mappings. Instead you could use JSON:

{
  "field2": {
         "A": "some text",
         "B": "some other text"
       },
  "field3": {
         "A": "A2",
         "B": "B2"
       }
}
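If you would rather keep maintaining the semicolon-delimited file from the question, the nested structure above could be generated from it instead of being written by hand. The sketch below makes several assumptions. It assumes the file has a header row and that the first column holds the incoming value. The `columnToField` assignment (column 2 to `field2`, column 3 to `field3`) is hypothetical and taken from the desired output in the question. The B row comes from the rules listed in the question's last edit:

```groovy
import groovy.json.JsonOutput

// Semicolon-delimited mapping file, inlined for the sketch
def mappingCsv = '''Header1;Header2;Header3
A;some text;A2
B;some other text;B2
'''

// Hypothetical column-to-field assignment (0-based column index -> JSON field)
def columnToField = [1: 'field2', 2: 'field3']

// field -> (incoming value -> replacement); missing inner maps are created on demand
def mappings = [:].withDefault { [:] }
mappingCsv.readLines().drop(1).each { line ->   // drop(1) skips the header row
    if (!line.trim()) return                    // ignore blank lines
    def cols = line.split(';')*.trim()
    def incoming = cols[0]
    columnToField.each { idx, field ->
        mappings[field][incoming] = cols[idx]
    }
}

// Same nested shape as the JSON mapping file above
println JsonOutput.prettyPrint(JsonOutput.toJson(mappings))
```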

Then you can use a JsonSlurper to read in the mappings file. Here is an example of using the above mapping file:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

def mappingJson = new File('/Users/mburgess/mappings.json').text

flowFile = session.write(flowFile, { inputStream, outputStream ->

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def inJson = new JsonSlurper().parseText(content)
    def mappings = new JsonSlurper().parseText(mappingJson)

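    inJson.each { k, v ->
        // Look up the replacement for this field/value pair; keep the original
        // value when there is no rule for it (e.g. field1 has no mappings entry)
        def replacement = mappings[k]?.get(v)
        inJson[k] = (replacement != null) ? replacement : v
    }
    // Serialize as JSON; inJson.toString() would produce Groovy map syntax
    outputStream.write(new JsonBuilder(inJson).toPrettyString().getBytes(StandardCharsets.UTF_8))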
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
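The lookup logic can be checked without a running NiFi instance. The following sketch inlines the mapping file from above and applies the `mappings[field][value]` lookup to a parsed JSON map. Fields or values with no rule keep their original value; that fallback is a choice made for the sketch, not part of the answer's script. It uses `collectEntries` to build a new map rather than updating the input in place:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// The mapping file from the answer, inlined
def mappings = new JsonSlurper().parseText('''
{
  "field2": {"A": "some text", "B": "some other text"},
  "field3": {"A": "A2", "B": "B2"}
}''')

// Replace each value via mappings[field][value], falling back to the original
def applyMappings = { Map inJson ->
    inJson.collectEntries { k, v ->
        def replacement = mappings[k]?.get(v)
        [(k): replacement != null ? replacement : v]
    }
}

def input = new JsonSlurper().parseText('{"field1":"A","field2":"A","field3":"A"}')
def output = JsonOutput.toJson(applyMappings(input))
println output
```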

