如何在Jython中使用修改后的数据更新行? [英] How to update line with modified data in Jython?
问题描述
我有一个包含数十万行的csv文件,下面是一些示例行.
I'm have a csv file which contains hundred thousands of rows and below are some sample lines..,
1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02
我需要获取错误格式的日期时间格式的csv数据的最后一列.因此,我需要从数据的最后一列获取默认的datetimeformat("YYYY-MM-DD hh:mm:ss[.nnn]"
).
I need to get the last column of csv data which is in wrong datetime format. So I need to get default datetimeformat("YYYY-MM-DD hh:mm:ss[.nnn]"
) from last column of the data.
我尝试使用以下脚本从中获取行并将其写入流文件.
I have tried the following script to get lines from it and write into flow file.
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
for line in text[1:]:
outputStream.write(line + "\n")
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
session.transfer(flowFile, REL_SUCCESS)
但是我找不到像下面的输出那样进行转换的方法.
but I am not able to find a way to convert it like below output.
1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332
我已经和我的朋友(google)检查了解决方案,但仍然找不到解决方案.
I have checked solutions with my friend(google) and was still not able to find solution.
有人可以指导我将这些输入数据转换为我所需的输出吗?
推荐答案
在此转换中,不必要的数据位于每行的末尾,因此使用正则表达式管理转换任务确实非常容易.
In this transformation the unnecessary data located at the end of each line, so it's really easy to manage transform task with regular expression.
^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?
在此处检查正则表达式和解释: https://regex101.com/r/sAB4SA/2
Check the regular expression and explanation here: https://regex101.com/r/sAB4SA/2
文件较大时-最好不要将其加载到内存中.以下代码将整个文件加载到内存中:
As soon as you have a large file - better not to load it into the memory. The following code loads whole the file into the memory:
IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
最好逐行进行迭代.
因此此代码适用于ExecuteScript
具有python(Jython)语言的nifi处理器:
So this code is for ExecuteScript
nifi processor with python (Jython) language:
import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter
class TransformCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
writer = OutputStreamWriter(outputStream,"UTF-8")
reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
line = reader.readLine()
p = re.compile('^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
while line!= None:
# print line
match = p.search(line)
writer.write( match.group(1) + (match.group(3) if match.group(3)!=None else '') )
writer.write('\n')
line = reader.readLine()
writer.flush()
writer.close()
reader.close()
except:
traceback.print_exc(file=sys.stdout)
raise
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile, TransformCallback())
# Finish by transferring the FlowFile to an output relationship
session.transfer(flowFile, REL_SUCCESS)
关于nifi的问题一经提出,以下替代方法似乎更容易
And as soon as question is about nifi, here are alternatives that seems to be easier
与上面相同的代码,但对于nifi ExecuteScript
处理器却很普通:
the same code as above but in groovy for nifi ExecuteScript
processor:
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// ## transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
reader.eachLine{line, lineNum->
if(lineNum>1) { // # skip the first line
// ## let use regular expression to transform each line
writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '$1$3' ) << '\n'
}
}
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)
ReplaceText处理器
如果正则表达式还可以-nifi中最简单的方法是ReplaceText
处理器,它可以执行正则表达式逐行替换.
ReplaceText processor
And if regular expression is ok - the easiest way in nifi is a ReplaceText
processor that could do regular expression replace line-by-line.
在这种情况下,您无需编写任何代码,只需构建正则表达式并正确配置处理器即可.
In this case you don't need to write any code, just build the regular expression and configure your processor correctly.
这篇关于如何在Jython中使用修改后的数据更新行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!