All executors dead MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster


Problem description


I run into problems when calling Spark's MinHashLSH's approxSimilarityJoin on a dataframe of (name_id, name) combinations.

A summary of the problem I am trying to solve:

I have a dataframe of around 30 million unique (name_id, name) combinations for company names. Some of those names refer to the same company, but are (i) misspelled and/or (ii) include additional names. Performing fuzzy string matching on every combination is not feasible. To reduce the number of fuzzy string matching combinations, I use MinHashLSH in Spark. My intended approach is to use an approxSimilarityJoin (self-join) with a relatively large Jaccard threshold, such that I am able to run a fuzzy matching algorithm on the matched combinations to further improve the disambiguation.

A summary of the steps I took:

  1. Used CountVectorizer to create a vector of character counts for every name,
  2. Used MinHashLSH and its approxSimilarityJoin with the following settings:
    • numHashTables=100
    • threshold=0.3 (Jaccard threshold for approxSimilarityJoin)
  3. After the approxSimilarityJoin, I remove duplicate combinations (when both (i,j) and (j,i) appear as matches, I remove (j,i) and keep (i,j))
  4. After removing the duplicate combinations, I run a fuzzy string matching algorithm using the FuzzyWuzzy package to reduce the number of records and improve the disambiguation of the names.
  5. Eventually I run a connectedComponents algorithm on the remaining edges (i,j) to match which company names belong together.

Part of code used:

    from pyspark.ml.feature import MinHashLSH
    from pyspark.sql.functions import array, col, sort_array
    from fuzzywuzzy import fuzz

    id_col = 'id'
    name_col = 'name'
    num_hastables = 100
    max_jaccard = 0.3
    fuzzy_threshold = 90
    fuzzy_method = fuzz.token_set_ratio

    # Calculate edges using minhash practices
    edges = MinHashLSH(inputCol='vectorized_char_lst', outputCol='hashes', numHashTables=num_hastables).\
        fit(data).\
        approxSimilarityJoin(data, data, max_jaccard).\
        select(col('datasetA.'+id_col).alias('src'),
               col('datasetA.clean').alias('src_name'),
               col('datasetB.'+id_col).alias('dst'),
               col('datasetB.clean').alias('dst_name')).\
        withColumn('comb', sort_array(array(*('src', 'dst')))).\
        dropDuplicates(['comb']).\
        rdd.\
        filter(lambda x: fuzzy_method(x['src_name'], x['dst_name']) >= fuzzy_threshold if x['src'] != x['dst'] else False).\
        toDF().\
        drop(*('src_name', 'dst_name', 'comb'))
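
The snippet above covers steps 2-4. For completeness, a minimal sketch of what the feature preparation (step 1) and the connectedComponents step (step 5) could look like is given below; the character-split UDF, the checkpoint directory, and the exact column handling are assumptions for illustration (`spark` is the active SparkSession), not necessarily the code that was actually run:

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import CountVectorizer
    from graphframes import GraphFrame

    # Step 1 (assumed): split every cleaned name into single characters and
    # turn the character lists into count vectors.
    to_chars = udf(lambda s: list(s), ArrayType(StringType()))
    data = data.withColumn('char_lst', to_chars(col('clean')))
    cv_model = CountVectorizer(inputCol='char_lst', outputCol='vectorized_char_lst').fit(data)
    data = cv_model.transform(data)

    # Step 5 (assumed): treat the surviving (src, dst) pairs in `edges` as an
    # undirected graph and group names that belong together.
    spark.sparkContext.setCheckpointDir('/tmp/checkpoints')      # required by connectedComponents
    vertices = data.select(col(id_col).alias('id')).distinct()   # GraphFrame expects an 'id' column
    components = GraphFrame(vertices, edges).connectedComponents()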

Explain plan of edges

== Physical Plan ==
*(5) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
+- Exchange hashpartitioning(datasetA#232, datasetB#263, 200)
   +- *(4) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
      +- *(4) Project [datasetA#232, datasetB#263]
         +- *(4) BroadcastHashJoin [entry#233, hashValue#234], [entry#264, hashValue#265], Inner, BuildRight, (UDF(datasetA#232.vectorized_char_lst, datasetB#263.vectorized_char_lst) < 0.3)
            :- *(4) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#225) AS datasetA#232, entry#233, hashValue#234]
            :  +- *(4) Filter isnotnull(hashValue#234)
            :     +- Generate posexplode(hashes#225), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#225], false, [entry#233, hashValue#234]
            :        +- *(1) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#225]
            :           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
            :                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
            :                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
            :                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
            :                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                   +- Exchange hashpartitioning(name#11, 200)
            :                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
            :                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                            +- Exchange RoundRobinPartitioning(8)
            :                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
            :                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, vector, true]))
               +- *(3) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#256) AS datasetB#263, entry#264, hashValue#265]
                  +- *(3) Filter isnotnull(hashValue#265)
                     +- Generate posexplode(hashes#256), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#256], false, [entry#264, hashValue#265]
                        +- *(2) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#256]
                           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
                                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
                                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
                                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
                                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
                                                   +- Exchange hashpartitioning(name#11, 200)
                                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
                                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
                                                            +- Exchange RoundRobinPartitioning(8)
                                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
                                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>

How data looks:

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|                name|               clean|            char_lst| vectorized_char_lst|
+-------+--------------------+--------------------+--------------------+--------------------+
|3633038|MURATA MACHINERY LTD|    MURATA MACHINERY|[M, U, R, A, T, A...|(33,[0,1,2,3,4,5,...|
|3632811|SOCIETE ANONYME D...|SOCIETE ANONYME D...|[S, O, C, I, E, T...|(33,[0,1,2,3,4,5,...|
|3632655|FUJIFILM CORPORATION|            FUJIFILM|[F, U, J, I, F, I...|(33,[3,10,12,13,2...|
|3633318|HEINE OPTOTECHNIK...|HEINE OPTOTECHNIK...|[H, E, I, N, E,  ...|(33,[0,1,2,3,4,5,...|
|3633523|SUNBEAM PRODUCTS INC|    SUNBEAM PRODUCTS|[S, U, N, B, E, A...|(33,[0,1,2,4,5,6,...|
|3633300|           HIVAL LTD|               HIVAL|     [H, I, V, A, L]|(33,[2,3,10,11,21...|
|3632657|             NSK LTD|                 NSK|           [N, S, K]|(33,[5,6,16],[1.0...|
|3633240|REHABILITATION IN...|REHABILITATION IN...|[R, E, H, A, B, I...|(33,[0,1,2,3,4,5,...|
|3632732|STUDIENGESELLSCHA...|STUDIENGESELLSCHA...|[S, T, U, D, I, E...|(33,[0,1,2,3,4,5,...|
|3632866|ENERGY CONVERSION...|ENERGY CONVERSION...|[E, N, E, R, G, Y...|(33,[0,1,3,5,6,7,...|
|3632895|ERGENICS POWER SY...|ERGENICS POWER SY...|[E, R, G, E, N, I...|(33,[0,1,3,4,5,6,...|
|3632897| MOLI ENERGY LIMITED|         MOLI ENERGY|[M, O, L, I,  , E...|(33,[0,1,3,5,7,8,...|
|3633275| NORDSON CORPORATION|             NORDSON|[N, O, R, D, S, O...|(33,[5,6,7,8,14],...|
|3633256|  PEROXIDCHEMIE GMBH|       PEROXIDCHEMIE|[P, E, R, O, X, I...|(33,[0,3,7,8,9,11...|
|3632695|      POWER CELL INC|          POWER CELL|[P, O, W, E, R,  ...|(33,[0,1,7,8,9,10...|
|3633037|        ERGENICS INC|            ERGENICS|[E, R, G, E, N, I...|(33,[0,3,5,6,8,9,...|
|3632878|  FORD MOTOR COMPANY|          FORD MOTOR|[F, O, R, D,  , M...|(33,[1,4,7,8,13,1...|
|3632573|    SAFT AMERICA INC|        SAFT AMERICA|[S, A, F, T,  , A...|(33,[0,1,2,3,4,6,...|
|3632852|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632698|   KRUPPKOPPERS GMBH|        KRUPPKOPPERS|[K, R, U, P, P, K...|(33,[0,6,7,8,12,1...|
|3633150|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632761|AMERICAN TELEPHON...|AMERICAN TELEPHON...|[A, M, E, R, I, C...|(33,[0,1,2,3,4,5,...|
|3632757|HITACHI KOKI COMP...|        HITACHI KOKI|[H, I, T, A, C, H...|(33,[1,2,3,4,7,9,...|
|3632836|HUGHES AIRCRAFT C...|     HUGHES AIRCRAFT|[H, U, G, H, E, S...|(33,[0,1,2,3,4,6,...|
|3633152|            SOSY INC|                SOSY|        [S, O, S, Y]|(33,[6,7,18],[2.0...|
|3633052|HAMAMATSU PHOTONI...|HAMAMATSU PHOTONI...|[H, A, M, A, M, A...|(33,[1,2,3,4,5,6,...|
|3633450|       AKZO NOBEL NV|          AKZO NOBEL|[A, K, Z, O,  , N...|(33,[0,1,2,5,7,10...|
|3632713| ELTRON RESEARCH INC|     ELTRON RESEARCH|[E, L, T, R, O, N...|(33,[0,1,2,4,5,6,...|
|3632533|NEC ELECTRONICS C...|     NEC ELECTRONICS|[N, E, C,  , E, L...|(33,[0,1,3,4,5,6,...|
|3632562| TARGETTI SANKEY SPA| TARGETTI SANKEY SPA|[T, A, R, G, E, T...|(33,[0,1,2,3,4,5,...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 30 rows

Hardware used:

  1. Master node: m5.2xlarge, 8 vCores, 32 GiB memory, EBS-only storage (128 GiB EBS)
  2. Slave nodes (10x): m5.4xlarge, 16 vCores, 64 GiB memory, EBS-only storage (500 GiB EBS)

Spark-submit settings used:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.shuffle.partitions=2000" --conf "spark.executor.cores=4" --conf "spark.executor.memory=14g" --conf "spark.driver.memory=14g" --conf "spark.driver.maxResultSize=14g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py

Task errors from Web UI

ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Slave lost

ExecutorLostFailure (executor 31 exited unrelated to the running tasks) Reason: Container marked as failed: container_1590592506722_0001_02_000002 on host: ip-172-31-47-180.eu-central-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.

(Part of) executor logs:


20/05/27 16:29:09 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:29:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:15 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:17 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (0  time so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:33 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:29:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (1  time so far)
20/05/27 16:29:42 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:46 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:53 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:29:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (2  times so far)
20/05/27 16:30:00 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:30:05 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:10 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:15 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (3  times so far)
20/05/27 16:30:19 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:22 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:29 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:32 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (4  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:30:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (5  times so far)
20/05/27 16:30:55 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:59 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:31:03 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (6  times so far)
20/05/27 16:31:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:31:22 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:24 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (7  times so far)
20/05/27 16:31:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:41 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:31:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (8  times so far)
20/05/27 16:31:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:48 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:32:02 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (9  times so far)
20/05/27 16:32:04 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:08 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:32:19 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:20 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:21 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (10  times so far)
20/05/27 16:32:26 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (11  times so far)
20/05/27 16:32:38 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:45 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:51 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:32:56 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (12  times so far)
20/05/27 16:32:58 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:03 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:33:08 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:13 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (13  times so far)
20/05/27 16:33:15 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:20 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:26 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:31 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (14  times so far)
20/05/27 16:33:36 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:33:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:51 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (15  times so far)
20/05/27 16:33:54 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:34:03 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:04 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:08 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (16  times so far)
20/05/27 16:34:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:34:16 INFO PythonUDFRunner: Times: total = 774701, boot = 3, init = 10, finish = 774688
20/05/27 16:34:21 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (17  times so far)
20/05/27 16:34:30 INFO PythonUDFRunner: Times: total = 773372, boot = 2, init = 9, finish = 773361
20/05/27 16:34:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (18  times so far)
20/05/27 16:34:46 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (19  times so far)
20/05/27 16:35:01 INFO PythonUDFRunner: Times: total = 776905, boot = 3, init = 11, finish = 776891
20/05/27 16:35:05 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (20  times so far)
20/05/27 16:35:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (21  times so far)
20/05/27 16:35:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (22  times so far)
20/05/27 16:35:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (23  times so far)
20/05/27 16:36:10 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (24  times so far)
20/05/27 16:36:29 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:36:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:37:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:37:25 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:37:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:38:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:38:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:38:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:38:59 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:39:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:39:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:39:58 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:40:18 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:40:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:40:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:41:16 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:41:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:41:55 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:42:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:42:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:42:59 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/05/27 16:42:59 INFO DiskBlockManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1590592506722_0001/spark-73af8e3b-f428-47d4-9e13-fed4e19cc2cd

2020-05-27T16:41:16.336+0000: [GC (Allocation Failure) 2020-05-27T16:41:16.336+0000: [ParNew: 272234K->242K(305984K), 0.0094375 secs] 9076907K->8804915K(13188748K), 0.0094895 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:34.686+0000: [GC (Allocation Failure) 2020-05-27T16:41:34.686+0000: [ParNew: 272242K->257K(305984K), 0.0084179 secs] 9076915K->8804947K(13188748K), 0.0084840 secs] [Times: user=0.09 sys=0.01, real=0.01 secs] 
2020-05-27T16:41:35.145+0000: [GC (Allocation Failure) 2020-05-27T16:41:35.145+0000: [ParNew: 272257K->1382K(305984K), 0.0095541 secs] 9076947K->8806073K(13188748K), 0.0096080 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.077+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.077+0000: [ParNew: 273382K->2683K(305984K), 0.0097177 secs] 9078073K->8807392K(13188748K), 0.0097754 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.513+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.513+0000: [ParNew: 274683K->3025K(305984K), 0.0093345 secs] 9079392K->8807734K(13188748K), 0.0093892 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:05.481+0000: [GC (Allocation Failure) 2020-05-27T16:42:05.481+0000: [ParNew: 275025K->4102K(305984K), 0.0092950 secs] 9079734K->8808830K(13188748K), 0.0093464 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:18.711+0000: [GC (Allocation Failure) 2020-05-27T16:42:18.711+0000: [ParNew: 276102K->2972K(305984K), 0.0098928 secs] 9080830K->8807700K(13188748K), 0.0099510 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:36.493+0000: [GC (Allocation Failure) 2020-05-27T16:42:36.493+0000: [ParNew: 274972K->3852K(305984K), 0.0094324 secs] 9079700K->8808598K(13188748K), 0.0094897 secs] [Times: user=0.11 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:40.880+0000: [GC (Allocation Failure) 2020-05-27T16:42:40.880+0000: [ParNew: 275852K->2568K(305984K), 0.0111794 secs] 9080598K->8807882K(13188748K), 0.0112352 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
Heap
 par new generation   total 305984K, used 261139K [0x0000000440000000, 0x0000000454c00000, 0x0000000483990000)
  eden space 272000K,  95% used [0x0000000440000000, 0x000000044fc82cf8, 0x00000004509a0000)
  from space 33984K,   7% used [0x00000004509a0000, 0x0000000450c220a8, 0x0000000452ad0000)
  to   space 33984K,   0% used [0x0000000452ad0000, 0x0000000452ad0000, 0x0000000454c00000)
 concurrent mark-sweep generation total 12882764K, used 8805314K [0x0000000483990000, 0x0000000795e63000, 0x00000007c0000000)
 Metaspace       used 77726K, capacity 79553K, committed 79604K, reserved 1118208K
  class space    used 10289K, capacity 10704K, committed 10740K, reserved 1048576K

Screenshot of executors

What I tried:

  • Changing spark.sql.shuffle.partitions
  • Changing spark.default.parallelism
  • Repartitioning the dataframe

How can I solve this issue?

Thanks in advance!

Thijs

Solution

@lokk3r's answer really helped me in the right direction here. However, there were some other things I had to do before I was able to run the program without errors. I will share them to help out people who run into similar problems:

  • First of all, I used NGrams as @lokk3r suggested instead of just single characters to avoid extreme data skew inside the MinHashLSH algorithm. When using 4-grams, data looks like:

+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|                          name|     id|                         clean|                   ng_char_lst|           vectorized_char_lst|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|     SOCIETE ANONYME DITE SAFT|3632811|     SOCIETE ANONYME DITE SAFT|[  S O C, S O C I, O C I E,...|(1332,[64,75,82,84,121,223,...|
|          MURATA MACHINERY LTD|3633038|              MURATA MACHINERY|[  M U R, M U R A, U R A T,...|(1332,[55,315,388,437,526,5...|
|HEINE OPTOTECHNIK GMBH AND ...|3633318|    HEINE OPTOTECHNIK GMBH AND|[  H E I, H E I N, E I N E,...|(1332,[23,72,216,221,229,34...|
|          FUJIFILM CORPORATION|3632655|                      FUJIFILM|[  F U J, F U J I, U J I F,...|(1332,[157,179,882,1028],[1...|
|          SUNBEAM PRODUCTS INC|3633523|              SUNBEAM PRODUCTS|[  S U N, S U N B, U N B E,...|(1332,[99,137,165,175,187,1...|
| STUDIENGESELLSCHAFT KOHLE MBH|3632732| STUDIENGESELLSCHAFT KOHLE MBH|[  S T U, S T U D, T U D I,...|(1332,[13,14,23,25,43,52,57...|
|REHABILITATION INSTITUTE OF...|3633240|REHABILITATION INSTITUTE OF...|[  R E H, R E H A, E H A B,...|(1332,[20,44,51,118,308,309...|
|           NORDSON CORPORATION|3633275|                       NORDSON|[  N O R, N O R D, O R D S,...|(1332,[45,88,582,1282],[1.0...|
|     ENERGY CONVERSION DEVICES|3632866|     ENERGY CONVERSION DEVICES|[  E N E, E N E R, N E R G,...|(1332,[54,76,81,147,202,224...|
|           MOLI ENERGY LIMITED|3632897|                   MOLI ENERGY|[  M O L, M O L I, O L I  ,...|(1332,[438,495,717,756,1057...|
|    ERGENICS POWER SYSTEMS INC|3632895|        ERGENICS POWER SYSTEMS|[  E R G, E R G E, R G E N,...|(1332,[6,10,18,21,24,35,375...|
|                POWER CELL INC|3632695|                    POWER CELL|[  P O W, P O W E, O W E R,...|(1332,[6,10,18,35,126,169,3...|
|            PEROXIDCHEMIE GMBH|3633256|                 PEROXIDCHEMIE|[  P E R, P E R O, E R O X,...|(1332,[326,450,532,889,1073...|
|            FORD MOTOR COMPANY|3632878|                    FORD MOTOR|[  F O R, F O R D, O R D  ,...|(1332,[156,158,186,200,314,...|
|                  ERGENICS INC|3633037|                      ERGENICS|[  E R G, E R G E, R G E N,...|(1332,[375,642,812,866,1269...|
|              SAFT AMERICA INC|3632573|                  SAFT AMERICA|[  S A F, S A F T, A F T  ,...|(1332,[498,552,1116],[1.0,1...|
|   ALCAN INTERNATIONAL LIMITED|3632598|           ALCAN INTERNATIONAL|[  A L C, A L C A, L C A N,...|(1332,[20,434,528,549,571,7...|
|             KRUPPKOPPERS GMBH|3632698|                  KRUPPKOPPERS|[  K R U, K R U P, R U P P,...|(1332,[664,795,798,1010,114...|
|       HUGHES AIRCRAFT COMPANY|3632752|               HUGHES AIRCRAFT|[  H U G, H U G H, U G H E,...|(1332,[605,632,705,758,807,...|
|AMERICAN TELEPHONE AND TELE...|3632761|AMERICAN TELEPHONE AND TELE...|[  A M E, A M E R, M E R I,...|(1332,[19,86,91,126,128,134...|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+

Note that I added leading and trailing white spaces to the names, to make sure that the order of words in a name does not matter for the NGrams: 'XX YY' has 3-grams 'XX ', 'X Y', ' YY', while 'YY XX' has 3-grams 'YY ', 'Y X', ' XX', so the two share 0 out of 6 unique NGrams. With leading and trailing white spaces, ' XX YY ' has 3-grams ' XX', 'XX ', 'X Y', ' YY', 'YY ', while ' YY XX ' has 3-grams ' YY', 'YY ', 'Y X', ' XX', 'XX ', so the two share 4 out of 6 unique NGrams. This makes it much more likely that both records end up in the same bucket during MinHashLSH.
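
A minimal sketch of how such a padded character 4-gram pipeline could be built with Spark's NGram transformer (the padding UDF and the column names are assumptions for illustration; the actual preprocessing may differ):

    from pyspark.sql.functions import col, concat, lit, udf
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import NGram

    # Pad the cleaned name with a leading and trailing space, then split it into
    # single characters so that word boundaries become part of the n-grams.
    to_chars = udf(lambda s: list(s), ArrayType(StringType()))
    padded = data.withColumn('char_lst', to_chars(concat(lit(' '), col('clean'), lit(' '))))

    # 4-grams over the character sequence, as in the ng_char_lst column above.
    data = NGram(n=4, inputCol='char_lst', outputCol='ng_char_lst').transform(padded)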

  • I experimented with different values of n - the input parameter for NGrams. I found that both n=2 and n=3 still give so much data skew that a few Spark jobs take way too long while others finish within seconds, so you end up waiting forever before the program continues. I now use n=4, which still gives substantial skew, but it is workable.

  • To reduce the effects of the data skew even more, I used some additional filtering of too (in)frequently occurring NGrams in the CountVectorizer method of Spark. I set minDF=2 so that it filters out NGrams that occur in only a single name; you cannot match names based on an NGram that occurs in only one name anyway. In addition, I set maxDF=0.001 so that it filters out NGrams that occur in more than 0.1% of the names. For approximately 30 million names, this means that NGrams occurring in more than 30000 names are filtered out. I figured that an NGram that occurs too frequently will not provide useful information about which names can be matched anyway.
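
A minimal sketch of this CountVectorizer configuration (minDF/maxDF values as described above; maxDF requires Spark 2.4 or later, and the column names are assumptions):

    from pyspark.ml.feature import CountVectorizer

    # Drop n-grams that appear in only a single name (minDF=2) and n-grams that
    # appear in more than 0.1% of all names (maxDF=0.001).
    cv = CountVectorizer(inputCol='ng_char_lst', outputCol='vectorized_char_lst',
                         minDF=2, maxDF=0.001)
    data = cv.fit(data).transform(data)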

  • I reduced the number of unique names (initially 30 million) to 15 million by filtering out the non-Latin (extended) names. I noticed that names with e.g. Arabic and Chinese characters caused a big skew in the data as well. Since I am not primarily interested in disambiguating these company names, I disregarded them from the data set. I filtered using the following regex match:

re.fullmatch('[\u0020-\u007F\u00A0-\u00FF\u0100-\u017F\u0180-\u024F]+'.encode(), string_to_filter.encode())
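
A sketch of how this check could be applied as a filter on the dataframe (here the pattern is applied to the unicode string directly via a Python UDF; the UDF and the column name are assumptions for illustration):

    import re
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import BooleanType

    LATIN_RE = re.compile('[\u0020-\u007F\u00A0-\u00FF\u0100-\u017F\u0180-\u024F]+')

    # Keep only names that consist entirely of (extended) Latin-range characters.
    is_latin = udf(lambda s: s is not None and LATIN_RE.fullmatch(s) is not None,
                   BooleanType())
    data = data.filter(is_latin(col('clean')))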

  • This is somewhat straightforward advice, but I ran into some problems by not seeing it: make sure you filter the dataset before feeding it to the MinHashLSH algorithm, to drop records that have no NGrams remaining because of the minDF and maxDF settings or simply because the name is very short. Obviously, the MinHashLSH algorithm cannot work with such empty records.
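
A minimal sketch of such a sanity filter, assuming a Python UDF that checks whether any non-zero entries are left in the count vector after the minDF/maxDF filtering (column names as above):

    from pyspark.sql.functions import col, size, udf
    from pyspark.sql.types import BooleanType

    # Drop rows whose n-gram list is empty or whose count vector has no
    # non-zero entries left after minDF/maxDF filtering.
    has_features = udf(lambda v: v is not None and v.numNonzeros() > 0, BooleanType())
    data = data.filter(size(col('ng_char_lst')) > 0) \
               .filter(has_features(col('vectorized_char_lst')))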

  • Finally, regarding the settings of the spark-submit command and the hardware settings of the EMR cluster, I found that I did not need a larger cluster, as some of the answers on the forums suggested. All of the above changes made the program run perfectly on a cluster with the settings as provided in my original post. Reducing spark.shuffle.partitions, spark.driver.memory and spark.driver.maxResultSize substantially improved the running time of the program. The spark-submit command I submitted was:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.executor.cores=4" --conf "spark.executor.memory=12g" --conf "spark.driver.memory=8g" --conf "spark.driver.maxResultSize=8g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py
