Read a set of xml files using Google Cloud DataFlow python sdk


Problem Description

I'm trying to read a collection of XML files from a GCS bucket and process them, where each element in the collection is a string representing a whole file, but I can't find a decent example of how to accomplish this, nor can I work it out from the Apache Beam documentation, which is mostly about the Java SDK.

My current pipeline is:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

The error message I get is:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals)  # execute the script

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'

Any assistance is much appreciated. Thanks, Tomer

Solved the first issue: it turns out this doesn't work with the DirectRunner; changing the runner to DataflowRunner and replacing Read with ReadFromText resolved the exception:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish() 

But now I see that this approach gives me one line from each file as a pipeline element, whereas I want each element to be the whole file as a single string. I'm not sure how to do that. I found this post, but it's in Java, and I'm not sure how it translates to Python or to GCS specifically.

So it looks like ReadFromText won't work for my use case, and I don't know how else to create a pipeline of files.

Solution: thanks to the assist from Ankur, I revised the code to include the steps needed to convert the list of MatchResult objects that GCSFileSystem returns into a PCollection of strings, each representing one file.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)

(p
 | 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
 | 'metadata_list to filepath' >> beam.FlatMap(lambda metadata_list: [metadata.path for metadata in metadata_list])
 | 'string To BigQuery Row' >> beam.Map(lambda filepath:
                                        data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Appends data to the BigQuery table
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()

The code uses this helper class to read the GCS files:

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

  def get_string_from_filepath(self, filepath):
      with self.gcs.open(filepath) as reader:
          res = reader.read()

      return res
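
For illustration, here is a hypothetical standalone use of the helper outside the pipeline; the bucket and object name are placeholders (not from the original post), and valid GCS credentials are assumed in the environment:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

gcs = GCSFileSystem(PipelineOptions())
reader = GCSFileReader(gcs)
# Placeholder object path for demonstration purposes only.
xml_text = reader.get_string_from_filepath('gs://my-bucket/articles/0001.xml')
print(len(xml_text))  # the whole file comes back as a single string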

Answer

ReadFromText reads the files under the given path line by line. What you want is a list of the files, and then to read one whole file at a time in a ParDo using GcsFileSystem (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsfilesystem.py), and then write the contents to BigQuery.
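
As a minimal sketch of that approach (this is not code from the original answer; the ReadWholeFile DoFn, the run wrapper, and the file_pattern argument are assumed names, and the XML parsing / BigQuery write steps are omitted):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem


class ReadWholeFile(beam.DoFn):
    """Emits the full contents of each GCS file path it receives."""
    def __init__(self, pipeline_args):
        self._pipeline_args = pipeline_args

    def process(self, file_path):
        # Construct the filesystem lazily on the worker and read the file in one go.
        gcs = GCSFileSystem(PipelineOptions(self._pipeline_args))
        with gcs.open(file_path) as f:
            yield f.read()


def run(pipeline_args, file_pattern):
    options = PipelineOptions(pipeline_args)
    gcs = GCSFileSystem(options)
    # gcs.match returns MatchResult objects; each carries a metadata_list of
    # FileMetadata entries whose .path is the full gs:// object path.
    paths = [metadata.path
             for result in gcs.match([file_pattern])
             for metadata in result.metadata_list]

    with beam.Pipeline(options=options) as p:
        (p
         | 'File paths' >> beam.Create(paths)
         | 'Read whole files' >> beam.ParDo(ReadWholeFile(pipeline_args)))
        # Each element is now one whole XML file as a string; parse it and
        # write it to BigQuery downstream, as in the question.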

You can also refer to this mail thread on a similar topic: https://lists.apache.org/thread.html/85da22a845cef8edd942fcc4906a7b47040a4ae8e10aef4ef00be233@%3Cuser.beam.apache.org%3E

