Simple RDD write to DynamoDB in Spark

Problem Description

I got stuck trying to import a basic RDD dataset into DynamoDB. This is the code:

import org.apache.hadoop.mapred.JobConf

var rdd = sc.parallelize(Array(("", Map("col1" -> Map("s" -> "abc"), "col2" -> Map("n" -> "123")))))

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.output.tableName", "table_x")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

rdd.saveAsHadoopDataset(jobConf)

And this is the error I get:

16/02/28 15:40:38 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 18, ip-172-31-9-224.eu-west-1.compute.internal): java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text
at org.apache.hadoop.dynamodb.write.DefaultDynamoDBRecordWriter.convertValueToDynamoDBItem(DefaultDynamoDBRecordWriter.java:10)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:90)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:96)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1199)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

What can I do to fix this?

Solution

You need to convert your objects to the Hadoop Writable types the output format expects: a Text key and a DynamoDBItemWritable value, rather than a plain Scala String and Map. That is exactly what the ClassCastException in the stack trace is complaining about.

I suggest you have a look here:

https://aws.amazon.com/blogs/big-data/using-spark-sql-for-etl/
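For reference, here is a minimal sketch of that conversion, following the (Text, DynamoDBItemWritable) pattern shown in the linked blog post. The table name table_x and the attribute names col1/col2 are carried over from the question; any region or endpoint configuration your cluster may need is omitted:

import java.util.HashMap

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import com.amazonaws.services.dynamodbv2.model.AttributeValue

// Build (Text, DynamoDBItemWritable) pairs instead of (String, Map) pairs.
val ddbRDD = sc.parallelize(Seq(("abc", "123"))).map { case (s, n) =>
  val attributes = new HashMap[String, AttributeValue]()
  attributes.put("col1", new AttributeValue().withS(s)) // string attribute
  attributes.put("col2", new AttributeValue().withN(n)) // number attribute

  val item = new DynamoDBItemWritable()
  item.setItem(attributes)

  // The output format ignores the key, but it must be a Text, not a String.
  (new Text(""), item)
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.output.tableName", "table_x")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

ddbRDD.saveAsHadoopDataset(jobConf)

With the RDD shaped this way, saveAsHadoopDataset hands the record writer the Writable types it casts to, and the ClassCastException should go away.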
