How to configure Glue bookmarks to work with Scala code?

Problem description

Consider this Scala code:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters.mapAsJavaMapConverter

object MyGlueJob {

  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    // Resolve the job arguments and initialise the job (required for bookmarks)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read the gzip-compressed, partitioned JSON table from the Data Catalog
    val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table")
      .getDynamicFrame()

    // Keep only the columns of interest
    val processed = input.applyMapping(
      Seq(
        ("id",      "string", "id",      "string"),
        ("my_date", "string", "my_date", "string")
      ))

    // Write the result to S3 as ORC, partitioned by my_date
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = ""
    ).writeDynamicFrame(processed)

    Job.commit()
  }
}

The input is partitioned, gzip-compressed JSON files, partitioned by a date column. Everything works: the data is read as JSON and written as ORC.

But when the job is run again on the same data, it reads it again and writes duplicated data. Bookmarks are enabled for this job, and both Job.init and Job.commit are invoked. What is wrong?

Update

I added the transformationContext parameter to getCatalogSource and getSinkWithFormat:

    val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table", transformationContext = "transformationContext1")
      .getDynamicFrame()

and:

    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = "transformationContext2"
    ).writeDynamicFrame(processed)

Now the magic "works" like this:

  1. First run: OK
  2. Second run (with the same data, or the same data plus new data): fails with an error (shown below)

Again, the error happens on the second (and subsequent) runs. The message Skipping Partition {"my_date": "2017-10-10"} also appears in the logs.

ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:438)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:437)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:437)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:420)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at MobileArcToRaw$.main(script_2018-01-18-08-14-38.scala:99)

What is really going on with Glue bookmarks?

Answer

Have you tried setting the transformationContext value to be the same for both the source and the sink? They are currently set to different values in your last update.

transformationContext = "transformationContext1"

transformationContext = "transformationContext2"

I have struggled with this as well using Glue and bookmarks. I'm trying to perform a similar task where I read in partitioned JSON files that are partitioned by year, month and day with new files arriving every day. My job runs a transform to pull out a subset of the data and then sink into partitioned Parquet files on S3.

I'm using Python so my initial instantiation of the DynamicFrame looked like this:

dyf = glue_context.create_dynamic_frame.from_catalog(database="dev-db", table_name="raw", transformation_ctx="raw")

And a sink to S3 at the end like this:

glue_context.write_dynamic_frame.from_options(
    frame=select_out,
    connection_type='s3',
    connection_options={'path': output_dir, 'partitionKeys': ['year', 'month', 'day']},
    format='parquet',
    transformation_ctx="dev-transactions"
)

Initially I ran the job and the Parquet was generated correctly with bookmarks enabled. I then added a new day of data, updated the partitions on the input table and re-ran. The second job would fail with errors like this:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'year' given input columns: [];;\n'Project ['year, 'month, 'day, 'data']

Changing the transformation_ctx to be the same (dev-transactions in my case) enabled the process to work correctly with only the incremental partitions being processed and Parquet generated for the new partitions.

The documentation is very sparse regarding Bookmarks in general and how the transformation context variable is used.

The Python docs (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html) just say:

transformation_ctx – The transformation context to use (optional).

The Scala docs (https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html) say:

transformationContext — Transformation context associated with the sink to be used by job bookmarks. Set to empty by default.

The best I can observe, since the docs do a poor job of explaining it, is that the transformation context is used to link which source and sink data have already been processed, and that defining different contexts prevents bookmarks from working as expected.
