How to update lines with modified data in Jython?

Question

I have a CSV file that contains hundreds of thousands of rows. Below are some sample lines:

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

The last column of the CSV data is in the wrong datetime format, so I need to convert it to the default datetime format ("YYYY-MM-DD hh:mm:ss[.nnn]"): at most three fractional-second digits and no trailing offset such as -02.

I tried the following script to read the lines and write them to the flow file:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
        # skip the first line and copy the remaining lines unchanged
        for line in text[1:]:
            # OutputStream.write expects bytes, so encode the string first
            outputStream.write(bytearray((line + "\n").encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
    session.transfer(flowFile, REL_SUCCESS)

However, I can't find a way to convert it into the output below:

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,28-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

I searched Google for solutions but still couldn't find one.

Can anyone guide me on converting this input data into the output I need?

Recommended answer

In this transformation, the unwanted data is at the end of each line, so the task is easy to handle with a regular expression:

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

Check the regular expression and explanation here: https://regex101.com/r/sAB4SA/2
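For illustration, here is a small Python sketch (plain CPython 2.7 or 3, run outside NiFi) that applies the same pattern to the sample rows. It keeps only group 1 (everything up to the seconds) and group 3 (at most three fractional digits), which is what a $1$3 replacement does. normalize_line is a hypothetical helper name, and passing non-matching lines through unchanged is an assumption about how you want to treat them.

```python
import re

# Same pattern as above:
#   group 1: everything up to and including the seconds (":ss")
#   group 3: the first 1-3 fractional digits (".nnn")
#   group 4: any remaining fractional digits (dropped)
#   group 5: a trailing offset such as "-02" (dropped)
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def normalize_line(line):
    """Return the line with at most 3 fractional digits and no trailing offset.

    Hypothetical helper: lines the pattern does not match are returned unchanged.
    """
    match = PATTERN.search(line)
    if match is None:
        return line
    # group(3) is None when the time has no fractional part
    return match.group(1) + (match.group(3) or '')


samples = [
    "1,Ni,23,28-02-2015 12:22:33.2212-02",
    "2,Fi,21,28-02-2015 12:22:34.3212-02",
    "3,Us,33,30-03-2015 12:23:35-01",
    "4,Uk,34,31-03-2015 12:24:36.332211-02",
]

for s in samples:
    # first row prints: 1,Ni,23,28-02-2015 12:22:33.221
    print(normalize_line(s))
```

For the first row, PATTERN.search(...).groups() is ('1,Ni,23,28-02-2015 12:22:33', '.2212', '.221', '2', '-02'), so joining groups 1 and 3 drops the extra digit and the offset.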

With a large file, it is better not to load it into memory. The following call loads the whole file into memory:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

It is better to iterate over the input line by line.
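A minimal sketch of the line-by-line idea in plain Python, outside NiFi: transform_stream (a hypothetical name) reads from any file-like object one line at a time and writes each converted line immediately, so memory use depends on the longest line rather than on the file size. The skip_header flag is an assumption added for illustration; it mirrors the text[1:] slice in the question's script.

```python
import io
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def transform_stream(reader, writer, skip_header=False):
    """Copy reader to writer one line at a time, normalizing each timestamp.

    Only the current line is held in memory, never the whole input.
    Returns the number of lines written.
    """
    written = 0
    for line_num, line in enumerate(reader, 1):
        if skip_header and line_num == 1:
            continue  # hypothetical option: drop the first (header) line
        line = line.rstrip('\r\n')
        match = PATTERN.search(line)
        if match is not None:
            # keep the time up to the seconds plus at most 3 fractional digits
            line = match.group(1) + (match.group(3) or '')
        writer.write(line + '\n')
        written += 1
    return written


source = io.StringIO(u"1,Ni,23,28-02-2015 12:22:33.2212-02\n"
                     u"3,Us,33,30-03-2015 12:23:35-01\n")
target = io.StringIO()
transform_stream(source, target)
print(target.getvalue())
```

In Jython, a java.io.BufferedReader is not a Python iterator, so a readLine() loop plays the role of the for loop here.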

The following code is for the NiFi ExecuteScript processor with Python (Jython) as the script language:

import sys
import re
import traceback
from org.apache.nifi.processor.io import StreamCallback
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter

# Group 1: everything up to and including the seconds (":ss")
# Group 3: the first 1-3 fractional digits (".nnn")
# Group 4: any extra fractional digits (dropped)
# Group 5: a trailing offset such as "-02" (dropped)
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream, "UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream, "UTF-8"))
            # Read and write one line at a time so the whole file is never held in memory
            line = reader.readLine()
            while line is not None:
                match = PATTERN.search(line)
                if match:
                    # Keep the time up to the seconds plus at most 3 fractional digits
                    writer.write(match.group(1) + (match.group(3) or ''))
                else:
                    # Lines without a time value (e.g. a header) pass through unchanged
                    writer.write(line)
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)


Since the question is about NiFi, here are some alternatives that may be easier.

The same logic in Groovy for the NiFi ExecuteScript processor (this version also skips the first line, as the question's script does):

import org.apache.nifi.processor.io.StreamCallback

def ff = session.get()
if (!ff) return
ff = session.write(ff, { rawIn, rawOut ->
    // wrap the raw streams in a UTF-8 reader and writer
    rawIn.withReader("UTF-8") { reader ->
        rawOut.withWriter("UTF-8") { writer ->
            reader.eachLine { line, lineNum ->
                if (lineNum > 1) { // skip the first line; line numbers start at 1
                    // keep group 1 (up to the seconds) and group 3 (at most 3 fractional digits)
                    writer << line.replaceAll(/^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/, '$1$3') << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)


ReplaceText processor

If a regular expression is good enough, the easiest way in NiFi is the ReplaceText processor, which can apply a regular-expression replacement line by line.

In this case you don't need to write any code; just build the regular expression and configure the processor correctly.
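A possible configuration, assuming a recent NiFi release: Replacement Strategy set to Regex Replace, Evaluation Mode set to Line-by-Line, Search Value set to the regular expression above, and Replacement Value set to $1$3. Property names and defaults may differ between NiFi versions, so check the processor documentation for yours. The Python sketch below is not NiFi code; it only emulates that replacement on each line to show one detail: in a Java-style replacement string (as in Java's Matcher.replaceAll, which Groovy's String.replaceAll uses), a group that did not take part in the match, here group 3 when there are no fractional seconds, expands to an empty string. ReplaceText is expected to behave the same way, but verify that on your version. expand_java_replacement and replace_line_by_line are hypothetical helpers written for this illustration.

```python
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
# Java-style back-reference "$n"; single-digit groups only, enough for this pattern
BACKREF = re.compile(r'\$(\d)')


def expand_java_replacement(template, match):
    """Expand "$n" like Java's Matcher does; unmatched groups become ''."""
    return BACKREF.sub(lambda ref: match.group(int(ref.group(1))) or '', template)


def replace_line_by_line(text, pattern, template):
    """Apply the replacement to each line separately, keeping line endings."""
    out = []
    for line in text.splitlines(True):
        body = line.rstrip('\r\n')
        ending = line[len(body):]
        # the pattern is anchored with ^, so it matches at most once per line
        new_body = pattern.sub(lambda m: expand_java_replacement(template, m), body)
        out.append(new_body + ending)
    return ''.join(out)


text = ("1,Ni,23,28-02-2015 12:22:33.2212-02\n"
        "3,Us,33,30-03-2015 12:23:35-01\n")
print(replace_line_by_line(text, PATTERN, '$1$3'))
```

Python 2.7's own re.sub raises an "unmatched group" error for \3 in the second row, which is why the Jython script checks match.group(3) explicitly instead of using a replacement template.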
