Manipulate row data in Hadoop to add missing columns


Problem description

I have log files from IIS stored in hdfs, but due to webserver configuration some of the logs do not have all the columns or they appear in different order. I want to generate files that have a common schema so I can define a Hive table over them.

Example of a good log:

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 GET /common/viewFile/1232 Mozilla/5.0+Chrome/27.0.1453.116

Example log with missing columns (cs-method and useragent missing):

#Fields: date time s-ip cs-uri-stem 
2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232

The log with missing columns needs to be mapped to the full schema like this:

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 null /common/viewFile/1232 null

The bad logs can have any combination of columns enabled, and in any order.

How can I map the available columns to the full schema according to the Fields row within the log file?

Normally I would approach this by defining my column schema as a dict mapping column name to index, i.e. col['date']=0, col['time']=1, etc. Then I would read the #Fields row from the file, parse out the enabled columns, and build a header dict mapping each header name to its column index in the file. For the remaining rows of data I then know each value's header by index, map that to my column schema by header = column name, and generate a new row in the correct order, inserting the missing columns with null data. My issue is that I do not understand how to do this within Hadoop: each map executes alone, so how can I share the #Fields information with each map?
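The remapping logic described above can be sketched in plain Python, outside Hadoop (the schema constant and helper names below are illustrative, not part of the original question):

```python
# Full target schema, in the order the Hive table should see it.
SCHEMA = ['date', 'time', 's-ip', 'cs-method', 'cs-uri-stem', 'useragent']

def parse_fields(header_line):
    """Parse a '#Fields:' line into the list of columns enabled in this file."""
    return header_line.replace('#Fields:', '').split()

def normalize(row, enabled):
    """Map a space-delimited row onto the full schema, filling gaps with 'null'."""
    values = dict(zip(enabled, row.split(' ')))
    return ' '.join(values.get(col, 'null') for col in SCHEMA)

enabled = parse_fields('#Fields: date time s-ip cs-uri-stem')
print(normalize('2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232', enabled))
# 2013-07-16 00:00:00 10.1.15.8 null /common/viewFile/1232 null
```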

Recommended answer

You can use this to apply the header to the columns, creating a map. From there you can use a UDF like:

myudf.py

#!/usr/bin/python
# Python UDFs for Pig. Under CPython streaming the @outputSchema decorator
# comes from pig_util; under Jython, Pig injects it automatically.

@outputSchema('newM:map[]')
def completemap(M):
    """Fill the map's missing keys with null values."""
    if M is None:
        return None
    to_add = ['A', 'D', 'F']  # columns that may be absent from the input
    for item in to_add:
        if item not in M:
            M[item] = None
    return M

@outputSchema('A:chararray, B:chararray, C:chararray, D:chararray, E:chararray, F:chararray')
def completemap_v2(M):
    """Flatten the map into a tuple covering the full A-F schema."""
    if M is None:
        return (None, None, None, None, None, None)
    return (M.get('A', None),
            M.get('B', None),
            M.get('C', None),
            M.get('D', None),
            M.get('E', None),
            M.get('F', None))

This adds the missing keys to the map.

Sample input:

csv1.in             csv2.in
-------            ---------
A|B|C               D|E|F
Hello|This|is       PLEASE|WORK|FOO
FOO|BAR|BING        OR|EVERYTHING|WILL
BANG|BOSH           BE|FOR|NAUGHT

Sample script:

-- assumes myudf.py (above) sits beside the script; ExampleCSVLoader comes
-- from a separately registered Java UDF jar aliased as myudfs
REGISTER 'myudf.py' USING jython AS myudf;
A = LOAD 'tests/csv' USING myudfs.ExampleCSVLoader('\\|') AS (M:map[]); 
B = FOREACH A GENERATE FLATTEN(myudf.completemap_v2(M));

Output:

B: {null::A: chararray,null::B: chararray,null::C: chararray,null::D: chararray,null::E: chararray,null::F: chararray}
(,,,,,)
(,,,PLEASE,WORK,FOO)
(,,,OR,EVERYTHING,WILL)
(,,,BE,FOR,NAUGHT)
(,,,,,)
(Hello,This,is,,,)
(FOO,BAR,BING,,,)
(BANG,BOSH,,,,)
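As for the asker's concern about sharing the #Fields line with each map: the problem goes away if each mapper consumes a whole log file, because the header line then arrives before that file's data rows. A minimal Hadoop Streaming mapper along these lines might look like the sketch below (the schema and names are illustrative, and it assumes the input files are not split across mappers):

```python
#!/usr/bin/env python
# Sketch of a Hadoop Streaming mapper: remember the most recent '#Fields:'
# line and use it to remap every following data row onto the full schema.
import sys

SCHEMA = ['date', 'time', 's-ip', 'cs-method', 'cs-uri-stem', 'useragent']

def remap(lines, schema=SCHEMA):
    """Yield rows normalized to the full schema, tracking '#Fields:' headers."""
    enabled = None  # columns declared by the current file's header
    for line in lines:
        line = line.rstrip('\n')
        if line.startswith('#Fields:'):
            enabled = line.replace('#Fields:', '').split()
        elif line and enabled is not None:
            values = dict(zip(enabled, line.split(' ')))
            yield ' '.join(values.get(col, 'null') for col in schema)

if __name__ == '__main__':
    for row in remap(sys.stdin):
        print(row)
```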

