气流mysql到gcp dag错误 [英] Airflow mysql to gcp Dag error

查看:143
本文介绍了气流mysql到gcp dag错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我最近开始使用Airflow.我正在为DAG工作:

I'm recently started working with Airflow. I'm working on DAG that:

  1. 查询MySQL数据库
  2. 提取查询并将其作为JSON文件存储在云存储桶中
  3. 将存储的JSON文件上传到BigQuery

Dag导入三个运算符:MySqlOperatorMySqlToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator

Dag imports three operators: MySqlOperator, MySqlToGoogleCloudStorageOperator and GoogleCloudStorageToBigQueryOperator

我正在使用Airflow 1.8.0,Python 3和Pandas 0.19.0.

I am using Airflow 1.8.0, Python 3, and Pandas 0.19.0.

这是我的Dag代码:

sql2gcp_csv = MySqlToGoogleCloudStorageOperator(

    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    bucket='gs://{{var.value.gcs_bucket}}/{{ ds_nodash }}/',
    filename='{{ ds_nodash }}-account-*.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',

)

但是,当我运行它时,出现以下错误:

However, when I run it I receive the following error:

[2017-07-20 22:38:07,478] {models.py:1441} INFO - Marking task as FAILED. 

[2017-07-20 22:38:07,490] {models.py:1462} ERROR - a bytes-like object is required, not 'str'

/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'database': 'test'}
category=PendingDeprecationWarning

/home/User/airflow/workspace/env/lib/python3.5/site-
packages/airflow/ti_deps/deps/base_ti_dep.py:94: PendingDeprecationWarning: generator '_get_dep_statuses' raised StopIteration

for dep_status in self._get_dep_statuses(ti, session, dep_context):
Traceback (most recent call last):

File "/home/User/airflow/workspace/env/bin/airflow", line 28, in <module> args.func(args)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/bin/cli.py", line 422, in run pool=args.pool,

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/utils/db.py", line 53, in wrapper result = func(*args, **kwargs)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py", line 1374, in run result = task_copy.execute(context=context)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute files_to_upload = self._write_local_data_files(cursor)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files 
json.dump(row_dict, tmp_file_handle)

File "/usr/lib/python3.5/json/__init__.py", line 179, in dump 

TypeError: a bytes-like object is required, not 'str'

有人知道为什么会抛出该异常吗?

Does anyone know why this exception is thrown?

推荐答案

根据您的追溯,您的代码在

According to your traceback, your code is breaking at this point. As you can see, it process the code:

json.dump(row_dict, tmp_file_handle)

tmp_file_handleNamedTemporaryFile

tmp_file_handle is a NamedTemporaryFile initialized with default input args, that is, it simulates a file opened with w+b mode (and therefore only accepts bytes-like data as input).

问题在于,在Python 2中,所有字符串都是字节,而在Python 3中,字符串是文本(默认编码为utf-8).

The problem is that in Python 2 all strings are bytes whereas in Python 3 strings are texts (encoded by default as utf-8).

如果您打开Python 2并运行以下代码:

If you open a Python 2 and run this code:

In [1]: from tempfile import NamedTemporaryFile
In [2]: tmp_f = NamedTemporaryFile(delete=True)
In [3]: import json
In [4]: json.dump({'1': 1}, tmp_f)

它工作正常.

但是,如果您打开Python 3并运行相同的代码:

But if you open a Python 3 and run the same code:

In [54]: from tempfile import NamedTemporaryFile
In [55]: tmp_f = NamedTemporaryFile(delete=True)
In [56]: import json
In [57]: json.dump({'1': 1}, tmp_f)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-57-81743b9013c4> in <module>()
----> 1 json.dump({'1': 1}, tmp_f)

/usr/local/lib/python3.6/json/__init__.py in dump(obj, fp, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    178     # a debuggability cost
    179     for chunk in iterable:
--> 180         fp.write(chunk)
    181 
    182 

/usr/local/lib/python3.6/tempfile.py in func_wrapper(*args, **kwargs)
    481             @_functools.wraps(func)
    482             def func_wrapper(*args, **kwargs):
--> 483                 return func(*args, **kwargs)
    484             # Avoid closing the file as long as the wrapper is alive,
    485             # see issue #18879.

TypeError: a bytes-like object is required, not 'str'

我们得到与您相同的错误.

We get the same error as yours.

这意味着Python 3仍不完全支持Airflow(如您在测试覆盖率,模块airflow/contrib/operators/mysql_to_gcs.py尚未在python 2或3中进行测试.确认这一点的一种方法是使用python 2运行您的代码,并查看其是否有效.

This means that Airflow is still not fully supported for Python 3 (as you can see in the test coverage, the module airflow/contrib/operators/mysql_to_gcs.py is not yet tested either in python 2 or 3). One way to confirm this would be to run your code using python 2 and see if it works.

我建议在其JIRA 上创建一个问题,要求两个版本的可移植性的Python.

I'd recommend creating an issue on their JIRA requesting portability for both versions of Python.

这篇关于气流mysql到gcp dag错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆