Efficient string suffix detection

Problem description

I am working with PySpark on a huge dataset, where I want to filter the data frame based on strings in another data frame. For example,

from pyspark.sql.types import StringType

dd = spark.createDataFrame(
    ["something.google.com",
     "something.google.com.somethingelse.ac.uk",
     "something.good.com.cy",
     "something.good.com.cy.mal.org"],
    StringType()).toDF('domains')
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com                    |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy                   |
|something.good.com.cy.mal.org           |
+----------------------------------------+  

dd1 = spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+

I assume that domains and gooddomains are valid domain names.

What I want to do is filter out the rows in dd whose domain ends with an entry in dd1, keeping only the rows that do not. So in the above example, I want to filter out rows 1 and 3, to end up with

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
+----------------------------------------+  

My current solution (as shown below) can only account for domains of up to 3 'words'. If I were to add, say, verygood.co.ac.uk to dd1 (i.e. the whitelist), then it would fail.

from pyspark.sql import functions as F

def split_filter(x, whitelist):
    # Split each domain on '.' and rebuild its last two and last three labels.
    splitted1 = x.select(F.split(x['domains'], '\\.').alias('splitted_domains'))
    last_two = splitted1.select(F.concat(
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 2],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 1]).alias('last_two'))
    last_three = splitted1.select(F.concat(
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 3],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 2],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 1]).alias('last_three'))
    # Re-attach the derived columns to the original rows via a synthetic id.
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    # Anti-join against the whitelist column (named 'gooddomains' in dd1 above).
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['gooddomains'], how='left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['gooddomains'], how='left_anti')
    return df2.drop('id')
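
For reference, a minimal invocation of the helper above on the dd / dd1 frames defined earlier might look like this (a sketch; the variable name filtered is only illustrative):

filtered = split_filter(dd, dd1)
filtered.show(truncate=False)
# Works for whitelist entries of up to three labels; a four-label entry such as
# verygood.co.ac.uk would be missed, which is exactly the limitation described above.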

I am using Spark 2.3.0 with Python 2.7.5.

Recommended answer

Let's extend the domains for slightly better coverage:

domains = spark.createDataFrame([
    "something.google.com",  # OK
    "something.google.com.somethingelse.ac.uk", # NOT OK 
    "something.good.com.cy", # OK 
    "something.good.com.cy.mal.org",  # NOT OK
    "something.bad.com.cy",  # NOT OK
    "omgalsogood.com.cy", # NOT OK
    "good.com.cy",   # OK 
    "sogood.example.com",  # OK Match for shorter redundant, mismatch on longer
    "notsoreal.googleecom" # NOT OK
], "string").toDF('domains')

good_domains =  spark.createDataFrame([
    "google.com", "good.com.cy", "alsogood.com.cy",
    "good.example.com", "example.com"  # Redundant case
], "string").toDF('gooddomains')

Now... A naive solution, using only Spark SQL primitives, is to simplify your current approach a bit. Since you've stated that it is safe to assume that these are valid public domains, we can define a function like this:

from pyspark.sql.functions import col, regexp_extract

def suffix(c): 
    return regexp_extract(c, "([^.]+\\.[^.]+$)", 1) 

which extracts the top-level domain and the first-level subdomain:

domains_with_suffix = (domains
    .withColumn("suffix", suffix("domains"))
    .alias("domains"))
good_domains_with_suffix = (good_domains
    .withColumn("suffix", suffix("gooddomains"))
    .alias("good_domains"))

domains_with_suffix.show()

+--------------------+--------------------+
|             domains|              suffix|
+--------------------+--------------------+
|something.google.com|          google.com|
|something.google....|               ac.uk|
|something.good.co...|              com.cy|
|something.good.co...|             mal.org|
|something.bad.com.cy|              com.cy|
|  omgalsogood.com.cy|              com.cy|
|         good.com.cy|              com.cy|
|  sogood.example.com|         example.com|
|notsoreal.googleecom|notsoreal.googleecom|
+--------------------+--------------------+

Now we can outer join:

from pyspark.sql.functions import (
    col, concat, lit, monotonically_increasing_id, sum as sum_
)

candidates = (domains_with_suffix
    .join(
        good_domains_with_suffix,
        col("domains.suffix") == col("good_domains.suffix"), 
        "left"))

and filter the result:

is_good_expr = (
    col("good_domains.suffix").isNotNull() &      # Match on suffix
    (

        # Exact match
        (col("domains") == col("gooddomains")) |
        # Subdomain match
        col("domains").endswith(concat(lit("."), col("gooddomains")))
    )
)

not_good_domains = (candidates
    .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion
    .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
    .filter(~col("any_good"))
    .drop("any_good"))

not_good_domains.show(truncate=False)     

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
|something.good.com.cy.mal.org           |
|something.google.com.somethingelse.ac.uk|
|something.bad.com.cy                    |
+----------------------------------------+

This is better than the Cartesian product required for a direct join with LIKE, but it is still unsatisfactory brute force, and in the worst-case scenario it requires two shuffles - one for the join (which can be skipped if good_domains is small enough to be broadcasted), and another one for the group_by + agg.
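
If the whitelist comfortably fits in memory, the join shuffle can be avoided with an explicit broadcast hint. A minimal sketch using the standard pyspark.sql.functions.broadcast helper on the same DataFrames as above:

from pyspark.sql.functions import broadcast, col

# Broadcasting the small whitelist side turns the suffix join into a broadcast join,
# so only the subsequent groupBy/agg still shuffles the large side.
candidates = (domains_with_suffix
    .join(
        broadcast(good_domains_with_suffix),
        col("domains.suffix") == col("good_domains.suffix"),
        "left"))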

Unfortunately, Spark SQL doesn't allow a custom partitioner to use only one shuffle for both (it is, however, possible with a composite key in the RDD API), and the optimizer is not yet smart enough to optimize join(_, "key1").groupBy("key1", _).
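
For completeness, a rough sketch of what that RDD-level alternative could look like (not the author's code; keep_bad and n_partitions are illustrative names): key both sides by suffix, co-partition them once, and do both the matching and the aggregation in a single cogroup pass, so the large side is shuffled only once:

n_partitions = 200  # assumption: tune to the data

domains_kv = (domains_with_suffix
    .select("suffix", "domains").rdd
    .map(lambda row: (row.suffix, row.domains))
    .partitionBy(n_partitions))

good_kv = (good_domains_with_suffix
    .select("suffix", "gooddomains").rdd
    .map(lambda row: (row.suffix, row.gooddomains))
    .partitionBy(n_partitions))

def keep_bad(group):
    # group is (suffix, (domains in that suffix group, whitelisted domains in it))
    _, (ds, goods) = group
    goods = list(goods)
    for d in ds:
        if not any(d == g or d.endswith("." + g) for g in goods):
            yield d

# Both sides share the same partitioner, so cogroup needs no additional shuffle.
not_good_rdd = domains_kv.cogroup(good_kv).flatMap(keep_bad)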

If you can accept some false negatives, you can go probabilistic. First, let's build a probabilistic counter (here using bounter, with a little help from toolz):

from pyspark.sql.functions import concat_ws, reverse, split
from bounter import bounter
from toolz.curried import identity, partition_all

# This is only for testing on toy examples, in practice use more realistic value
size_mb = 20      
chunk_size = 100

def reverse_domain(c):
    return concat_ws(".", reverse(split(c, "\\.")))

def merge(acc, xs):
    acc.update(xs)
    return acc

counter = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # Chunk data into groups so we reduce the number of update calls
    .mapPartitions(partition_all(chunk_size))
    # Use tree aggregate to reduce pressure on the driver, 
    # when number of partitions is large*
    # You can use depth parameter for further tuning
    .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))
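
A quick sanity check on the broadcast counter (keys are reversed good domains, and bounter is approximate, so membership is only probabilistically correct):

print("com.google" in counter.value)       # expected True  - reversed "google.com"
print("com.googleecom" in counter.value)   # expected False - barring a rare false positive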

Next, define a user-defined function like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from toolz import accumulate

def is_good_counter(counter):
    def is_good_(x):
        return any(
            x in counter.value 
            for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))
        )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)
    return _
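
To see why this works, note what accumulate produces for a reversed domain: every label-aligned leading chain, i.e. every candidate suffix of the original domain. A plain-Python illustration:

from toolz import accumulate

rev = "com.google.something"  # reversed "something.google.com"
print(list(accumulate(lambda x, y: "{}.{}".format(x, y), rev.split("."))))
# ['com', 'com.google', 'com.google.something']
# The domain counts as good as soon as any of these chains is found in the counter.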

and filter domains:

domains.filter(
    ~is_good_counter(counter)(reverse_domain("domains"))
).show(truncate=False)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

In Scala this could be done with bloomFilter:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter

def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))

val checker = good_domains.stat.bloomFilter(
  // Adjust values depending on the data
  reverseDomain($"gooddomains"), 1000, 0.001 
)

def isGood(checker: BloomFilter) = udf((s: String) =>
  s.split('.').toStream.scanLeft("") {
    case ("", x) => x
    case (acc, x) => s"${acc}.${x}"
  }.tail.exists(checker mightContain _))


domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

and, if needed, it shouldn't be hard to call code like this from Python.

This might still not be fully satisfying, due to its approximate nature. If you require an exact result, you can try to leverage the redundant nature of the data, for example with a trie (here using the datrie implementation).

If good_domains is relatively small, you can create a single model, in a similar way as in the probabilistic variant:

import string
import datrie


def seq_op(acc, x):
    acc[x] = True
    return acc

def comb_op(acc1, acc2):
    acc1.update(acc2)
    return acc1

trie = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # string.printable is a bit excessive if you need standard domain
    # and not enough if you allow internationalized domain names.
    # In the latter case you'll have to adjust the `alphabet`
    # or use different implementation of trie.
    .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))
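
As with the counter, the keys are reversed good domains, and iter_prefixes returns every stored key that is a string prefix of its argument. A quick check with values taken from the sample whitelist above (illustrative only):

print(list(trie.value.iter_prefixes(u"com.google.something")))  # ['com.google'] - "google.com" is whitelisted
print(list(trie.value.iter_prefixes(u"cy.com.omgalsogood")))    # []             - no whitelisted suffix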

Define a user-defined function:

def is_good_trie(trie):
    def is_good_(x):
        if not x:
            return False
        else:
            return any(
                x == match or x[len(match)] == "."
                for match in trie.value.iter_prefixes(x)
            )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)

    return _
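
The boundary check x == match or x[len(match)] == "." is what prevents a stored key from matching across a label boundary. A plain-Python illustration with a hypothetical input:

x = "cy.com.goodish"   # reversed "goodish.com.cy" - hypothetical, not whitelisted
match = "cy.com.good"  # reversed whitelisted "good.com.cy"

print(x.startswith(match))                 # True  - a raw string prefix...
print(x == match or x[len(match)] == ".")  # False - ...but not label-aligned, so it is rejected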

and apply it to the data:

domains.filter(
    ~is_good_trie(trie)(reverse_domain("domains"))
).show(truncate=False)

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

This specific approach works under the assumption that all good_domains can be compressed into a single trie, but it can be easily extended to handle cases where this assumption is not satisfied. For example, you can build a single trie per top-level domain or suffix (as defined in the naive solution):

(good_domains
    .select(suffix("gooddomains"), reverse_domain("gooddomains"))
    .rdd
    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))

and then either load the models on demand from a serialized version, or use RDD operations.
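
For instance, if each per-suffix trie is small, one option (a sketch only, relying on the same assumption as the broadcast above, namely that datrie tries can be pickled) is to collect them into a plain dict keyed by suffix and broadcast that:

tries_by_suffix = sc.broadcast(
    good_domains
    .select(suffix("gooddomains").alias("suffix"),
            reverse_domain("gooddomains").alias("rev"))
    .rdd
    .map(lambda row: (row.suffix, row.rev))
    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op)
    .collectAsMap())

# A UDF can then look up tries_by_suffix.value.get(suffix_of_x) and run the same
# prefix check as in is_good_trie.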

The two non-native methods can be further adjusted depending on the data, the business requirements (like the false-negative tolerance in the case of the approximate solution) and the available resources (driver memory, executor memory, cardinality of suffixes, access to a distributed POSIX-compliant file system, and so on). There are also some trade-offs to consider when choosing between applying these on DataFrames and RDDs (memory usage, communication and serialization overhead).

* See Understanding treeReduce() in Spark.
