UnicodeEncodeError for Google App Engine Datastore to BigQuery process

Problem description

I'm trying to follow along with this Codelab, which shows you how to take data from your Google App Engine Datastore and move it through Google Cloud Storage into BigQuery by setting up a MapReduce pipeline. I set up a Google App Engine Datastore entity and have a process that collects tweets about certain stocks, just as a test. I believe I've followed everything as outlined in the example, but the shards that do the work of breaking up the data and loading it into Cloud Storage are raising UnicodeEncodeErrors. Here's the log from testing the app on the dev app server:

INFO     2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
WARNING  2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
ERROR    2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
Traceback (most recent call last):
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
rv = self.handle_exception(request, response, e)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
rv = self.router.dispatch(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
return route.handler_adapter(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
return handler.dispatch()
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
return self.handle_exception(e, self.app.debug)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
return method(*args, **kwargs)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
self.handle()
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
entity, input_reader, ctx, tstate)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
output_writer.write(output, ctx)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
ctx.get_pool("file_pool").append(self._filename, str(data))
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)

Here's the code:

import json
import webapp2
import urllib2
import time
import calendar
import datetime
import httplib2

from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from google.appengine.api import urlfetch

from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'project_id' # Your Project ID here
BQ_DATASET_ID = 'datastore_data'
GS_BUCKET = 'bucketname'
ENTITY_KIND = 'main.streamdata'

class streamdata(db.Model):
    querydate = db.DateTimeProperty(auto_now_add = True)
    ticker = db.StringProperty()
    created_at = db.StringProperty()
    tweet_id = db.StringProperty()
    text = db.TextProperty()
    source = db.StringProperty()

class DatastoreMapperPipeline(base_handler.PipelineBase):

    def run(self, entity_type):

        output = yield mapreduce_pipeline.MapperPipeline(
          "Datastore Mapper %s" % entity_type,
          "main.datastore_map",
          "mapreduce.input_readers.DatastoreInputReader",
          output_writer_spec="mapreduce.output_writers.FileOutputWriter",
          params={
              "input_reader":{
                  "entity_kind": entity_type,
                  },
              "output_writer":{
                  "filesystem": "gs",
                  "gs_bucket_name": GS_BUCKET,
                  "output_sharding":"none",
                  }
              },
              shards=10)

        yield CloudStorageToBigQuery(output)

class CloudStorageToBigQuery(base_handler.PipelineBase):

    def run(self, csv_output):

        credentials = AppAssertionCredentials(scope=SCOPE)
        http = credentials.authorize(httplib2.Http())
        bigquery_service = build("bigquery", "v2", http=http)

        jobs = bigquery_service.jobs()
        table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
            '%m%d%Y_%H%M%S')
        files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
        result = jobs.insert(projectId=PROJECT_ID,
                            body=build_job_data(table_name,files))

        result.execute()

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      "fields":[
                          {
                              "name":"querydate",
                              "type":"INTEGER",
                          },
                          {
                              "name":"ticker",
                              "type":"STRING",
                          },
                          {
                              "name":"created_at",
                              "type":"STRING",
                          },
                          {
                              "name":"tweet_id",
                              "type":"STRING",
                          },
                          {   "name":"text",
                              "type":"TEXT",
                          },
                          {    
                              "name":"source",
                              "type":"STRING",
                          }
                          ]
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": BQ_DATASET_ID,
                      "tableId": table_name,
                      },
                  "maxBadRecords": 0,
                  }
              }
          }

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                    data.get('ticker'),
                    data.get('created_at'),
                    data.get('tweet_id'),
                    data.get('text'),
                    data.get('source')]
    result = ','.join(['"%s"' % field for field in resultlist])
    yield("%s\n" % result)

def timestamp_to_posix(timestamp):
    return int(time.mktime(timestamp.timetuple()))

class DatastoretoBigQueryStart(webapp2.RequestHandler):
    def get(self):
        pipeline = DatastoreMapperPipeline(ENTITY_KIND)
        pipeline.start()
        path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
        self.redirect(path)

class StreamHandler(webapp2.RequestHandler):

    def get(self):

        tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
                   'DELL', 'C', 'JPM', 'WFM', 'WMT', 
                   'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
                   'DUK', 'CEG', 'XOM', 'F', 'WFC', 
                   'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
                   'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
                   'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
                   'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
                   'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
                   'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
                   'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
                   'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
                   'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']

        for i in set(tickers):

            url = 'http://search.twitter.com/search.json?q='
            resultcount = '&rpp=100'
            language = '&lang=en'
            encoding = '%40%24'
            tickerstring = url + encoding + i + resultcount + language
            tickurl = urllib2.Request(tickerstring)
            tweets = urllib2.urlopen(tickurl)
            code = tweets.getcode()

            if code == 200:
                results = json.load(tweets, 'utf-8')
                if "results" in results:
                    entries = results["results"]
                    for entry in entries:
                        tweet = streamdata()
                        created = entry['created_at']
                        tweetid = entry['id_str']
                        tweettxt = entry['text']
                        tweet.ticker = i
                        tweet.created_at = created
                        tweet.tweet_id = tweetid
                        tweet.text = tweettxt
                        tweet.source = "Twitter"
                        tweet.put()

class MainHandler(webapp2.RequestHandler):

    def get(self):
        self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ')
        self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ')


app = webapp2.WSGIApplication([
                               ('/', MainHandler),
                               ('/start', DatastoretoBigQueryStart), 
                               ('/add_data', StreamHandler)], 
                              debug=True)

Any insights anyone may have would be a big help.

Many thanks.

Solution

You are converting Unicode data to a bytestring:

ctx.get_pool("file_pool").append(self._filename, str(data))

When you do that without specifying an encoding, Python falls back to the default, which is ASCII. You'll need to settle on a different encoding instead, one that can handle all Unicode codepoints your data contains.
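A minimal Python 2 session (illustrative only, not part of your pipeline) reproduces the failure and shows what an explicit encoding does instead:

>>> smart_quote = u'\u2019'        # the right single quotation mark from the traceback
>>> str(smart_quote)               # implicit ASCII encode, as in the writer
Traceback (most recent call last):
  ...
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 0: ordinal not in range(128)
>>> smart_quote.encode('utf8')     # explicit UTF-8 encode succeeds
'\xe2\x80\x99'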

For most text, UTF-8 is a good choice for that; if you have a lot of non-western text (Arabic, Asian, etc.) then UTF-16 might be more efficient. In either case, you'll have to explicitly encode:
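As a rough illustration of that trade-off (a sketch; the exact numbers depend on your data):

>>> western = u'Apple\u2019s stock rose'                        # mostly ASCII
>>> len(western.encode('utf8')), len(western.encode('utf-16'))  # utf-16 includes a 2-byte BOM
(20, 38)
>>> cjk = u'\u682a\u4fa1\u304c\u4e0a\u304c\u3063\u305f' * 10    # repeated Japanese text
>>> len(cjk.encode('utf8')), len(cjk.encode('utf-16'))
(210, 142)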

ctx.get_pool("file_pool").append(self._filename, data.encode('utf8'))

When reading back the data from that file, use filedata.decode('utf8') to decode back to Unicode.
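A hypothetical round trip, using a plain local file for simplicity rather than the App Engine Files API:

row = u'"AAPL","Apple\u2019s stock rose 2%"\n'

with open('rows.csv', 'wb') as out:
    out.write(row.encode('utf8'))       # bytes go into the file

with open('rows.csv', 'rb') as src:
    filedata = src.read()               # raw UTF-8 bytes come back out
    restored = filedata.decode('utf8')  # decode to get a unicode object again

assert restored == row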

See the Python Unicode HOWTO for more information on Python and Unicode.
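Since the failing str() call lives inside the library's FileOutputWriter rather than in your own code, an alternative sketch (assuming, as is the case in Python 2, that str() on an already-encoded byte string is a no-op) is to encode in your mapper so that only UTF-8 byte strings ever reach the writer:

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                  data.get('ticker'),
                  data.get('created_at'),
                  data.get('tweet_id'),
                  data.get('text'),
                  data.get('source')]
    result = u','.join([u'"%s"' % field for field in resultlist])
    # encode here so the writer's str(data) sees bytes, not unicode
    yield (u"%s\n" % result).encode('utf8')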
