Spark Streaming - Same processing time for 4 cores and 16 cores. Why?


Problem Description


Scenario: I am doing some testing with Spark Streaming. A file with around 100 records comes in every 25 seconds.

Problem: Processing takes 23 seconds on average on a 4-core PC using local[*] in the program. When I deployed the same app to a server with 16 cores, I expected an improvement in processing time. However, I see it still takes the same time on 16 cores (I also checked CPU usage in Ubuntu, and the CPU is fully utilized). All configurations are Spark's defaults.

Question: Shouldn't processing time decrease as the number of cores available to the streaming job increases?

Code:

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
val sqc = new SQLContext(sc)

val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)

jsonBuilder.append("[")
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      val header = rdd.first().split(",")
      val rowsWithoutHeader = Utils.dropHeader(rdd)
      rowsWithoutHeader.foreach { row =>
        jsonBuilder.append("{")
        val singleRowArray = row.split(",")
        (header, singleRowArray).zipped
          .foreach { (x, y) =>
            jsonBuilder.append(convertToStringBasedOnDataType(x, y))
            // GEO Hash logic here
            if (x.equals("GPSLat") || x.equals("Lat")) {
              lattitude = y.toDouble
            }
            else if (x.equals("GPSLon") || x.equals("Lon")) {
              longitude = y.toDouble
              if (x.equals("Lon")) {
                // This section is used to convert GPS Look Up to GPS LookUp with Hash
                jsonBuilder.append(convertToStringBasedOnDataType("geoCode", GeoHash.encode(lattitude, longitude)))
              }
              else {
                val selectedRow = broadcastTable.value
                  .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
                  .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                  .orderBy("Distance")
                  .select("TrackKM", "TrackName").take(1)
                if (selectedRow.length != 0) {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", selectedRow(0).get(0)))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", selectedRow(0).get(1)))
                }
                else {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", "NULL"))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", "NULL"))
                }
              }
            }
          }
        jsonBuilder.setLength(jsonBuilder.length - 1)
        jsonBuilder.append("},")
      }
      sc.parallelize(Seq(jsonBuilder.toString)).repartition(1).saveAsTextFile("hdfs://localhost:9000/outputDirectory")
    }
  }

Solution

It sounds like you are using only one thread; if that is the case, it won't matter whether the application runs on a machine with 4 or 16 cores.

It sounds like 1 file comes in, and that 1 file is 1 RDD partition with 100 rows. You iterate over the rows in that RDD and append to the jsonBuilder. At the end you call repartition(1), which makes the writing of the file single-threaded.

You could repartition your data set into, say, 12 RDD partitions after you pick up the file, to ensure that other threads work on the rows (a sketch follows below). But unless I am missing something, you are lucky this isn't happening: what happens if two threads call jsonBuilder.append("{") at the same time? Won't they produce invalid JSON? I could be missing something here.
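
As an illustration, here is a minimal sketch of that repartitioning, written so that no two threads ever share a builder: each row becomes its own JSON string inside the RDD, and Spark writes the results in parallel. convertRowToJson is a hypothetical helper standing in for the header-zipping and geo-hash logic in the question; Utils.dropHeader and the HDFS paths come from the question's code.

// Sketch only, assuming a hypothetical convertRowToJson(header, fields):
// per-row conversion with no shared mutable state, so the extra partitions
// can safely be processed in parallel.
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      val header = rdd.first().split(",")
      Utils.dropHeader(rdd)
        .repartition(12)  // spread the ~100 rows across 12 tasks
        .map(row => convertRowToJson(header, row.split(",")))
        .saveAsTextFile("hdfs://localhost:9000/outputDirectory")
    }
  }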

You could test whether I am correct about the single-threadedness of your application by adding logging like this:

scala> val rdd1 = sc.parallelize(1 to 10).repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:21

scala> rdd1.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-40 => 1
Executor task launch worker-40 => 2
Executor task launch worker-40 => 3
Executor task launch worker-40 => 4
Executor task launch worker-40 => 5
Executor task launch worker-40 => 6
Executor task launch worker-40 => 7
Executor task launch worker-40 => 8
Executor task launch worker-40 => 9
Executor task launch worker-40 => 10

scala> val rdd3 = sc.parallelize(1 to 10).repartition(3)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at repartition at <console>:21

scala> rdd3.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-109 => 1
Executor task launch worker-108 => 2
Executor task launch worker-95 => 3
Executor task launch worker-95 => 4
Executor task launch worker-109 => 5
Executor task launch worker-108 => 6
Executor task launch worker-108 => 7
Executor task launch worker-95 => 8
Executor task launch worker-109 => 9
Executor task launch worker-108 => 10
