Efficient string suffix detection


Question


I am working with PySpark on a huge dataset, where I want to filter the data frame based on strings in another data frame. For example,

dd = spark.createDataFrame([
    "something.google.com",
    "something.google.com.somethingelse.ac.uk",
    "something.good.com.cy",
    "something.good.com.cy.mal.org"
], StringType()).toDF('domains')
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com                    |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy                   |
|something.good.com.cy.mal.org           |
+----------------------------------------+  

dd1 =  spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+

I assume that domains and gooddomains are valid domain names.

What I want to do is filter out the rows in dd whose domains end with one of the strings in dd1. So in the above example, I want to filter out row 1 and row 3, to end up with

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
+----------------------------------------+  

My current solution (as shown below) can only account for domains up to 3 'words'. If I were to add, say, verygood.co.ac.uk to dd1 (i.e. the whitelist), then it will fail.

from pyspark.sql import functions as F

def split_filter(x, whitelist):
    splitted1 = x.select(F.split(x['domains'], '\.').alias('splitted_domains'))
    last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
    last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['domains'], how = 'left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['domains'], how = 'left_anti')
    return df2.drop('id')

I am using Spark 2.3.0 with Python 2.7.5.

Solution

Let's extend the domains for slightly better coverage:

domains = spark.createDataFrame([
    "something.google.com",  # OK
    "something.google.com.somethingelse.ac.uk", # NOT OK 
    "something.good.com.cy", # OK 
    "something.good.com.cy.mal.org",  # NOT OK
    "something.bad.com.cy",  # NOT OK
    "omgalsogood.com.cy", # NOT OK
    "good.com.cy",   # OK 
    "sogood.example.com",  # OK Match for shorter redundant, mismatch on longer
    "notsoreal.googleecom" # NOT OK
], "string").toDF('domains')

good_domains =  spark.createDataFrame([
    "google.com", "good.com.cy", "alsogood.com.cy",
    "good.example.com", "example.com"  # Redundant case
], "string").toDF('gooddomains')

Now... A naive solution, using only Spark SQL primitives, is to simplify your current approach a bit. Since you've stated that it is safe to assume that these are valid public domains, we can define a function like this:

from pyspark.sql.functions import col, regexp_extract

def suffix(c): 
    return regexp_extract(c, "([^.]+\\.[^.]+$)", 1) 
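As a quick sanity check outside Spark, the same pattern can be exercised with plain re (an illustration only; the values mirror the sample data above):

import re

pattern = r"([^.]+\.[^.]+$)"

re.search(pattern, "something.google.com").group(1)   # 'google.com'
re.search(pattern, "something.good.com.cy").group(1)  # 'com.cy'
re.search(pattern, "notsoreal.googleecom").group(1)   # 'notsoreal.googleecom'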

This extracts the top-level domain and the first-level subdomain:

domains_with_suffix = (domains
    .withColumn("suffix", suffix("domains"))
    .alias("domains"))
good_domains_with_suffix = (good_domains
    .withColumn("suffix", suffix("gooddomains"))
    .alias("good_domains"))

domains_with_suffix.show()

+--------------------+--------------------+
|             domains|              suffix|
+--------------------+--------------------+
|something.google.com|          google.com|
|something.google....|               ac.uk|
|something.good.co...|              com.cy|
|something.good.co...|             mal.org|
|something.bad.com.cy|              com.cy|
|  omgalsogood.com.cy|              com.cy|
|         good.com.cy|              com.cy|
|  sogood.example.com|         example.com|
|notsoreal.googleecom|notsoreal.googleecom|
+--------------------+--------------------+

Now we can outer join:

from pyspark.sql.functions import (
    col, concat, lit, monotonically_increasing_id, sum as sum_
)

candidates = (domains_with_suffix
    .join(
        good_domains_with_suffix,
        col("domains.suffix") == col("good_domains.suffix"), 
        "left"))

and filter the result:

is_good_expr = (
    col("good_domains.suffix").isNotNull() &      # Match on suffix
    (

        # Exact match
        (col("domains") == col("gooddomains")) |
        # Subdomain match
        col("domains").endswith(concat(lit("."), col("gooddomains")))
    )
)

not_good_domains = (candidates
    .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion
    .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
    .filter(~col("any_good"))
    .drop("any_good"))

not_good_domains.show(truncate=False)     

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
|something.good.com.cy.mal.org           |
|something.google.com.somethingelse.ac.uk|
|something.bad.com.cy                    |
+----------------------------------------+

This is better than the Cartesian product required for a direct join with LIKE, but it is still unsatisfactory brute force and, in the worst-case scenario, requires two shuffles - one for the join (this can be skipped if good_domains is small enough to be broadcast), and another one for the groupBy + agg.

Unfortunately, Spark SQL doesn't allow a custom partitioner to use only one shuffle for both (it is, however, possible with a composite key in the RDD API), and the optimizer is not yet smart enough to optimize join(_, "key1") followed by groupBy("key1", _).
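If good_domains really is small, the join-side shuffle can be avoided with an explicit broadcast hint; a minimal sketch reusing the names defined above:

from pyspark.sql.functions import broadcast

# Ask Spark to broadcast the (assumed small) whitelist so the join does not
# shuffle the large side; everything else stays as in the candidates join above.
candidates = (domains_with_suffix
    .join(
        broadcast(good_domains_with_suffix),
        col("domains.suffix") == col("good_domains.suffix"),
        "left"))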

If you can accept some false negatives, you can go probabilistic. First, let's build a probabilistic counter (here using bounter with a little help from toolz):

from pyspark.sql.functions import concat_ws, reverse, split
from bounter import bounter
from toolz.curried import identity, partition_all

# This is only for testing on toy examples, in practice use more realistic value
size_mb = 20      
chunk_size = 100

def reverse_domain(c):
    return concat_ws(".", reverse(split(c, "\\.")))

def merge(acc, xs):
    acc.update(xs)
    return acc

counter = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # Chunk data into groups so we reduce the number of update calls
    .mapPartitions(partition_all(chunk_size))
    # Use tree aggregate to reduce pressure on the driver, 
    # when number of partitions is large*
    # You can use depth parameter for further tuning
    .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))
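The label reversal is what turns suffix matching into prefix matching: reversing the labels of something.google.com gives com.google.something, so every whitelisted suffix becomes a prefix of the reversed string. In plain Python the transformation is simply:

# Plain-Python equivalent of reverse_domain, for illustration only.
".".join(reversed("something.google.com".split(".")))
# 'com.google.something'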

Next, define a user-defined function like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from toolz import accumulate

def is_good_counter(counter):
    def is_good_(x):
        return any(
            x in counter.value 
            for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))
        )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)
    return _
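The accumulate call produces every cumulative dot-joined prefix of the reversed domain, and the domain counts as good if any of those prefixes is present in the counter. For illustration, outside Spark:

from toolz import accumulate

list(accumulate(lambda x, y: "{}.{}".format(x, y),
                "com.google.something".split(".")))
# ['com', 'com.google', 'com.google.something']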

Then filter the domains:

domains.filter(
    ~is_good_counter(counter)(reverse_domain("domains"))
).show(truncate=False)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

In Scala this could be done with bloomFilter

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter

def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))

val checker = good_domains.stat.bloomFilter(
  // Adjust values depending on the data
  reverseDomain($"gooddomains"), 1000, 0.001 
)

def isGood(checker: BloomFilter) = udf((s: String) => 
  s.split('.').toStream.scanLeft("") {
    case ("", x) => x
    case (acc, x) => s"${acc}.${x}"
}.tail.exists(checker mightContain _))


domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

and, if needed, it shouldn't be hard to call such code from Python.

This might still not be fully satisfying, due to the approximate nature. If you require an exact result, you can try to leverage the redundant nature of the data, for example with a trie (here using the datrie implementation).

If good_domains is relatively small, you can create a single model, in a similar way as in the probabilistic variant:

import string
import datrie


def seq_op(acc, x):
    acc[x] = True
    return acc

def comb_op(acc1, acc2):
    acc1.update(acc2)
    return acc1

trie = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # string.printable is a bit excessive if you need standard domain
    # and not enough if you allow internationalized domain names.
    # In the latter case you'll have to adjust the `alphabet`
    # or use different implementation of trie.
    .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))

Define a user-defined function:

def is_good_trie(trie):
    def is_good_(x):
        if not x:
            return False
        else:
            return any(
                x == match or x[len(match)] == "."
                for match in trie.value.iter_prefixes(x)
            )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)

    return _
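The x[len(match)] == "." guard ensures that a prefix found in the trie ends on a label boundary, so a whitelisted com.good does not accidentally accept com.goodish.something. A hedged, stand-alone illustration (the trie here is built by hand, purely for demonstration):

import string
import datrie

t = datrie.Trie(string.printable)
t[u"com.google"] = True            # reversed "google.com"

x = u"com.google.something"        # reversed "something.google.com"
any(x == m or x[len(m)] == "." for m in t.iter_prefixes(x))   # True

y = u"com.googleecom"              # key is a character prefix, but not on a label boundary
any(y == m or y[len(m)] == "." for m in t.iter_prefixes(y))   # False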

Then apply the UDF to the data:

domains.filter(
    ~is_good_trie(trie)(reverse_domain("domains"))
).show(truncate=False)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

This specific approach works under the assumption that all good_domains can be compressed into a single trie, but it can easily be extended to handle cases where that assumption is not satisfied. For example, you can build a separate trie per top-level domain or suffix (as defined in the naive solution):

(good_domains
    .select(suffix("gooddomains"), reverse_domain("gooddomains"))
    .rdd
    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))

and then either load the models on demand from a serialized version, or use RDD operations.
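One possible arrangement - a sketch only, under the assumption that the combined per-suffix tries still fit in driver and executor memory, and with illustrative variable names - is to collect them into a broadcast dict keyed by suffix:

# Sketch: collect the per-suffix tries and broadcast them as a dict keyed by
# suffix. Viable only when all tries together fit in memory; otherwise fall
# back to loading the serialized tries on demand as described above.
tries_by_suffix = sc.broadcast(dict(
    good_domains
        .select(suffix("gooddomains"), reverse_domain("gooddomains"))
        .rdd
        .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op)
        .collect()))

A UDF analogous to is_good_trie can then look up tries_by_suffix.value for the suffix of the reversed domain before running the same prefix check.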

The two non-native methods can be further adjusted depending on the data, the business requirements (like the false-negative tolerance in the case of the approximate solution) and the available resources (driver memory, executor memory, cardinality of the suffixes, access to a POSIX-compliant distributed file system, and so on). There are also some trade-offs to consider when choosing between applying them to DataFrames and to RDDs (memory usage, communication and serialization overhead).


* See Understanding treeReduce() in Spark
