Filtering DynamicFrame with AWS Glue or PySpark


Question


I have a table in my AWS Glue Data Catalog called 'mytable'. This table is in an on-premises Oracle database connection 'mydb'.


I'd like to filter the resulting DynamicFrame to only rows where the X_DATETIME_INSERT column (which is a timestamp) is greater than a certain time (in this case, '2018-05-07 04:00:00'). Afterwards, I'm trying to count the rows to ensure that the count is low (the table is about 40,000 rows, but only a few rows should meet the filter criteria).

Here is my current code:

import boto3
from datetime import datetime
import logging
import os
import pg8000
import pytz
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from base64 import b64decode
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "mytable", transformation_ctx = "datasource0")

# Try Glue native filtering    
filtered_df = Filter.apply(frame = datasource0, f = lambda x: x["X_DATETIME_INSERT"] > '2018-05-07 04:00:00')
filtered_df.count()


This code runs for 20 minutes and times out. I've tried other variations:

df = datasource0.toDF()
df.where(df.X_DATETIME_INSERT > '2018-05-07 04:00:00').collect()

and

df.filter(df["X_DATETIME_INSERT"].gt(lit("'2018-05-07 04:00:00'")))


Which have failed. What am I doing wrong? I'm experienced in Python but new to Glue and PySpark.

Answer


AWS Glue loads the entire dataset from your JDBC source into a temp S3 folder and applies the filtering afterwards. If your data were in S3 instead of Oracle and partitioned by some keys (i.e. /year/month/day), then you could use the pushdown predicate feature to load only a subset of the data:

val partitionPredicate = s"to_date(concat(year, '-', month, '-', day)) BETWEEN '${fromDate}' AND '${toDate}'"

val df = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()
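
In PySpark the same idea is expressed through the push_down_predicate argument of create_dynamic_frame.from_catalog. A minimal sketch of the Scala example above, assuming the same githubarchive_month/data catalog table partitioned by year, month and day (table names are taken from the snippet, not from the question's Oracle table):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# The predicate is evaluated against the partition columns during partition
# listing, so only matching S3 partitions are ever read.
partition_predicate = "to_date(concat(year, '-', month, '-', day)) BETWEEN '2018-05-01' AND '2018-05-07'"

dyf = glueContext.create_dynamic_frame.from_catalog(
    database = "githubarchive_month",
    table_name = "data",
    push_down_predicate = partition_predicate)

Because the predicate prunes partitions before the data is loaded, the job avoids the "load everything, then filter" behavior described above.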


Unfortunately, this doesn't work for JDBC data sources yet.
