py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame
Problem description
I get the following error when running an auto-generated script in AWS Glue. The data is in struct format, and the table being written to is the same table that was created earlier.

py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame. : com.amazonaws.services.glue.util.SchemaException: Cannot write struct field tags to csv

Please point out where I went wrong.
# standard Glue boilerplate imports (not included in the original paste)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "experimentdb", table_name = "datalakexperiment",
    transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "string", "id", "string"), ("identifier", "string", "identifier", "string"), ("session_count", "long", "session_count", "long"), ("language", "string", "language", "string"), ("timezone", "long", "timezone", "long"), ("game_version", "string", "game_version", "string"), ("device_os", "string", "device_os", "string"), ("device_type", "long", "device_type", "long"), ("device_model", "string", "device_model", "string"), ("ad_id", "string", "ad_id", "string"), ("tags.phone_number", "string", "tags.phone_number", "string"), ("tags.real_name", "string", "tags.real_name", "string"), ("tags.email", "string", "tags.email", "string"), ("tags.onboardingStatus", "string", "tags.onboardingStatus", "string"), ("tags.dfuStatus", "string", "tags.dfuStatus", "string"), ("tags.activityStatus", "string", "tags.activityStatus", "string"), ("tags.lastOperationPerformed", "string", "tags.lastOperationPerformed", "string"), ("last_active", "string", "last_active", "string"), ("playtime", "long", "playtime", "long"), ("amount_spent", "double", "amount_spent", "double"), ("created_at", "string", "created_at", "string"), ("invalid_identifier", "string", "invalid_identifier", "string"), ("badge_count", "long", "badge_count", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [
    ("id", "string", "id", "string"),
    ("identifier", "string", "identifier", "string"),
    ("session_count", "long", "session_count", "long"),
    ("language", "string", "language", "string"),
    ("timezone", "long", "timezone", "long"),
    ("game_version", "string", "game_version", "string"),
    ("device_os", "string", "device_os", "string"),
    ("device_type", "long", "device_type", "long"),
    ("device_model", "string", "device_model", "string"),
    ("ad_id", "string", "ad_id", "string"),
    ("tags.phone_number", "string", "tags.phone_number", "string"),
    ("tags.real_name", "string", "tags.real_name", "string"),
    ("tags.email", "string", "tags.email", "string"),
    ("tags.onboardingStatus", "string", "tags.onboardingStatus", "string"),
    ("tags.dfuStatus", "string", "tags.dfuStatus", "string"),
    ("tags.activityStatus", "string", "tags.activityStatus", "string"),
    ("tags.lastOperationPerformed", "string", "tags.lastOperationPerformed", "string"),
    ("last_active", "string", "last_active", "string"),
    ("playtime", "long", "playtime", "long"),
    ("amount_spent", "double", "amount_spent", "double"),
    ("created_at", "string", "created_at", "string"),
    ("invalid_identifier", "string", "invalid_identifier", "string"),
    ("badge_count", "long", "badge_count", "long"),
], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["id", "identifier", "session_count", "language", "timezone", "game_version", "device_os", "device_type", "device_model", "ad_id", "tags", "last_active", "playtime", "amount_spent", "created_at", "invalid_identifier", "badge_count"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = [
    "id", "identifier", "session_count", "language", "timezone", "game_version",
    "device_os", "device_type", "device_model", "ad_id", "tags", "last_active",
    "playtime", "amount_spent", "created_at", "invalid_identifier", "badge_count",
], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG",
    database = "experimentdb", table_name = "datalakexperiment",
    transformation_ctx = "resolvechoice3")
## @type: DataSink
## @args: [database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = resolvechoice3]
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3,
    database = "experimentdb", table_name = "datalakexperiment",
    transformation_ctx = "datasink4")
job.commit()
Recommended answer
Somewhere in your table schema there is a struct object. CSV does not support nested structures, so you can either:
- use a hierarchical output format such as JSON, or
- flatten the struct (more on this below).

Here is a solution for flattening the struct: How to flatten a struct in a Spark dataframe?
Here is my implementation:
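The trade-off is easy to see with plain Python's json and csv modules (a standalone illustration, not Glue code; the record below is made up to resemble the table in the question): JSON can serialize a nested record directly, while a CSV row needs every value to be a flat scalar, so the struct must be spread into dotted columns first.

```python
import csv
import io
import json

# A record shaped like the table above: "tags" is a nested struct.
record = {"id": "u1", "tags": {"phone_number": "555-0100", "email": "a@b.com"}}

# JSON handles nesting natively.
as_json = json.dumps(record)

# CSV needs flat columns, so the struct is spread into dotted keys first.
flat = {
    "id": record["id"],
    "tags.phone_number": record["tags"]["phone_number"],
    "tags.email": record["tags"]["email"],
}
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(flat))
writer.writeheader()
writer.writerow(flat)
print(as_json)
print(buf.getvalue())
```

This is exactly the shape the Glue error is complaining about: the `tags` struct has no flat-scalar representation until it is split into columns like `tags.phone_number`.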
from pyspark.sql.types import ArrayType, StructType

def flatten(schema, prefix=None):
    """Flattens out a nested schema, as CSV doesn't support nesting.

    NOTE: If different nested schemas have same-named columns, the last one
    found will overwrite any earlier instances of that column."""
    fields = []
    for field in schema.fields:
        name = f"{prefix}.{field.name}" if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields
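To see what flatten() produces without a Spark cluster, here is a self-contained sketch: minimal stand-in classes that mimic the attributes of pyspark.sql.types (the stand-ins are illustrative, not the real API), plus the same flatten logic, run against a schema shaped like the one in the question.

```python
# Minimal stand-ins for pyspark.sql.types, so the schema walk can be
# demonstrated without a Spark installation (illustration only).
class StructField:
    def __init__(self, name, dataType):
        self.name, self.dataType = name, dataType

class StructType:
    def __init__(self, fields):
        self.fields = fields

class ArrayType:
    def __init__(self, elementType):
        self.elementType = elementType

class StringType:
    pass

def flatten(schema, prefix=None):
    # Same logic as the answer's flatten(): recurse into structs,
    # building dotted column names.
    fields = []
    for field in schema.fields:
        name = f"{prefix}.{field.name}" if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

# A schema shaped like the question's table: "tags" is a struct.
schema = StructType([
    StructField("id", StringType()),
    StructField("tags", StructType([
        StructField("phone_number", StringType()),
        StructField("email", StringType()),
    ])),
])
print(flatten(schema))  # ['id', 'tags.phone_number', 'tags.email']
```

Selecting those dotted names on the real DataFrame is what turns the nested `tags` struct into flat CSV-writable columns.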
Then, just before you write the output, call it on your data frame like this:
# in your imports:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

# in your process:
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

resolvechoice3 = resolvechoice3.toDF()  # convert to a data frame
resolvechoice3 = resolvechoice3.select(flatten(resolvechoice3.schema))  # flatten
resolvechoice3 = DynamicFrame.fromDF(resolvechoice3, glue_context, "final_convert")  # convert back to a dynamic frame

# and then output as usual
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3,
    database = "experimentdb", table_name = "datalakexperiment",
    transformation_ctx = "datasink4")
job.commit()