Manipulate row data in hadoop to add missing columns
Question
I have log files from IIS stored in HDFS, but due to web server configuration some of the logs do not contain all of the columns, or the columns appear in a different order. I want to generate files with a common schema so I can define a Hive table over them.
Example of a good log:
#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 GET /common/viewFile/1232 Mozilla/5.0+Chrome/27.0.1453.116
Example log with missing columns (cs-method and useragent missing):
#Fields: date time s-ip cs-uri-stem
2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232
A log with missing columns needs to be mapped to the full schema like this:
#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 null /common/viewFile/1232 null
A bad log can have any combination of the enabled columns, in any order.
How can I map the available columns to the full schema according to the #Fields row within each log file?
Normally I would approach this by defining my column schema as a dict mapping column name to index, e.g. col['date'] = 0, col['time'] = 1, and so on. I would then read the #Fields row from the file, parse out the enabled columns, and build a header dict mapping header name to column index in that file. For each remaining data row I know each value's header by index, map that to my column schema by header name, and generate a new row in the correct order, inserting missing columns with null data. My issue is that I do not understand how to do this within Hadoop, since each map task executes independently, so how can I share the #Fields information with each map?
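The approach described above can be sketched in plain Python; the helper names and the 'null' placeholder are illustrative, not part of any Hadoop API:

```python
# Full target schema, in the order the Hive table expects.
SCHEMA = ['date', 'time', 's-ip', 'cs-method', 'cs-uri-stem', 'useragent']

def parse_fields_row(line):
    """Map each enabled column name to its index in this file's rows."""
    names = line.replace('#Fields:', '').split()
    return {name: i for i, name in enumerate(names)}

def remap_row(row, header):
    """Reorder a data row to the full schema, filling gaps with 'null'."""
    values = row.split(' ')
    return [values[header[col]] if col in header else 'null'
            for col in SCHEMA]

header = parse_fields_row('#Fields: date time s-ip cs-uri-stem')
print(remap_row('2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232', header))
# ['2013-07-16', '00:00:00', '10.1.15.8', 'null', '/common/viewFile/1232', 'null']
```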
Recommended answer
You can use this to apply the header to the columns, creating a map. From there you can use a UDF like:
myudf.py
#!/usr/bin/python
# Pig Jython UDFs; the outputSchema decorator is provided by Pig's
# Jython engine when the script is registered.

@outputSchema('newM:map[]')
def completemap(M):
    if M is None:
        return None
    to_add = ['A', 'D', 'F']
    for item in to_add:
        if item not in M:
            M[item] = None
    return M

@outputSchema('A:chararray, B:chararray, C:chararray, D:chararray, E:chararray, F:chararray')
def completemap_v2(M):
    if M is None:
        return (None, None, None, None, None, None)
    return (M.get('A', None),
            M.get('B', None),
            M.get('C', None),
            M.get('D', None),
            M.get('E', None),
            M.get('F', None))
This adds the missing keys to the map.
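As a quick sanity check of the completemap_v2 padding logic outside Pig (the @outputSchema decorator is Pig-specific, so the function body is replicated here stand-alone):

```python
# Stand-alone replica of the UDF body: pad a sparse map out to the
# fixed A..F schema, returning None for any missing key.
def completemap_v2(M):
    if M is None:
        return (None,) * 6
    return tuple(M.get(k, None) for k in 'ABCDEF')

print(completemap_v2({'D': 'PLEASE', 'E': 'WORK', 'F': 'FOO'}))
# (None, None, None, 'PLEASE', 'WORK', 'FOO')
print(completemap_v2(None))
# (None, None, None, None, None, None)
```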
Sample input:
csv1.in csv2.in
------- ---------
A|B|C D|E|F
Hello|This|is PLEASE|WORK|FOO
FOO|BAR|BING OR|EVERYTHING|WILL
BANG|BOSH BE|FOR|NAUGHT
Example script:
A = LOAD 'tests/csv' USING myudfs.ExampleCSVLoader('\\|') AS (M:map[]);
B = FOREACH A GENERATE FLATTEN(myudf.completemap_v2(M));
Output:
B: {null::A: chararray,null::B: chararray,null::C: chararray,null::D: chararray,null::E: chararray,null::F: chararray}
(,,,,,)
(,,,PLEASE,WORK,FOO)
(,,,OR,EVERYTHING,WILL)
(,,,BE,FOR,NAUGHT)
(,,,,,)
(Hello,This,is,,,)
(FOO,BAR,BING,,,)
(BANG,BOSH,,,,)