How to update line with modified data in Jython?


Question

I have a CSV file that contains hundreds of thousands of rows. Below are some sample lines:

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

The last column of the CSV data is in the wrong datetime format, so I need to bring that column into the default datetime format ("YYYY-MM-DD hh:mm:ss[.nnn]").

I have tried the following script to read the lines and write them into the flow file.

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
        for line in text[1:]:
            outputStream.write(line + "\n")

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
    session.transfer(flowFile, REL_SUCCESS)

However, I cannot find a way to convert the data into the output below.

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,28-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

I have searched Google for solutions but still could not find one.

Can anyone guide me on how to convert this input data into the output I need?

Recommended answer

In this transformation the unnecessary data is located at the end of each line, so the task is easy to handle with a regular expression:

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

Check the regular expression and its explanation here: https://regex101.com/r/sAB4SA/2
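To see what the pattern does before wiring it into NiFi, here is a minimal sketch in plain Python that applies it to the sample lines from the question. The helper name `trim_timestamp` is made up for illustration, and passing non-matching lines through unchanged is an assumption, not something the original answer specifies.

```python
import re

# Same pattern as above, written as a raw string so that the
# backslashes reach the regex engine untouched.
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def trim_timestamp(line):
    """Keep at most three fractional digits and drop the trailing offset."""
    match = PATTERN.search(line)
    if match is None:
        # Assumption: a line without a time part is returned unchanged.
        return line
    # Group 1 is everything up to the seconds, group 3 is the optional
    # fraction limited to three digits. Groups 4 and 5 are discarded.
    return match.group(1) + (match.group(3) or '')


samples = [
    "1,Ni,23,28-02-2015 12:22:33.2212-02",
    "3,Us,33,30-03-2015 12:23:35-01",
    "4,Uk,34,31-03-2015 12:24:36.332211-02",
]
for sample in samples:
    print(trim_timestamp(sample))
```

Rebuilding the line from groups 1 and 3 avoids a `\1\3` replacement string, which fails on Python 2 style engines such as Jython when group 3 did not take part in the match.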

Since the file is large, it is better not to load it into memory. The following call loads the whole file into memory:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

It is better to iterate line by line.

The following code is for the NiFi ExecuteScript processor with the Python (Jython) language:

import re
import sys
import traceback
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter
from org.apache.nifi.processor.io import StreamCallback


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream, "UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream, "UTF-8"))
            # Raw string so the regex escapes are not interpreted by Python
            p = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            # Read one line at a time instead of loading the whole file
            line = reader.readLine()
            while line is not None:
                match = p.search(line)
                if match is None:
                    # No time part found: pass the line through unchanged
                    writer.write(line)
                else:
                    # Keep the prefix plus at most three fractional digits
                    writer.write(match.group(1) + (match.group(3) or ''))
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)
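The line-by-line logic can also be tried outside NiFi. The following is a minimal sketch in plain Python that uses in-memory text streams in place of the flow file streams. The function name `transform_stream` and the `io.StringIO` stand-ins are assumptions for illustration and are not part of the NiFi API.

```python
import io
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def transform_stream(reader, writer):
    """Copy reader to writer one line at a time, trimming each timestamp.

    Only the current line is held in memory, so the size of the input
    does not matter. Returns the number of lines written.
    """
    count = 0
    for line in reader:  # text streams yield one line per iteration
        line = line.rstrip(u'\r\n')
        match = PATTERN.search(line)
        if match is not None:
            line = match.group(1) + (match.group(3) or u'')
        writer.write(line + u'\n')
        count += 1
    return count


# Hypothetical stand-ins for the flow file content.
source = io.StringIO(
    u"1,Ni,23,28-02-2015 12:22:33.2212-02\n"
    u"3,Us,33,30-03-2015 12:23:35-01\n"
)
sink = io.StringIO()
written = transform_stream(source, sink)
print(sink.getvalue())
```

Passing any iterable of lines and any object with a `write` method keeps the transformation independent of where the data comes from, which makes it easy to check against a few sample rows before running it on the full file.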


Since the question is about NiFi, here are some alternatives that seem easier.

The same logic as above, but in Groovy for the NiFi ExecuteScript processor:

import org.apache.nifi.processor.io.StreamCallback

def ff = session.get()
if (!ff) return
ff = session.write(ff, { rawIn, rawOut ->
    // Wrap the raw streams in a reader and a writer
    rawIn.withReader("UTF-8") { reader ->
        rawOut.withWriter("UTF-8") { writer ->
            reader.eachLine { line, lineNum ->
                if (lineNum > 1) { // skip the first line
                    // Use the regular expression to transform each line
                    writer << line.replaceAll(/^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/, '$1$3') << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)


ReplaceText processor

If a regular expression is sufficient, the easiest way in NiFi is the ReplaceText processor, which can perform regular expression replacement line by line.

In this case you don't need to write any code; just build the regular expression and configure the processor correctly.
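As a rough sketch of such a configuration, the ReplaceText properties might be set as shown below. The property names and values are given as they appear in recent NiFi releases, so treat them as assumptions and verify them against your version. The replacement value `$1$3` is the same one used in the Groovy example above.

```text
Replacement Strategy : Regex Replace
Evaluation Mode      : Line-by-Line
Search Value         : ^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?
Replacement Value    : $1$3
```

With line-by-line evaluation the processor does not need to hold the whole file in memory, which matches the advice given for the script-based variants.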
