Spark 2.2.0 - How to write/read DataFrame to DynamoDB


Question

I want my Spark application to read a table from DynamoDB, do stuff, then write the result in DynamoDB.

Right now, I can read the table from DynamoDB into Spark as a hadoopRDD and convert it to a DataFrame. However, I had to use a regular expression to extract the value from AttributeValue. Is there a better/more elegant way? Couldn't find anything in the AWS API.

package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // AttributeValue.toString() looks like "{S: 298905396168806365,}":
  // grab whatever sits between the type tag and the trailing comma
  def extractValue: String => String = (aws: String) => {
    val pat_value = "\\s(.*),".r
    pat_value.findFirstMatchIn(aws) match {
      case Some(m) => m.group(1)
      case None    => ""
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext

    import spark.implicits._

    // UDF to extract the value from an AttributeValue's string representation
    val col_extractValue = udf(extractValue)

    // Configure the connection to DynamoDB
    val jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
    jobConf_add.set("dynamodb.input.tableName", "MyTable")
    jobConf_add.set("dynamodb.output.tableName", "MyTable")
    jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

    // RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
    val hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

    // Convert the HadoopRDD to an RDD of (PIN, Address) string pairs
    val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
    }

    // Convert the RDD to a DataFrame and extract the values from each AttributeValue
    val df_add = rdd_add.toDF()
      .withColumn("PIN", col_extractValue($"_1"))
      .withColumn("Address", col_extractValue($"_2"))
      .select("PIN", "Address")
  }
}
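
As an aside on the AttributeValue extraction above: the SDK's AttributeValue class exposes typed getters, e.g. getS() for string attributes and getN() for numbers, which may make the regex UDF unnecessary. A minimal sketch, assuming PIN and Address are stored as DynamoDB string attributes (rdd_direct and df_direct are illustrative names, not part of the original code):

// Sketch: read typed values straight from each AttributeValue instead of parsing its toString() output.
// Assumes both attributes are DynamoDB strings ("S"); getN() would be the getter for number attributes.
val rdd_direct: RDD[(String, String)] = hadooprdd_add.map {
  case (text, dbwritable) => (
    dbwritable.getItem().get("PIN").getS(),
    dbwritable.getItem().get("Address").getS()
  )
}

val df_direct = rdd_direct.toDF("PIN", "Address")   // no regex UDF needed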

Write the DataFrame to DynamoDB

Many answers on Stack Overflow and elsewhere only point to the AWS blog post and the emr-dynamodb-hadoop GitHub repo. None of those resources actually demonstrates how to write to DynamoDB.

I tried converting my DataFrame to an RDD[Row], and it failed:

df_add.rdd.saveAsHadoopDataset(jobConf_add)

What are the steps to write this DataFrame to DynamoDB? (Bonus Points if you tell me how to control overwrite vs putItem ;)

Note: df_add has the same schema as MyTable in DynamoDB.

EDIT: I am following the suggestion from this answer, which points to Using Spark SQL for ETL:

// Format the table rows into the (Text, DynamoDBItemWritable) pairs DynamoDB expects
val output_rdd = df_add.as[(String, String)].rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  val PINValue = new AttributeValue()  // New AttributeValue
  PINValue.setS(a._1)                  // Set the attribute value as a String (first element of the tuple)
  ddbMap.put("PIN", PINValue)          // Add to the HashMap

  // Field Address
  val AddValue = new AttributeValue()  // New AttributeValue
  AddValue.setS(a._2)                  // Set the attribute value as a String
  ddbMap.put("Address", AddValue)      // Add to the HashMap

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

output_rdd.saveAsHadoopDataset(jobConf_add)

However, now I am getting java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text despite following the documentation ... Do you have any suggestions?

EDIT 2: From the Using Spark SQL for ETL post:

"After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types."

Taking this into account, the code below is exactly what the AWS blog post suggests, except that I cast output_df to an RDD, since saveAsHadoopDataset doesn't work otherwise. And now I am getting Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience. I am at the end of my rope!

// Format the table rows into the (Text, DynamoDBItemWritable) pairs DynamoDB expects
val output_df = df_add.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  val PINValue = new AttributeValue()  // New AttributeValue
  PINValue.setS(a.get(0).toString())   // Set the attribute value as a String
  ddbMap.put("PIN", PINValue)          // Add to the HashMap

  // Field Address
  val AddValue = new AttributeValue()  // New AttributeValue
  AddValue.setS(a.get(1).toString())   // Set the attribute value as a String
  ddbMap.put("Address", AddValue)      // Add to the HashMap

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

output_df.rdd.saveAsHadoopDataset(jobConf_add)

Accepted answer

I was following that "Using Spark SQL for ETL" link and ran into the same "illegal cyclic reference" exception. The solution is quite simple (though it cost me two days to figure out), as shown below. The key point is to use the map function on the RDD of the DataFrame, not on the DataFrame itself.

val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")


val df_ddb = spark.read.option("header", "true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

// Map over the RDD of the DataFrame (df_ddb.rdd), not over the DataFrame itself
val ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    // Write every non-null column as a DynamoDB string attribute, keyed by its column name
    for (i <- 0 to schema_ddb.length - 1) {
        val value = a.get(i)
        if (value != null) {
            val att = new AttributeValue()
            att.setS(value.toString)
            ddbMap.put(schema_ddb(i)._1, att)
        }
    }

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
})

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
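
One caveat: the loop above writes every column with setS(), i.e. as a DynamoDB string attribute. If some columns should be stored as DynamoDB numbers instead, a hypothetical variant (typedInsertRDD and numericTypes are illustrative names, not part of the original answer) could branch on the Spark type name reported by dtypes and call setN(), which also takes the number's string representation:

// Hypothetical variant of the map above: write numeric Spark columns as DynamoDB number
// attributes ("N") and everything else as strings ("S"). Type names come from df_ddb.dtypes.
val numericTypes = Set("IntegerType", "LongType", "DoubleType", "FloatType", "ShortType")

val typedInsertRDD = df_ddb.rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    for (i <- 0 to schema_ddb.length - 1) {
        val value = a.get(i)
        if (value != null) {
            val att = new AttributeValue()
            if (numericTypes.contains(schema_ddb(i)._2))
                att.setN(value.toString)   // number attribute (setN also takes the String form)
            else
                att.setS(value.toString)   // string attribute
            ddbMap.put(schema_ddb(i)._1, att)
        }
    }

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
})

typedInsertRDD.saveAsHadoopDataset(ddbConf)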

