Run a local DynamoDB Spark job without EMR


Question

I want to run a local DynamoDB Spark job without using an EMR cluster: read data from a table and write it to a Parquet / CSV file. I haven't found any Spark-DynamoDB connector that supports this. Maybe you have some ideas?

My code sample:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object copyDynamoTable extends App {
  val spark = SparkSession
    .builder()
    .appName("test")
    .master("local")
    .getOrCreate()

  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.servicename", "dynamodb")
  jobConf.set("dynamodb.input.tableName", "hen.poc.client") // source DynamoDB table
  jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
  jobConf.set("dynamodb.regionid", "us-east-1")
  jobConf.set("dynamodb.throughput.read", "1")
  jobConf.set("dynamodb.throughput.read.percent", "1") // fraction of provisioned read capacity to consume
  jobConf.set("dynamodb.version", "2011-12-05") // DynamoDB API version

  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  val orders = spark.sparkContext.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

  println(orders.count)
}

I get the following exception:

18/09/05 17:06:41 INFO util.TaskCalculator: Cluster has 1 active nodes.
18/09/05 17:06:41 WARN util.ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.Files.readAllBytes(Files.java:3152)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)
    at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
    at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:265)
    at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:47)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at com.data.spark.dynamodb.copyDynamoTable$.delayedEndpoint$com$riskified$data$spark$dynamodb$copyDynamoTable$1(copyDynamoTable.scala:30)
    at com.data.spark.dynamodb.copyDynamoTable$delayedInit$body.apply(copyDynamoTable.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.data.spark.dynamodb.copyDynamoTable$.main(copyDynamoTable.scala:9)
    at com.data.spark.dynamodb.copyDynamoTable.main(copyDynamoTable.scala)
Exception in thread "main" java.lang.ArithmeticException: / by zero


Answer

/mnt/var/lib/info/job-flow.json is a file that is present on an EMR cluster. The connector reads it to determine which instance type it is running on and derive job settings such as memory; the subsequent "/ by zero" error is that calculation failing once no node capacity can be determined. Running locally you won't have this file, so the exception is expected.
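One workaround (discussed in the thread linked below) is to create a stub job-flow.json before starting the job, so the connector's capacity probe has something to read. A minimal sketch: the JSON shape here is an assumption modelled on the instanceGroups structure that ClusterTopologyNodeCapacityProvider appears to parse, and the instance type is a placeholder.

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Assumed schema: an instanceGroups array containing a Core group whose
// instanceType the connector maps to a memory figure. Adjust as needed.
val stubJson =
  """{
    |  "jobFlowId": "j-local",
    |  "instanceGroups": [
    |    {
    |      "instanceGroupId": "ig-core",
    |      "instanceRole": "Core",
    |      "instanceType": "m3.xlarge",
    |      "requestedInstanceCount": 1
    |    }
    |  ]
    |}""".stripMargin

// Writing under /mnt typically needs elevated permissions on a dev machine.
Files.createDirectories(Paths.get("/mnt/var/lib/info"))
Files.write(Paths.get("/mnt/var/lib/info/job-flow.json"), stubJson.getBytes(StandardCharsets.UTF_8))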

The full discussion is in this thread:

https://github.com/awslabs/emr-dynamodb-connector/issues/50
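For the original goal of dumping the table to Parquet / CSV, once the RDD loads you can project the DynamoDB items into a DataFrame and write it out. A minimal sketch, assuming two hypothetical string attributes id and name (substitute your table's actual attributes):

// DynamoDBItemWritable.getItem returns a java.util.Map[String, AttributeValue];
// extracting plain strings inside the map keeps the resulting RDD serializable.
val rows = orders.map { case (_, item) =>
  val attrs = item.getItem
  (attrs.get("id").getS, attrs.get("name").getS)
}

import spark.implicits._
rows.toDF("id", "name").write.parquet("/tmp/hen_poc_client.parquet")
// or: rows.toDF("id", "name").write.csv("/tmp/hen_poc_client.csv")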
