Spark 2.2.0 - How to write/read DataFrame to DynamoDB
Question
I want my Spark application to read a table from DynamoDB, do stuff, then write the result in DynamoDB.
Right now, I can read the table from DynamoDB into Spark as a hadoopRDD and convert it to a DataFrame. However, I had to use a regular expression to extract the value from AttributeValue. Is there a better/more elegant way? I couldn't find anything in the AWS API.
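To make the regex concrete before the full program: a string attribute's AttributeValue prints as something like {S: 298905396168806365,}, and the pattern grabs everything between the first whitespace and the last comma. A standalone sketch of just that extraction (same pattern as the extractValue helper in the code below):

```scala
// Standalone check of the regex used to strip the printed AttributeValue wrapper.
// AttributeValue.toString renders a string attribute as e.g. "{S: 298905396168806365,}".
val pat_value = "\\s(.*),".r

def extractValue(aws: String): String =
  pat_value.findFirstMatchIn(aws) match {
    case Some(m) => m.group(1) // text between the first whitespace and the last comma
    case None    => ""         // no wrapper found: return an empty string
  }

println(extractValue("{S: 298905396168806365,}")) // prints 298905396168806365
```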
package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // Example raw value: {S: 298905396168806365,}
  def extractValue: (String => String) = (aws: String) => {
    val pat_value = "\\s(.*),".r
    pat_value.findFirstMatchIn(aws) match {
      case Some(number) => number.group(1)
      case None => ""
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext
    import spark.implicits._

    // UDF to extract the value from an AttributeValue
    val col_extractValue = udf(extractValue)

    // Configure the connection to DynamoDB
    val jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
    jobConf_add.set("dynamodb.input.tableName", "MyTable")
    jobConf_add.set("dynamodb.output.tableName", "MyTable")
    jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

    // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
    val hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

    // Convert the HadoopRDD to an RDD of (String, String)
    val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) =>
        (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
    }

    // Convert the RDD to a DataFrame and extract the values from AttributeValue
    val df_add = rdd_add.toDF()
      .withColumn("PIN", col_extractValue($"_1"))
      .withColumn("Address", col_extractValue($"_2"))
      .select("PIN", "Address")
  }
}
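For comparison, here is the regex-free mapping I was hoping for. This is a sketch under an assumption: that PIN and Address are stored as DynamoDB String attributes, so AttributeValue.getS() returns the raw value directly and no string parsing is needed (getS() is the string accessor on com.amazonaws.services.dynamodbv2.model.AttributeValue):

```scala
// Sketch: read the S (string) member of each AttributeValue instead of
// parsing its toString output with a regex.
// Assumes PIN and Address are DynamoDB String attributes.
val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
  case (text, dbwritable) =>
    (dbwritable.getItem().get("PIN").getS(),
     dbwritable.getItem().get("Address").getS())
}

// Name the columns directly; no UDF needed.
val df_add = rdd_add.toDF("PIN", "Address")
```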
Write the DataFrame to DynamoDB
Many answers on Stack Overflow and elsewhere only point to the blog post and the emr-dynamodb-hadoop GitHub repository. None of those resources actually demonstrate how to write to DynamoDB.
I tried converting my DataFrame to an RDD[Row], and it failed:
df_add.rdd.saveAsHadoopDataset(jobConf_add)
What are the steps to write this DataFrame to DynamoDB? (Bonus points if you tell me how to control overwrite vs putItem.)
Note: df_add has the same schema as MyTable in DynamoDB.
EDIT: I am following the suggestion in this answer, which points to Using Spark SQL for ETL:
// Format the table into the DynamoDB format
val output_rdd = df_add.as[(String, String)].rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  val PINValue = new AttributeValue() // New AttributeValue
  PINValue.setS(a._1)                 // Set the attribute's value as a String (first element of the tuple)
  ddbMap.put("PIN", PINValue)         // Add to the HashMap

  // Field Address
  val AddValue = new AttributeValue() // New AttributeValue
  AddValue.setS(a._2)                 // Set the attribute's value as a String
  ddbMap.put("Address", AddValue)     // Add to the HashMap

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})

output_rdd.saveAsHadoopDataset(jobConf_add)
However, now I am getting java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text despite following the documentation ... Do you have any suggestions?
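One thing I have not ruled out (this is an assumption on my part, not a confirmed fix): the JobConf above never declares the output key/value classes, only the output format class as a string, so it may be worth setting them explicitly before the write. A sketch of what I mean:

```scala
// Untested guess: declare the key/value types DynamoDBOutputFormat expects
// on the JobConf before calling saveAsHadoopDataset.
jobConf_add.setOutputKeyClass(classOf[Text])
jobConf_add.setOutputValueClass(classOf[DynamoDBItemWritable])

output_rdd.saveAsHadoopDataset(jobConf_add)
```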