Unable to correctly load twitter avro data into hive table


Problem description

I need your help!

I am trying a simple exercise of getting data from Twitter and then loading it into Hive for analysis. I am able to get data into HDFS using Flume (with the Twitter 1% firehose source) and to load the data into a Hive table.

But I am unable to see all the columns I expected to be there in the Twitter data, such as user_location, user_description, user_friends_count and user_statuses_count. The schema derived from the Avro data only contains two columns, headers and body.

Here are the steps I followed:

1) Create a Flume agent with the conf below:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
#a1.sources.r1.type = com.cloudera.flume.source.TwitterSource
a1.sources.r1.consumerKey = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.consumerSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessToken = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessTokenSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.keywords = bigdata, healthcare, oozie


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k1.hdfs.inUsePrefix = _
a1.sinks.k1.hdfs.fileSuffix = .avro
# added for invalid block size error
a1.sinks.k1.serializer = avro_event

#a1.sinks.k1.deserializer.schemaType = LITERAL
# added for  exception java.io.IOException:org.apache.avro.AvroTypeException: Found Event, expecting Doc
#a1.sinks.k1.serializer.compressionCodec = snappy

a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 30


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
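
To sanity-check what the sink actually wrote, you can list the sink path and pull one file back for inspection (a minimal sketch, assuming the same HDFS path and .avro suffix as in the sink configuration above; the file name is the one inspected in step 2 below):

# List the files written by the HDFS sink (path taken from a1.sinks.k1.hdfs.path)
hdfs dfs -ls hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter
# Copy one of the files locally so avro-tools can read it
hdfs dfs -copyToLocal hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter/FlumeData.1431598230978.avro .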

2) Derive the schema from the Avro data file. I have no idea why the schema derived from the Avro data file only has two columns, headers and body:

java -jar avro-tools-1.7.7.jar getschema FlumeData.1431598230978.avro
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
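
You can also dump a few records as JSON to see what each record actually contains (a hedged sketch using the same avro-tools jar and the same file as above):

# Print records as JSON; with the avro_event serializer each record is only
# a headers map plus the raw event body as bytes, not individual tweet fields
java -jar avro-tools-1.7.7.jar tojson FlumeData.1431598230978.avro | head -n 2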

3) Run the above agent and get the data in HDFS, find out the schema of the Avro data, and create a Hive table as:

    CREATE EXTERNAL TABLE TwitterData
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}

')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter'
;

4) Describe the Hive table:

hive> describe  twitterdata;
OK
headers                 map<string,string>      from deserializer
body                    binary                  from deserializer
Time taken: 0.472 seconds, Fetched: 2 row(s)

5) Query the table. When I query the table I see binary data in the 'body' column and the actual schema info in the 'headers' column:

select * from twitterdata limit 1;
OK

{"type":"record","name":"Doc","doc":"adoc","fields":[{"name":"id","type":"string"},{"name":"user_friends_count","type":["int","null"]},{"name":"user_location","type":["string","null"]},{"name":"user_description","type":["string","null"]},{"name":"user_statuses_count","type":["int","null"]},{"name":"user_followers_count","type":["int","null"]},{"name":"user_name","type":["string","null"]},{"name":"user_screen_name","type":["string","null"]},{"name":"created_at","type":["string","null"]},{"name":"text","type":["string","null"]},{"name":"retweet_count","type":["long","null"]},{"name":"retweeted","type":["boolean","null"]},{"name":"in_reply_to_user_id","type":["long","null"]},{"name":"source","type":["string","null"]},{"name":"in_reply_to_status_id","type":["long","null"]},{"name":"media_url_https","type":["string","null"]},{"name":"expanded_url","type":["string","null"]}]}�1|$���)]'��G�$598792495703543808�Bあいたぁぁぁぁぁぁぁ!�~�ゆっけ0725Yukken(2015-05-14T10:10:30Z<ん?なんか意味違うわ�<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>�1|$���)]'��
Time taken: 2.24 seconds, Fetched: 1 row(s)

How do I create a Hive table with all the columns from the actual schema, as shown in the 'headers' column? I mean with all the columns like user_location, user_description, user_friends_count and user_statuses_count.

Shouldn't the schema derived from the Avro data file contain more columns?

Is there any issue with the Flume source I used in the agent (org.apache.flume.source.twitter.TwitterSource)?

Thanks for reading.

Thanks Farrukh, I have done that. The mistake was the configuration 'a1.sinks.k1.serializer = avro_event' (which wraps every Flume event in the generic Event schema of just headers and body, matching the two-column schema above); I changed it to 'a1.sinks.k1.serializer = text' and was able to load the data into Hive. But now the issue is retrieving the data from Hive; I am getting the error below while doing so:

        hive> describe twitterdata_09062015;
    OK
    id                      string                  from deserializer
    user_friends_count      int                     from deserializer
    user_location           string                  from deserializer
    user_description        string                  from deserializer
    user_statuses_count     int                     from deserializer
    user_followers_count    int                     from deserializer
    user_name               string                  from deserializer
    user_screen_name        string                  from deserializer
    created_at              string                  from deserializer
    text                    string                  from deserializer
    retweet_count           bigint                  from deserializer
    retweeted               boolean                 from deserializer
    in_reply_to_user_id     bigint                  from deserializer
    source                  string                  from deserializer
    in_reply_to_status_id   bigint                  from deserializer
    media_url_https         string                  from deserializer
    expanded_url            string                  from deserializer


select count(1) as num_rows from TwitterData_09062015; 
    Query ID = root_20150609130404_10ef21db-705a-4e94-92b7-eaa58226ee2e 
    Total jobs = 1 
    Launching Job 1 out of 1 
    Number of reduce tasks determined at compile time: 1 
    In order to change the average load for a reducer (in bytes): 
    set hive.exec.reducers.bytes.per.reducer=<number> 
    In order to limit the maximum number of reducers: 
    set hive.exec.reducers.max=<number> 
    In order to set a constant number of reducers: 
    set mapreduce.job.reduces=<number> 
    Starting Job = job_1433857038961_0003, Tracking URL = http://sandbox.hortonworks.com:8088/proxy/application_1433857038961_0003/ 
    Kill Command = /usr/hdp/2.2.0.0-2041/hadoop/bin/hadoop job -kill job_1433857038961_0003 
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 
    13:04:36,856 Stage-1 map = 0%, reduce = 0%
    13:05:09,576 Stage-1 map = 100%, reduce = 100%

    Ended Job = job_1433857038961_0003 with errors 
    Error during job, obtaining debugging information... 
    Examining task ID: task_1433857038961_0003_m_000000 (and more) from job job_1433857038961_0003

    Task with the most failures(4):

    Task ID: 
    task_1433857038961_0003_m_000000

    URL: 
    http://sandbox.hortonworks.com:8088/taskdetails.jsp?jobid=job_1433857038961_0003&tipid=task_1433857038961_0003_m_000000

    Diagnostic Messages for this Task: 
    Error: java.io.IOException: java.io.IOException: org.apache.avro.AvroRuntimeException: java.io.IOException: Block si ze invalid or too large for this implementation: -40 
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHand lerChain.java:121) 
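
One way to narrow this down (a hedged diagnostic sketch, not part of the original thread; it assumes the files are still under the same sink path, and the file name is a placeholder) is to copy one of the newly written files back and check whether it is a valid Avro container at all:

# Copy one file the sink wrote after the serializer change (placeholder name)
hdfs dfs -copyToLocal /hdp/apps/2.2.0.0-2041/flume/twitter/FlumeData.XXXXXXXXXXXXX.avro .
# If getschema fails, the file is not an Avro container file and the
# AvroSerDe-backed table will not be able to read it
java -jar avro-tools-1.7.7.jar getschema FlumeData.XXXXXXXXXXXXX.avro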

Recommended answer

Here is the step-by-step process used to download tweets and load them into Hive.

Flume agent

##TwitterAgent for collecting Twitter data to Hadoop HDFS #####

TwitterAgent.sources = Twitter
TwitterAgent.channels = FileChannel
TwitterAgent.sinks = HDFS


TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = FileChannel
TwitterAgent.sources.Twitter.consumerKey = *************
TwitterAgent.sources.Twitter.consumerSecret = **********
TwitterAgent.sources.Twitter.accessToken = ************
TwitterAgent.sources.Twitter.accessTokenSecret = ***********

TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000

TwitterAgent.sources.Twitter.keywords = Apache, Hadoop, Mapreduce, hadooptutorial, Hive, Hbase, MySql

TwitterAgent.sinks.HDFS.channel = FileChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://nn1.itbeams.com:9000/user/flume/tweets/avrotweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
# you do not need to mention the avro format here, just mention Text
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 200000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 2000000

TwitterAgent.channels.FileChannel.type = file
TwitterAgent.channels.FileChannel.checkpointDir = /var/log/flume/checkpoint/
TwitterAgent.channels.FileChannel.dataDirs = /var/log/flume/data/
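
To run the agent, something like the following should work (a sketch; the config file name twitter.conf and the conf directory are assumptions, while the agent name TwitterAgent comes from the properties above):

# Start the agent defined above; adjust the paths to your Flume installation
flume-ng agent --conf /etc/flume/conf --conf-file twitter.conf --name TwitterAgent -Dflume.root.logger=INFO,console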

I created the Avro schema in an avsc file. Once you have created it, put this file in HDFS under your user folder, like /user/youruser/.

{"type":"record",
 "name":"Doc",
 "doc":"adoc",
 "fields":[{"name":"id","type":"string"},
           {"name":"user_friends_count","type":["int","null"]},
           {"name":"user_location","type":["string","null"]},
           {"name":"user_description","type":["string","null"]},
           {"name":"user_statuses_count","type":["int","null"]},
           {"name":"user_followers_count","type":["int","null"]},
           {"name":"user_name","type":["string","null"]},
           {"name":"user_screen_name","type":["string","null"]},
           {"name":"created_at","type":["string","null"]},
           {"name":"text","type":["string","null"]},
           {"name":"retweet_count","type":["long","null"]},
           {"name":"retweeted","type":["boolean","null"]},
           {"name":"in_reply_to_user_id","type":["long","null"]},
           {"name":"source","type":["string","null"]},
           {"name":"in_reply_to_status_id","type":["long","null"]},
           {"name":"media_url_https","type":["string","null"]},
           {"name":"expanded_url","type":["string","null"]}

Load the tweets into the Hive table. It would be great if you save the code in an hql file.

CREATE TABLE tweetsavro
  ROW FORMAT SERDE
     'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS INPUTFORMAT
     'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  TBLPROPERTIES ('avro.schema.url'='hdfs:///user/youruser/examples/schema/twitteravroschema.avsc') ;

LOAD DATA INPATH '/user/flume/tweets/avrotweets/FlumeData.*' OVERWRITE INTO TABLE tweetsavro;
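
As a quick sanity check after the load (a sketch; the column names come from the schema above):

# Run a small query against the new table to confirm the tweet fields are populated
hive -e "SELECT user_screen_name, created_at, text FROM tweetsavro LIMIT 5;"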

The tweetsavro table in Hive:

hive> describe tweetsavro;
OK
id                      string                  from deserializer
user_friends_count      int                     from deserializer
user_location           string                  from deserializer
user_description        string                  from deserializer
user_statuses_count     int                     from deserializer
user_followers_count    int                     from deserializer
user_name               string                  from deserializer
user_screen_name        string                  from deserializer
created_at              string                  from deserializer
text                    string                  from deserializer
retweet_count           bigint                  from deserializer
retweeted               boolean                 from deserializer
in_reply_to_user_id     bigint                  from deserializer
source                  string                  from deserializer
in_reply_to_status_id   bigint                  from deserializer
media_url_https         string                  from deserializer
expanded_url            string                  from deserializer
Time taken: 0.6 seconds, Fetched: 17 row(s)
