Apache NiFi ExecuteScript: Groovy script to replace Json values via a mapping file

Question

I am working with Apache NiFi 0.5.1 on a Groovy script to replace incoming Json values with the ones contained in a mapping file. The mapping file looks like this (it is a simple .txt):

Header1;Header2;Header3
 A;some text;A2

I started with the following:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
        { inputStream, outputStream -> 

            def content = """ 
{ 
  "field1": "A",
  "field2": "A", 
  "field3": "A" 

}""" 

            def slurped = new JsonSlurper().parseText(content) 
            def builder = new JsonBuilder(slurped) 
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
        } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

This first step works just fine, although it is hardcoded and far from ideal. My initial thought was to use ReplaceTextWithMapping to perform the substitutions, but it does not work well with complex mapping files (e.g. multiple columns). I would like to take this further, but I am not sure how to go about it. First of all, instead of passing in the entire hardcoded JSON, I would like to read the incoming flowfile. How is that possible in NiFi? Before running the script as part of ExecuteScript, I have output a .json file with content via UpdateAttribute, where filename = myResultingJSON.json. Furthermore, I know how to load a .txt file with Groovy (String mappingContent = new File('/path/to/file').getText('UTF-8')), but how do I use the loaded file to perform the substitutions so that my resulting JSON looks like this:

{ 
  "field1": "A",
  "field2": "some text", 
  "field3": "A2" 
}

Thanks for your help,

I.
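The lookup described above can be sketched in plain Groovy, outside NiFi. This is a minimal sketch under assumptions the question does not state outright: the first line of the mapping file is a header, `;` is the delimiter, rows are keyed by their first column, and (to match the desired output above) field1, field2 and field3 take the first, second and third columns of the matching row. `buildLookup` and `applyRow` are hypothetical helper names.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: turn "Header1;Header2;Header3\nA;some text;A2" into
// [A: ['A', 'some text', 'A2']], keyed by the first column.
Map<String, List<String>> buildLookup(String mappingText) {
    def lines = mappingText.readLines()*.trim().findAll { it }  // drop blank lines
    lines.drop(1)                                                // skip the header row
         .collect { it.split(';', -1)*.trim() }
         .collectEntries { cols -> [(cols[0]): cols] }
}

// Assumption: field N takes column N of the row whose first column matches
// the field's incoming value; fields without a matching row are left unchanged.
Map applyRow(Map json, Map<String, List<String>> lookup, List<String> fields) {
    fields.eachWithIndex { field, i ->
        def row = lookup[json[field] as String]
        if (row != null && i < row.size()) {
            json[field] = row[i]
        }
    }
    json
}

def mappingText = '''Header1;Header2;Header3
 A;some text;A2'''
def json = new JsonSlurper().parseText('{"field1": "A", "field2": "A", "field3": "A"}')
def result = applyRow(json, buildLookup(mappingText), ['field1', 'field2', 'field3'])
println JsonOutput.toJson(result)
```

Inside ExecuteScript, the mapping text would come from the file and the JSON from the flow file content; only the lookup logic is shown here.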

First modification to the script does allow me to read from the InputStream:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

I then moved on to testing the approach with ConfigSlurper and wrote a standalone class before putting the logic into the Groovy ExecuteScript script:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3": "A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)

        println "This is the content of my builder " + builder

        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()

        println "This is the content of my config " + config

        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }

}

This throws a groovy.lang.MissingPropertyException, and this is because the mapping is not that straightforward. All fields/properties (field1 to field3) arrive in the InputStream with the same value, which means that every time field2, for example, has that value, you can be certain it will be valid for the other two properties. However, I cannot have a mapping entry such as "field2" : "someText", because the actual mapping is driven by the first value in the mapping file. Here is an example:

{ 
      "field1": "A",
      "field2": "A", 
      "field3": "A" 

 }

In my mapping file I have:

A;some text;A2

However, field1 needs to map to A (the first value in the file), or stay the same if you wish. Field2 needs to map to the value in the last column (A2), and finally Field3 needs to map to 'some text' in the middle column.

Can you help with this? Is that something I can achieve with Groovy and ExecuteScript? If needed, I can split the config file into two.

Also, I have had a quick look at the other option (PutDistributedMapCache), and I am not sure I understand how to load key-value pairs into a distributed map cache. It looks like you would need a DistributedMapCacheClient, and I am not sure whether that is easy to implement.

Thanks!

EDIT 2:

Some more progress: I now have the mapping working, but I am not sure why it fails when reading the second line of the properties file:

"A" someText
"A2" anotherText

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3":"A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"

        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()

        config.each { k, v ->
            if (builder.content.field2) {

                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {

                builder.content.field3 = config[k]
            }

            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}

I get back: This is my builder {"field2":"someText","field3":"someText"}

Any idea why?

Many thanks
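One likely explanation: on each pass of `config.each`, both `field2` and `field3` are assigned the same `config[k]` (the `if` checks only that the field is non-empty, not which key is being visited), so after every iteration both fields hold whichever value was just visited. Note also that `java.util.Properties` treats the first whitespace as the key/value separator, so a line such as `"A" someText` yields the key `"A"` with the quotes included. Below is a minimal sketch that instead looks up each field by its current value, assuming an unquoted `A=someText` format; `replaceByValue` is a hypothetical helper.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: replace each listed field by looking up its *current*
// value in the mapping, instead of looping over every mapping entry.
Map replaceByValue(Map json, Properties mapping, List<String> fields) {
    fields.each { field ->
        def current = json[field]
        if (current != null && mapping.containsKey(current as String)) {
            json[field] = mapping.getProperty(current as String)
        }
    }
    json
}

// Assumption: unquoted keys ("A=someText"). With the quoted file above,
// the quotes become part of each key ("\"A\"" rather than "A").
def props = new Properties()
props.load(new StringReader('A=someText\nA2=anotherText'))

def json = new JsonSlurper().parseText('{"field2": "A", "field3": "A2"}')
println replaceByValue(json, props, ['field2', 'field3'])
```

A flat lookup like this still gives every field the same replacement for a given input value, so it cannot express "A becomes some text in field2 but A2 in field3"; that needs a per-field mapping.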

EDIT 3 (Moved from below)

I wrote the following:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
        {"field2":"A",
         "field3":"A"
        }
        '''
        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        def propertiesFile = new File('D:\\properties.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def conf = new ConfigSlurper().parse(props).flatten()

        conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}

However, I had to change the structure of the mapping file as follows:

"field1"="substitutionText"
"field2"="substitutionText2"

I then 'incorporated' the ConfigSlurper into the ExecuteScript script, as follows:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)

            def propertiesFile = new File('D:\\properties.txt')
            Properties props = new Properties()
            // withInputStream closes the file once the properties are loaded
            propertiesFile.withInputStream { props.load(it) }
            def conf = new ConfigSlurper().parse(props).flatten()

            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
            }
            // Write the result once, after the substitutions have been applied
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

The problem seems to be that I cannot really replicate the logic of the original mapping file with something similar to what I created in TestLoadingMappings. As mentioned in my previous comments/edits, the mapping should work like this:

field2 = if A then substitute with "some text"

field3 = if A then substitute with A2

...

field2 = if B then substitute with "some other text"

field3 = if B then substitute with B2

and so on.

In a nutshell, the mappings are driven by the incoming value in the InputStream (which varies), and that value maps to different results depending on the JSON attribute. Can you please recommend a better way to achieve this mapping via Groovy/ExecuteScript? I have flexibility in modifying the mapping file; can you see a way I could change it to achieve the desired mappings?

Thanks

Recommended answer

I have some examples on how to read in a flow file containing JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html
http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

You've got the right structure above; basically you can use that "inputStream" variable in the closure to read the incoming flow file contents. If you want to read it in all at once (which you will likely need to do for JSON), you can use IOUtils.toString() followed by a JsonSlurper, as is done in the examples in the links above.
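To try the read/parse/write cycle outside NiFi, the body of the `session.write` closure can be pulled into a plain method and fed byte-array streams in place of the flow file. This is a minimal sketch: `rewriteJson` is a hypothetical name, and it uses Groovy's `InputStream.getText(charset)` instead of commons-io `IOUtils.toString()` only so that it runs without extra dependencies.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the StreamCallback body: read the whole stream as
// UTF-8, parse it, let the caller change it, and write it back out as JSON.
void rewriteJson(InputStream inputStream, OutputStream outputStream, Closure change) {
    def content = inputStream.getText(StandardCharsets.UTF_8.name())
    def json = new JsonSlurper().parseText(content)
    change(json)
    outputStream.write(JsonOutput.prettyPrint(JsonOutput.toJson(json)).getBytes(StandardCharsets.UTF_8))
}

// Outside NiFi, byte-array streams stand in for the flow file content.
def input = new ByteArrayInputStream('{"field1": "A", "field2": "A"}'.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
rewriteJson(input, output) { json -> json.field2 = 'some text' }

def written = new JsonSlurper().parseText(output.toString(StandardCharsets.UTF_8.name()))
println written
```

Reading everything at once is fine for small JSON documents; it keeps the whole flow file in memory.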

For your mapping file, especially if your JSON is "flat", you could have a Java Properties file, mapping the name of the field to the new value:

field2=some text

field3=A2

Check out ConfigSlurper for reading in properties files.

Once you have slurped the incoming JSON file and read in the mapping file, you can get at the individual fields of the JSON using array notation instead of direct member notation. So let's say I read the properties into a ConfigSlurper, and I want to overwrite any existing property in my input JSON (called "json" for the example) with the one from the properties file. That might look like the following:

config.parse(props).flatten().each { k,v ->
  if(json[k]) {
    json[k] = v
  }
}
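Here is a self-contained version of that loop, with the properties loaded from a string instead of a file. It works on the map returned by `JsonSlurper`, not on a `JsonBuilder`; subscripting the builder itself (as in `builder[k]` in the question) is a likely source of the `MissingPropertyException`. The `containsKey` check is a small deviation from the snippet above: `if (json[k])` uses Groovy truth and would skip fields whose current value is empty, `0` or `false`.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// The properties mapping from above, loaded from a string instead of a file.
def props = new Properties()
props.load(new StringReader('field2=some text\nfield3=A2'))

def json = new JsonSlurper().parseText('{"field1": "A", "field2": "A", "field3": "A"}')

// flatten() turns the ConfigObject into a plain map of key -> value.
new ConfigSlurper().parse(props).flatten().each { k, v ->
    // Only overwrite fields that already exist in the incoming JSON
    if (json.containsKey(k)) {
        json[k] = v
    }
}
println JsonOutput.toJson(json)
```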

You can then continue on with your outputStream.write().

Instead of reading your mappings from a file, you could also load them into a distributed cache via the PutDistributedMapCache processor. You can read from a DistributedMapCacheServer in your ExecuteScript; I have an example here:

http://funnifi.blogspot.com/2016/04/检查您的 nifi.html

If your mapping is complex, you may want to use the TransformJSON processor, which will be available in the next release of NiFi (0.7.0). The associated Jira case is here:

https://issues.apache.org/jira/browse/NIFI-361

EDIT:

In response to your edits, I didn't realize you had multiple rules for various values. In this case a properties file is probably not the best way to represent the mappings. Instead you could use JSON:

{
  "field2": {
         "A": "some text",
         "B": "some other text"
       },
  "field3": {
         "A": "A2",
         "B": "B2"
       }
}

Then you can use a JsonSlurper to read in the mappings file. Here is an example of using the above mapping file:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

// Parse the mapping file once, outside the write callback
def mappings = new JsonSlurper().parseText(new File('/Users/mburgess/mappings.json').text)

flowFile = session.write(flowFile, { inputStream, outputStream ->

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def inJson = new JsonSlurper().parseText(content)

    inJson.each { k, v ->
        // Only replace values that have a rule; leave everything else untouched
        def rules = mappings[k]
        if (rules instanceof Map && rules.containsKey(v as String)) {
            inJson[k] = rules[v as String]
        }
    }
    // Serialize as JSON (inJson.toString() would produce Groovy map syntax, not JSON)
    outputStream.write(JsonOutput.toJson(inJson).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
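The script above needs a running NiFi to try out. The lookup itself can be pulled into a plain Groovy function and exercised locally. This is a minimal sketch using the mapping file from above, with `applyNestedMappings` as a hypothetical name. It builds a new map rather than modifying the input while iterating, and it passes through any field or value that has no rule, which is an assumption about the desired behaviour.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: for each field, use mappings[field][currentValue] when
// such a rule exists; otherwise keep the current value.
Map applyNestedMappings(Map inJson, Map mappings) {
    inJson.collectEntries { k, v ->
        def rules = mappings[k]
        def key = v as String
        def newValue = (rules instanceof Map && rules.containsKey(key)) ? rules[key] : v
        [(k): newValue]
    }
}

def mappings = new JsonSlurper().parseText('''{
  "field2": {"A": "some text", "B": "some other text"},
  "field3": {"A": "A2", "B": "B2"}
}''')

def inJson = new JsonSlurper().parseText('{"field1": "A", "field2": "A", "field3": "B"}')
def outJson = applyNestedMappings(inJson, mappings)
println JsonOutput.toJson(outJson)
```

Keeping the rules per field is what lets the same incoming value ("A") map to "some text" in field2 but "A2" in field3.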
