AttributeError: 'module' 对象没有属性 'ensure_str' [英] AttributeError: 'module' object has no attribute 'ensure_str'

查看:32
本文介绍了AttributeError: 'module' 对象没有属性 'ensure_str'的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试通过 Beam 将数据从一个 bigquery 传输到另一个 bigquery,但是出现以下错误:

I try to transfer data from one bigquery to anther through Beam, however, the following error comes up:

WARNING:root:Retry with exponential backoff: waiting for 4.12307941111 seconds before retrying get_query_location because we caught exception: AttributeError: 'module' object has no attribute 'ensure_str'
 Traceback for above exception (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/utils/retry.py", line 197, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 261, in get_query_location
    response = self.client.jobs.Insert(request)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 342, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/base_api.py", line 703, in _RunMethod
    download)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/base_api.py", line 674, in PrepareHttpRequest
    method_config.query_params, request, global_params)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/base_api.py", line 551, in __ConstructQueryParams
    global_params, self.__client.global_params)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/base_api.py", line 357, in global_params
    return encoding.CopyProtoMessage(self._default_global_params)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/encoding_helper.py", line 112, in CopyProtoMessage
    return JsonToMessage(type(message), MessageToJson(message))
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/encoding_helper.py", line 123, in JsonToMessage
    return _ProtoJsonApiTools.Get().decode_message(message_type, message)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/py/encoding_helper.py", line 309, in decode_message
    message_type, result)
  File "/usr/local/lib/python2.7/site-packages/apitools/base/protorpclite/protojson.py", line 209, in decode_message
    encoded_message = six.ensure_str(encoded_message)

这是我的代码:

class SplitBDoFn(beam.DoFn):
  word_tag = 'word_tag'
  def process(self, element):
    if element:
      yield pvalue.TaggedOutput(self.word_tag, element)

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DirectRunner', 
      '--project=myproject',
      '--gcs_location=US',
      '--staging_location=gs://test-bucket/stage',
      '--temp_location=gs://test-bucket/temp',
      '--job_name=test-job',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  pipeline_options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options = pipeline_options) as p:
    bq_source = beam.io.BigQuerySource(query = 'select * from myproject:raw_data.events where utc_date = "2019-07-20"')
    bq_data = p | beam.io.Read(bq_source)

    multiple_lines = (
        bq_data
        | 'SplitBDoFn' >> (beam.ParDo(SplitBDoFn()).with_outputs(
                                      SplitBDoFn.word_tag)))

    word_tag = multiple_lines.word_tag

    (word_tag
        | "output_word_tag" >> beam.io.WriteToBigQuery(
                              table = 'test',
                              dataset = 'temp',
                              project = 'myproject',
                              schema = data_schema,
                              # validate = True,
                              write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
                              create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        ))

梁版本:2.13.0

之前有人遇到过这个问题吗?或者我的代码有什么错误?

Could someone meet this issue before? or any mistake in my code?

推荐答案

看起来 ensure_str 在其 1.12.0 版本中被添加到六个,并且应该通过 apitools.

It looks like ensure_str was added to six in their 1.12.0 version, and that should be pooled in via apitools.

我怀疑根本原因是您的虚拟环境中安装了旧版本的 6(1.11 或更旧).您可以在再次尝试管道之前尝试创建一个新的 virtualenv,或者运行 快速启动示例?

I suspect the root cause is that you have an older version of six (1.11 or older) installed in your virtualenvironment. Can you try creating a new virtualenv before trying your pipeline again, or running the quick-start example?

这篇关于AttributeError: 'module' 对象没有属性 'ensure_str'的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆