Spark Streaming - Same processing time for 4 cores and 16 cores. Why?


Problem description

Scenario: I am doing some testing with Spark Streaming. Files with around 100 records each come in every 25 seconds.

Problem: Processing takes 23 seconds on average on a 4-core PC using local[*] in the program. When I deployed the same app to a server with 16 cores I was expecting an improvement in processing time. However, I see it still takes the same time on 16 cores (I also checked CPU usage in Ubuntu, and the CPU is fully utilized). All configurations are the Spark defaults.

Question: Shouldn't processing time decrease as the number of cores available to the streaming job increases?

Code:

// Imports implied by the snippet; helper objects such as MapInput, Utils, GeoHash,
// convertToStringBasedOnDataType and calculateDistance are defined elsewhere in the project.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
val sqc = new SQLContext(sc)

// The look-up table is cached once and shared with the tasks through a broadcast variable.
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)

// Declarations implied by their use below (not shown in the original snippet).
val jsonBuilder = new StringBuilder
var lattitude = 0.0
var longitude = 0.0

jsonBuilder.append("[")
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      val header = rdd.first().split(",")
      val rowsWithoutHeader = Utils.dropHeader(rdd)
      rowsWithoutHeader.foreach { row =>
        jsonBuilder.append("{")
        val singleRowArray = row.split(",")
        (header, singleRowArray).zipped
          .foreach { (x, y) =>
            jsonBuilder.append(convertToStringBasedOnDataType(x, y))
            // GEO Hash logic here
            if (x.equals("GPSLat") || x.equals("Lat")) {
              lattitude = y.toDouble
            }
            else if (x.equals("GPSLon") || x.equals("Lon")) {
              longitude = y.toDouble
              if (x.equals("Lon")) {
                // This section is used to convert GPS Look Up to GPS LookUP with Hash
                jsonBuilder.append(convertToStringBasedOnDataType("geoCode", GeoHash.encode(lattitude, longitude)))
              }
              else {
                val selectedRow = broadcastTable.value
                  .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
                  .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                  .orderBy("Distance")
                  .select("TrackKM", "TrackName").take(1)
                if (selectedRow.length != 0) {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", selectedRow(0).get(0)))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", selectedRow(0).get(1)))
                }
                else {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", "NULL"))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", "NULL"))
                }
              }
            }
          }
        jsonBuilder.setLength(jsonBuilder.length - 1)  // drop the trailing comma for this row
        jsonBuilder.append("},")
      }
      // Everything accumulated so far is written out through a single partition.
      sc.parallelize(Seq(jsonBuilder.toString)).repartition(1).saveAsTextFile("hdfs://localhost:9000/outputDirectory")
    }
  }

// Not shown in the original snippet, but required to run a streaming job.
ssc.start()
ssc.awaitTermination()

Recommended answer

It sounds like you are using only one thread; if that is the case, it won't matter whether the application runs on a machine with 4 cores or 16.

It sounds like one file comes in, and that one file becomes one RDD partition with 100 rows. You iterate over the rows in that RDD and append to jsonBuilder. At the end you call repartition(1), which makes the writing of the file single-threaded.
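For illustration, here is a minimal sketch (not part of the original answer) of how you could log the partition count of each micro-batch; a single small file typically arrives as one partition, which is why all the per-row work ends up on one core:

// Hedged sketch using the asker's input path: print how many partitions each batch RDD has.
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      println(s"batch RDD has ${rdd.partitions.length} partition(s)")
    }
  }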

You could repartition your data set into 12 RDD partitions after you pick up the file, to ensure that other threads work on the rows. But unless I am missing something, you are lucky this isn't happening: what happens if two threads call jsonBuilder.append("{") at the same time? Won't they create invalid JSON? I could be missing something here.

You could test whether I am right about the single-threadedness of your application by adding logging like this:

scala> val rdd1 = sc.parallelize(1 to 10).repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:21

scala> rdd1.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-40 => 1
Executor task launch worker-40 => 2
Executor task launch worker-40 => 3
Executor task launch worker-40 => 4
Executor task launch worker-40 => 5
Executor task launch worker-40 => 6
Executor task launch worker-40 => 7
Executor task launch worker-40 => 8
Executor task launch worker-40 => 9
Executor task launch worker-40 => 10

scala> val rdd3 = sc.parallelize(1 to 10).repartition(3)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at repartition at <console>:21

scala> rdd3.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-109 => 1
Executor task launch worker-108 => 2
Executor task launch worker-95 => 3
Executor task launch worker-95 => 4
Executor task launch worker-109 => 5
Executor task launch worker-108 => 6
Executor task launch worker-108 => 7
Executor task launch worker-95 => 8
Executor task launch worker-109 => 9
Executor task launch worker-108 => 10
