Run a local DynamoDB Spark job without EMR
Question
I want to run a DynamoDB Spark job locally, without an EMR cluster, that reads data from a table and writes it to a Parquet/CSV file. I haven't found any Spark-DynamoDB connector that supports this. Any ideas?
My code sample:
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object copyDynamoTable extends App {
  val spark = SparkSession
    .builder()
    .appName("test")
    .master("local")
    .getOrCreate()

  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.servicename", "dynamodb")
  jobConf.set("dynamodb.input.tableName", "hen.poc.client") // Pointing to DynamoDB table
  jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
  jobConf.set("dynamodb.regionid", "us-east-1")
  jobConf.set("dynamodb.throughput.read", "1")
  jobConf.set("dynamodb.throughput.read.percent", "1")
  jobConf.set("dynamodb.version", "2011-12-05")
  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  val orders = spark.sparkContext.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
  println(orders.count)
}
I get the following exception:
18/09/05 17:06:41 INFO util.TaskCalculator: Cluster has 1 active nodes.
18/09/05 17:06:41 WARN util.ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.Files.readAllBytes(Files.java:3152)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)
at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:265)
at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:47)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at com.data.spark.dynamodb.copyDynamoTable$.delayedEndpoint$com$riskified$data$spark$dynamodb$copyDynamoTable$1(copyDynamoTable.scala:30)
at com.data.spark.dynamodb.copyDynamoTable$delayedInit$body.apply(copyDynamoTable.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.data.spark.dynamodb.copyDynamoTable$.main(copyDynamoTable.scala:9)
at com.data.spark.dynamodb.copyDynamoTable.main(copyDynamoTable.scala)
Exception in thread "main" java.lang.ArithmeticException: / by zero
Recommended answer
The missing file, /mnt/var/lib/info/job-flow.json, exists only on EMR cluster nodes. The connector reads it to determine which instance type the job is running on, and uses that to derive job settings such as memory. When you run locally the file is not there, so this warning is expected.
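To confirm which environment a job is running in, one option is to check for that file before building the job. The following is a minimal sketch, not part of the connector: the object name EmrEnvironmentCheck and the method isLikelyOnEmr are hypothetical, and treating the file's presence as a sign of EMR is an assumption based only on the path in the stack trace.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper (not part of emr-dynamodb-connector).
object EmrEnvironmentCheck {
  // Path copied from the NoSuchFileException in the stack trace above.
  val JobFlowPath: String = "/mnt/var/lib/info/job-flow.json"

  // Returns true when the file at `path` exists. Assumption: the presence of
  // job-flow.json is a reasonable hint that the JVM runs on an EMR node.
  def isLikelyOnEmr(path: String = JobFlowPath): Boolean =
    Files.exists(Paths.get(path))

  def main(args: Array[String]): Unit = {
    if (isLikelyOnEmr())
      println(s"Found $JobFlowPath: probably running on an EMR node")
    else
      println(s"$JobFlowPath is missing: probably running outside EMR")
  }
}
```

On a local machine the second branch should be taken, which matches the WARN line in the log.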
See the following thread:
emr / github.com / issues / 50