Dataflow fails when I add requirements.txt [Python]

Problem description

When I try to run a Dataflow job with the DataflowRunner and include a requirements.txt that looks like this:

google-cloud-storage==1.28.1
pandas==1.0.3
smart-open==2.0.0

it fails every time at this line:

INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://..../beamapp-.../numpy-1.18.2.zip...
Traceback (most recent call last):
  File "Database.py", line 107, in <module>
    run()
  File "Database.py", line 101, in run
    | 'Write CSV' >> beam.ParDo(WriteCSVFIle(options.output_bucket, pandora_options.output_folder))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
    self.run().wait_until_finish()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
    self._options).run(False)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
    self.create_job_description(job)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
    resources = self._stage_resources(job.options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
    staging_location=google_cloud_options.staging_location)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
    pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
    local_path_to_artifact, artifact_name)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
    self.stage_file(to_folder, to_name, f, total_size=total_size)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
    response = self._storage_client.objects.Insert(request, upload=upload)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
    upload=upload, upload_config=upload_config)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
    response = send_func(self.stream.tell())
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
    start, additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
    return self.__SendMediaRequest(request, end)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
    retries=self.num_retries, check_response_func=CheckResponse)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
    max_retry_wait, total_wait_sec))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
    raise retry_args.exc
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
    cachekey,
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
    content,
httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.

This is the command I am running:

python Database.py \
    --runner DataflowRunner \
    --project XXX \
    --staging_location gs://.../staging \
    --temp_location gs://.../temp \
    --template_location gs://.../Template \
    --requirements_file requirements.txt

If I remove --requirements_file requirements.txt, the command finishes, but when I try to run the job it fails because it can't find the packages.

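  • I am using google-cloud-storage to list all the files in a bucket, so if you have another solution that doesn't involve that package, it would be greatly appreciated.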
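One possible way to list bucket contents without the google-cloud-storage package is Beam's own FileSystems.match, which takes a list of glob patterns and returns one MatchResult per pattern. The sketch below is only an illustration under assumptions: the function name list_bucket_files and the match parameter are hypothetical, and it assumes the Beam SDK on the workers was installed with its GCP extras so that gs:// paths resolve. It would only remove the google-cloud-storage pin, not the pandas or smart-open ones.

```python
def list_bucket_files(pattern, match=None):
    """Return the paths that match a glob such as "gs://my-bucket/folder/*".

    `match` defaults to Beam's FileSystems.match; it is a parameter only so
    the function can be exercised without GCP access (hypothetical design).
    """
    if match is None:
        # Imported lazily so this module still loads where Beam is absent.
        from apache_beam.io.filesystems import FileSystems
        match = FileSystems.match
    # One MatchResult comes back per pattern; each carries a metadata_list
    # whose entries expose the matched file's path.
    results = match([pattern])
    return [metadata.path for result in results for metadata in result.metadata_list]
```

Inside a DoFn this could be called as list_bucket_files("gs://.../*") with the real bucket path filled in.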

This is my dataflow-requirements-cache folder. Before cleaning it up, I had several versions of the same package, e.g.

botocore-1.16.16.tar.gz
botocore-1.16.17.tar.gz
botocore-1.16.18.tar.gz
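Stale duplicates like these can be spotted with a short script. The following is a minimal sketch, not part of the original post: the function name find_duplicate_versions and the file-name pattern are assumptions, and it only recognises archives named name-version.tar.gz or name-version.zip.

```python
import re
from collections import defaultdict
from pathlib import Path

# Matches source archives such as "botocore-1.16.18.tar.gz" or "numpy-1.18.4.zip".
ARCHIVE_RE = re.compile(r"^(?P<name>.+)-(?P<version>\d[^-]*)\.(?:tar\.gz|zip)$")


def find_duplicate_versions(cache_dir):
    """Return {package: versions} for packages that are cached more than once."""
    versions = defaultdict(list)
    for path in Path(cache_dir).iterdir():
        match = ARCHIVE_RE.match(path.name)
        if match:
            versions[match["name"]].append(match["version"])
    # Versions are sorted as plain strings, which is enough for a quick look
    # but is not a real version comparison.
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}
```

Calling it with the path of the cache folder returns an empty dict once every package is cached in a single version.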

After I cleaned it up it looks like this (it still failed while trying to upload numpy):

numpy-1.18.4.zip
urllib3-1.25.9.tar.gz
smart_open-2.0.0.tar.gz
six-1.15.0.tar.gz
setuptools-47.1.0.zip
s3transfer-0.3.3.tar.gz
rsa-4.0.tar.gz
requests-2.23.0.tar.gz
pytz-2020.1.tar.gz
python-dateutil-2.8.1.tar.gz
pyasn1-modules-0.2.8.tar.gz
pyasn1-0.4.8.tar.gz
protobuf-3.12.2.tar.gz
pandas-1.0.3.tar.gz
jmespath-0.10.0.tar.gz
idna-2.9.tar.gz
googleapis-common-protos-1.51.0.tar.gz
google-resumable-media-0.5.0.tar.gz
google-cloud-storage-1.28.1.tar.gz
google-cloud-core-1.3.0.tar.gz
google-auth-1.15.0.tar.gz
google-api-core-1.17.0.tar.gz
docutils-0.15.2.tar.gz
chardet-3.0.4.tar.gz
certifi-2020.4.5.1.tar.gz
cachetools-4.1.0.tar.gz
botocore-1.16.18.tar.gz
boto3-1.13.18.tar.gz
boto-2.49.0.tar.gz

---- EDIT ---- The full output:

(airflow) afragotsis-mac:pandora_database afragotsis$ python PandoraDatabase.py \
>     --runner DataflowRunner \
>     --project XXX \
>     --staging_location gs://.../dataflow-template/PandoraDatabase/staging \
>     --temp_location gs://.../dataflow-template/PandoraDatabase/temp \
>     --template_location gs://.../dataflow-template/PandoraDatabase/pandoraTemplate \
>     --requirements_file requirements.txt \
>     --save_main_session True
WARNING:apache_beam.options.pipeline_options:--region not set; will default to us-central1. Future releases of Beam will require the user to set --region explicitly, or else have a default set via the gcloud tool. https://cloud.google.com/compute/docs/regions-zones
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt in 0 seconds.
INFO:apache_beam.runners.portability.stager:Executing command: ['/Users/afragotsis/opt/anaconda3/envs/airflow/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/numpy-1.18.4.zip...
Traceback (most recent call last):
  File "PandoraDatabase.py", line 125, in <module>
    run()
  File "PandoraDatabase.py", line 119, in run
    | 'Write CSV' >> beam.ParDo(WriteCSVFIle(pandora_options.output_bucket, pandora_options.output_folder))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
    self.run().wait_until_finish()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
    self._options).run(False)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
    self.create_job_description(job)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
    resources = self._stage_resources(job.options)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
    staging_location=google_cloud_options.staging_location)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
    pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
    local_path_to_artifact, artifact_name)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
    self.stage_file(to_folder, to_name, f, total_size=total_size)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
    response = self._storage_client.objects.Insert(request, upload=upload)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
    upload=upload, upload_config=upload_config)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
    response = send_func(self.stream.tell())
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
    start, additional_headers=additional_headers)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
    return self.__SendMediaRequest(request, end)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
    retries=self.num_retries, check_response_func=CheckResponse)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
    max_retry_wait, total_wait_sec))
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
    raise retry_args.exc
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
    cachekey,
  File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
    content,
httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.

The full path of the dataflow-requirements-cache:

/private/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache

It always fails when it tries to upload numpy.

Recommended answer

No matter what I tried, I couldn't make it work with the requirements file, so I switched to a setup file. The command now looks like this:

python Database.py \
    --runner DataflowRunner \
    --project XXX \
    --staging_location gs://.../staging \
    --temp_location gs://.../temp \
    --template_location gs://.../Template \
    --setup_file /Users/.../setup.py \
    --save_main_session True

The setup file is this:

import setuptools

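# Packages the pipeline code imports at runtime, pinned to the same
# versions as in requirements.txt.
REQUIRED_PACKAGES = [
    'google-cloud-storage==1.28.1',
    'pandas==1.0.3',
    'smart-open==2.0.0',
]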

PACKAGE_NAME = 'my_package'
PACKAGE_VERSION = '0.0.1'

setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='Example project',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
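To avoid keeping the same pins in both requirements.txt and setup.py, the list could be read from the requirements file instead of being hard-coded. This is a minimal sketch, not something the answer above does: the helper name read_requirements is hypothetical, it ignores pip options and nested includes, and it assumes requirements.txt is shipped alongside setup.py (for example through a MANIFEST.in entry), because the file is read every time setup.py runs.

```python
from pathlib import Path


def read_requirements(path="requirements.txt"):
    """Return the requirement specifiers listed in a pip requirements file.

    Blank lines, comments, and pip options (lines starting with "-") are
    skipped; this sketch does not follow "-r" includes.
    """
    requirements = []
    for line in Path(path).read_text().splitlines():
        line = line.split(" #", 1)[0].strip()  # drop trailing comments
        if line and not line.startswith(("#", "-")):
            requirements.append(line)
    return requirements
```

In setup.py this would replace the literal list, e.g. install_requires=read_requirements().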
