How to avoid GC overhead limit exceeded in a range query with GeoSpark?

Problem description

I am using Spark 2.4.3 with the GeoSpark 1.2.0 extension.

I have two tables to join with a range-distance condition. One table (t1) is ~100K rows with a single column, a GeoSpark geometry. The other table (t2) is ~30M rows, composed of an Int value and a GeoSpark geometry column.

What I am trying to do is just a simple:

// Imports required by this snippet
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator

    val spark = SparkSession
      .builder()
//      .master("local[*]")
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
      .config("geospark.global.index", "true")
      .config("geospark.global.indextype", "rtree")
      .config("geospark.join.gridtype", "rtree")
      .config("geospark.join.numpartition", 200)
      .config("spark.sql.parquet.filterPushdown", "true")
//      .config("spark.sql.shuffle.partitions", 10000)
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .appName("PropertyMaster.foodDistanceEatout")
      .getOrCreate()

GeoSparkSQLRegistrator.registerAll(spark)

spark.sparkContext.setLogLevel("ERROR")

spark.read
  .load(s"$dataPath/t2")
  .repartition(200)
  .createOrReplaceTempView("t2")

spark.read
  .load(s"$dataPath/t1")
  .repartition(200)
  .cache()
  .createOrReplaceTempView("t1")

val query =
  """
    |select /*+ BROADCAST(t1) */
    |  t2.cid, ST_Distance(t1.geom, t2.geom) as distance
    |  from t2, t1 where ST_Distance(t1.geom, t2.geom) <= 3218.69""".stripMargin

spark.sql(query)
  .repartition(200)
  .write.mode(SaveMode.Append)
  .option("path", s"$dataPath/my_output.csv")
  .format("csv").save()

I tried different configurations, both when running it locally and on the local cluster on my laptop (16 GB total memory and 8 cores), but without any luck: the program crashes at GeoSpark's "Distinct at Join" stage with lots of shuffling. However, I am not able to remove the shuffling from the SparkSQL syntax. I thought of adding an extra id column to the biggest table, for example the same integer for every 200 rows or so, and repartitioning by it, but that didn't work either.
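For reference, a minimal sketch of that bucketing idea (the bucket column name and the use of monotonically_increasing_id are illustrative assumptions, not part of the original code). It only balances row counts per partition and does not co-locate spatially close geometries, which is why it does not reduce the shuffle of the distance join:

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

// Tag roughly every 200 consecutive rows with the same bucket id (the generated ids
// are not contiguous across partitions, so the 200-row blocks are approximate),
// then repartition by that bucket column.
val t2Bucketed = spark.read
  .load(s"$dataPath/t2")
  .withColumn("bucket", (monotonically_increasing_id() / 200).cast("long"))
  .repartition(col("bucket"))
t2Bucketed.createOrReplaceTempView("t2")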

I was expecting GeoSpark's indexing to provide a partitioner, but I am not sure it is working.
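For comparison, spatial partitioning can be requested explicitly through GeoSpark's RDD API instead of relying on the SQL optimizer. The following is only a rough sketch of the same distance join done that way, assuming a geometry column named geom and the RDD-level classes (Adapter, CircleRDD, JoinQuery, GridType, IndexType) as documented for GeoSpark 1.2.0; verify the exact signatures against your version:

import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.spatialRDD.CircleRDD
import org.datasyslab.geosparksql.utils.Adapter

// Convert the registered tables to SpatialRDDs ("geom" is the assumed geometry column).
val t1Rdd = Adapter.toSpatialRdd(spark.table("t1"), "geom")
val t2Rdd = Adapter.toSpatialRdd(spark.table("t2"), "geom")
t1Rdd.analyze()
t2Rdd.analyze()

// Buffer each t2 geometry by the search radius, partition both sides on the same grid,
// and build an R-tree index on the spatially partitioned side before the distance join.
val circles = new CircleRDD(t2Rdd, 3218.69)
circles.analyze()
circles.spatialPartitioning(GridType.KDBTREE)
t1Rdd.spatialPartitioning(circles.getPartitioner)
t1Rdd.buildIndex(IndexType.RTREE, true)

// Returns pairs of geometries that fall within the given distance of each other.
val pairs = JoinQuery.DistanceJoinQueryFlat(t1Rdd, circles, true, true)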

Any ideas?

Answer

I found an answer myself. The GC overhead problem was caused by the partitioning, but also by the memory needed by GeoSpark's (index-based) partitioner and by timeouts due to the long geo-query computations. It was solved by adding the following parameters, as suggested on the GeoSpark website itself:

spark.executor.memory 4g
spark.driver.memory 10g
spark.network.timeout 10000s
spark.driver.maxResultSize 5g
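
For illustration, here is one way these settings could be supplied, either as spark-submit flags or on the SparkSession builder already shown in the question (a sketch; note that spark.driver.memory must be set before the driver JVM starts, so in client mode it is normally passed on the command line rather than in code):

// On the command line:
//   spark-submit --driver-memory 10g --executor-memory 4g \
//     --conf spark.network.timeout=10000s \
//     --conf spark.driver.maxResultSize=5g ...

// Or on the SparkSession builder (driver memory excluded, see note above):
val spark = SparkSession
  .builder()
  .config("spark.executor.memory", "4g")
  .config("spark.network.timeout", "10000s")
  .config("spark.driver.maxResultSize", "5g")
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
  .appName("PropertyMaster.foodDistanceEatout")
  .getOrCreate()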
