Airflow mysql to gcp Dag error
Question
I recently started working with Airflow. I'm working on a DAG that:
- Queries a MySQL database
- Extracts the query results and stores them as a JSON file in a Cloud Storage bucket
- Uploads the stored JSON file to BigQuery
The DAG imports three operators: MySqlOperator, MySqlToGoogleCloudStorageOperator, and GoogleCloudStorageToBigQueryOperator.
I am using Airflow 1.8.0, Python 3, and Pandas 0.19.0.
Here is my DAG code:
sql2gcp_csv = MySqlToGoogleCloudStorageOperator(
    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    bucket='gs://{{var.value.gcs_bucket}}/{{ ds_nodash }}/',
    filename='{{ ds_nodash }}-account-*.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',
)
However, when I run it I receive the following error:
[2017-07-20 22:38:07,478] {models.py:1441} INFO - Marking task as FAILED.
[2017-07-20 22:38:07,490] {models.py:1462} ERROR - a bytes-like object is required, not 'str'
/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'database': 'test'}
category=PendingDeprecationWarning
/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/ti_deps/deps/base_ti_dep.py:94: PendingDeprecationWarning: generator '_get_dep_statuses' raised StopIteration
for dep_status in self._get_dep_statuses(ti, session, dep_context):
Traceback (most recent call last):
  File "/home/User/airflow/workspace/env/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/bin/cli.py", line 422, in run
    pool=args.pool,
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files
    json.dump(row_dict, tmp_file_handle)
  File "/usr/lib/python3.5/json/__init__.py", line 179, in dump
TypeError: a bytes-like object is required, not 'str'
Does anyone know why this exception is thrown?
Answer
According to your traceback, your code is breaking at this line:
json.dump(row_dict, tmp_file_handle)
tmp_file_handle is a NamedTemporaryFile initialized with default arguments, i.e., it simulates a file opened in w+b mode (and therefore only accepts bytes-like data as input).
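You can see this default binary mode in action with a minimal sketch, independent of Airflow:

```python
from tempfile import NamedTemporaryFile

tmp_f = NamedTemporaryFile(delete=True)  # default mode is 'w+b' (binary)
tmp_f.write(b'bytes are accepted')       # OK: bytes-like input

try:
    tmp_f.write('str is rejected')       # on Python 3 this raises TypeError
except TypeError as err:
    print(err)

tmp_f.close()
```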
The problem is that in Python 2 all strings are bytes, whereas in Python 3 strings are text (encoded by default as utf-8).
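To make the str/bytes split concrete, here is a minimal sketch of the explicit conversion Python 3 requires:

```python
text = 'hello'               # Python 3: str is text
data = text.encode('utf-8')  # explicit conversion to bytes

assert isinstance(text, str)
assert isinstance(data, bytes)
assert data.decode('utf-8') == text
```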
If you open Python 2 and run this code:
In [1]: from tempfile import NamedTemporaryFile
In [2]: tmp_f = NamedTemporaryFile(delete=True)
In [3]: import json
In [4]: json.dump({'1': 1}, tmp_f)
It works fine. But if you open Python 3 and run the same code:
In [54]: from tempfile import NamedTemporaryFile
In [55]: tmp_f = NamedTemporaryFile(delete=True)
In [56]: import json
In [57]: json.dump({'1': 1}, tmp_f)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-57-81743b9013c4> in <module>()
----> 1 json.dump({'1': 1}, tmp_f)
/usr/local/lib/python3.6/json/__init__.py in dump(obj, fp, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
178 # a debuggability cost
179 for chunk in iterable:
--> 180 fp.write(chunk)
181
182
/usr/local/lib/python3.6/tempfile.py in func_wrapper(*args, **kwargs)
481 @_functools.wraps(func)
482 def func_wrapper(*args, **kwargs):
--> 483 return func(*args, **kwargs)
484 # Avoid closing the file as long as the wrapper is alive,
485 # see issue #18879.
TypeError: a bytes-like object is required, not 'str'
We get the same error as yours.
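Until the operator itself is made Python-3 compatible, one possible workaround at the json.dump call site is to serialize to a string first and encode it to bytes before writing. This is a minimal sketch, not Airflow's actual fix; dump_json_to_tempfile is a hypothetical helper used only for illustration:

```python
import json
from tempfile import NamedTemporaryFile

def dump_json_to_tempfile(row_dict):
    """Write a dict as JSON to a binary temp file by encoding to bytes first."""
    tmp_f = NamedTemporaryFile(delete=False)           # default 'w+b' handle
    tmp_f.write(json.dumps(row_dict).encode('utf-8'))  # str -> bytes, then write
    tmp_f.close()
    return tmp_f.name  # path of the written file

path = dump_json_to_tempfile({'1': 1})
```

Alternatively, passing mode='w' to NamedTemporaryFile gives a text-mode handle that json.dump can write to directly on Python 3.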
This means that Airflow does not yet fully support Python 3 (as you can see in the test coverage, the module airflow/contrib/operators/mysql_to_gcs.py is not yet tested in either Python 2 or Python 3). One way to confirm this is to run your code with Python 2 and see if it works.
I'd recommend creating an issue on their JIRA requesting portability for both versions of Python.