在 Python3中加速数百万个正则表达式替换

我有两份名单:

  • 大约750K “句子”(长字符串)的列表
  • 一个大约20K < strong > “ words”的列表,我想从我的750K 句子中删除

因此,我必须循环通过750K 句子并执行大约20K 的替换,但是只有当我的单词实际上是 < em > “ words” 并且不是一个更大的字符串的一部分时。

我是通过预编译我的 文字来做到这一点的,这样它们的两侧就是 \b的单词边界元字符:

compiled_words = [re.compile(r'\b' + word + r'\b') for word in my20000words]

然后我循环播放我的 “句子”:

import re


for sentence in sentences:
for word in compiled_words:
sentence = re.sub(word, "", sentence)
# put sentence into a growing list

这个嵌套循环正在处理关于 每秒50句的内容,这很好,但是处理我的所有句子仍然需要几个小时。

  • 有没有一种方法可以使用 str.replace方法(我认为这种方法更快) ,但仍然要求替换只发生在 单词界限

  • 或者,有没有加速 re.sub方法的方法?如果单词的长度大于句子的长度,我已经略微提高了速度,跳过了 re.sub,但这并没有多大改善。

我使用的是 Python 3.5.2

68363 次浏览

您可能想尝试的一件事情是预处理句子以编码单词边界。通过划分单词边界,基本上把每个句子变成一个单词列表。

这样应该会更快,因为要处理一个句子,你只需要遍历每个单词并检查它们是否匹配。

目前,正则表达式搜索每次都必须遍历整个字符串,查找单词边界,然后在下一次遍历之前“丢弃”这项工作的结果。

您可以尝试编译单个模式,如 "\b(word1|word2|word3)\b"

因为 re依赖于 C 代码来进行实际匹配,所以节省的成本可能非常大。

正如@pvg 在评论中指出的,它还受益于单次匹配。

如果你的单词不是 regex,Eric 的 回答会更快。

也许 Python 在这里并不是正确的工具,这里是一个有 Unix 工具链的工具

sed G file         |
tr ' ' '\n'        |
grep -vf blacklist |
awk -v RS= -v OFS=' ' '{$1=$1}1'

假设您的黑名单文件经过添加单词边界的预处理。这些步骤是: 将文件转换为双倍行距,将每个句子拆分为每行一个单词,从文件中大量删除黑名单单词,并合并回行。

这个数量级应该至少能跑快一倍。

用于从单词(每行一个单词)预处理黑名单文件

sed 's/.*/\\b&\\b/' words > blacklist

实用的方法

下面描述的解决方案使用大量内存将所有文本存储在同一个字符串中,并降低复杂性。如果 RAM 是一个问题,那么在使用它之前要三思。

使用 join/split技巧,您可以完全避免循环,从而加快算法的速度。

  • 用一个特殊的分界线将一个句子连接起来,这个分界线不包含在这些句子中:
  • merged_sentences = ' * '.join(sentences)
    

  • 使用 |“或”regex 语句编译一个单独的 regex,以便从句子中去除所有需要去除的单词:
  • regex = re.compile(r'\b({})\b'.format('|'.join(words)), re.I) # re.I is a case insensitive flag
    

  • 在单词下面加上已编译的正则表达式,然后用特殊的分隔符将其分割回单独的句子:
  • clean_sentences = re.sub(regex, "", merged_sentences).split(' * ')
    

    表演

    "".join的复杂度是 O (n)。这是相当直观的,但无论如何,有一个来源的简短引用:

    for (i = 0; i < seqlen; i++) {
    [...]
    sz += PyUnicode_GET_LENGTH(item);
    

    因此,对于 join/split,初始方法是 O (单词) + 2 * O (句子) ,这仍然是线性复杂度,而初始方法是2 * O (N2)。


    顺便说一句,不要使用多线程。GIL 会阻止每个操作,因为你的任务是严格的 CPU 绑定,所以 GIL 没有机会被释放,但每个线程会同时发送滴答声,这会导致额外的工作,甚至导致操作无限。

    TLDR

    如果希望获得最快的解决方案,请使用此方法(带设置查找)。对于一个类似于 OP 的数据集,它大约比公认的答案快2000倍。

    如果您坚持使用正则表达式进行查找,请使用 这个基于试验的版本,它仍然比正则表达式联合快1000倍。

    理论

    如果你的句子不是巨大的字符串,那么每秒钟处理超过50个字符串是可行的。

    如果您将所有禁用的单词保存到一个集合中,那么检查该集合中是否包含另一个单词将非常快。

    将逻辑打包到一个函数中,将这个函数作为参数给 re.sub,然后就完成了!

    密码

    import re
    with open('/usr/share/dict/american-english') as wordbook:
    banned_words = set(word.strip().lower() for word in wordbook)
    
    
    
    
    def delete_banned_words(matchobj):
    word = matchobj.group(0)
    if word.lower() in banned_words:
    return ""
    else:
    return word
    
    
    sentences = ["I'm eric. Welcome here!", "Another boring sentence.",
    "GiraffeElephantBoat", "sfgsdg sdwerha aswertwe"] * 250000
    
    
    word_pattern = re.compile('\w+')
    
    
    for sentence in sentences:
    sentence = word_pattern.sub(delete_banned_words, sentence)
    

    转换后的句子是:

    ' .  !
    .
    GiraffeElephantBoat
    sfgsdg sdwerha aswertwe
    

    请注意:

    • 搜索不区分大小写(多亏了 lower())
    • ""替换单词可能会留下两个空格(如代码中所示)
    • 对于 python3,\w+也匹配重音字符(例如 "ångström")。
    • 任何非单词字符(制表符,空格,换行符,标记,...)将保持不变。

    表演

    有一百万个句子,banned_words几乎有100000个单词,脚本运行在不到7。

    相比之下,Liteye 的 回答需要160s 才能完成10000个句子。

    n为总字数,以 m为禁字数,OP 和 Liteye 的代码为 O(n*m)

    相比之下,我的代码应该在 O(n+m)中运行。考虑到句子比禁词多得多,该算法成为 O(n)

    正则表达式联合测试

    使用 '\b(word1|word2|...|wordN)\b'模式的正则表达式搜索的复杂度是多少? 是 O(N)还是 O(1)

    很难理解正则表达式引擎的工作方式,所以让我们编写一个简单的测试。

    此代码将 10**i随机英语单词提取到一个列表中。它创建相应的 regex 联合,并使用不同的单词对其进行测试:

    • “ one”显然不是一个单词(它以 #开头)
    • 一个是名单上的第一个词
    • 一个是清单上的最后一个词
    • 一个看起来像一个词,但实际上不是


    import re
    import timeit
    import random
    
    
    with open('/usr/share/dict/american-english') as wordbook:
    english_words = [word.strip().lower() for word in wordbook]
    random.shuffle(english_words)
    
    
    print("First 10 words :")
    print(english_words[:10])
    
    
    test_words = [
    ("Surely not a word", "#surely_NöTäWORD_so_regex_engine_can_return_fast"),
    ("First word", english_words[0]),
    ("Last word", english_words[-1]),
    ("Almost a word", "couldbeaword")
    ]
    
    
    
    
    def find(word):
    def fun():
    return union.match(word)
    return fun
    
    
    for exp in range(1, 6):
    print("\nUnion of %d words" % 10**exp)
    union = re.compile(r"\b(%s)\b" % '|'.join(english_words[:10**exp]))
    for description, test_word in test_words:
    time = timeit.timeit(find(test_word), number=1000) * 1000
    print("  %-17s : %.1fms" % (description, time))
    

    产出:

    First 10 words :
    ["geritol's", "sunstroke's", 'fib', 'fergus', 'charms', 'canning', 'supervisor', 'fallaciously', "heritage's", 'pastime']
    
    
    Union of 10 words
    Surely not a word : 0.7ms
    First word        : 0.8ms
    Last word         : 0.7ms
    Almost a word     : 0.7ms
    
    
    Union of 100 words
    Surely not a word : 0.7ms
    First word        : 1.1ms
    Last word         : 1.2ms
    Almost a word     : 1.2ms
    
    
    Union of 1000 words
    Surely not a word : 0.7ms
    First word        : 0.8ms
    Last word         : 9.6ms
    Almost a word     : 10.1ms
    
    
    Union of 10000 words
    Surely not a word : 1.4ms
    First word        : 1.8ms
    Last word         : 96.3ms
    Almost a word     : 116.6ms
    
    
    Union of 100000 words
    Surely not a word : 0.7ms
    First word        : 0.8ms
    Last word         : 1227.1ms
    Almost a word     : 1404.1ms
    

    因此,搜索一个具有 '\b(word1|word2|...|wordN)\b'模式的单词看起来有:

    • O(1)最好的情况
    • O(n/2)平均病例,仍然是 O(n)
    • O(n)最坏的情况

    这些结果与一个简单的循环搜索是一致的。

    正则表达式联合的一个更快的替代方法是创建 一个 try 中的 regex 模式

    这里有一个快速简单的解决方案,带有测试装置。

    最佳策略:

    • re.sub("\w+",repl,sentence)搜索单词。
    • “ repl”可以是可调用的。我使用了一个函数来执行 dict 查找,这个 dict 包含要搜索和替换的单词。
    • 这是最简单和最快的解决方案(参见下面示例代码中的函数 replace4)。

    第二佳策略:

    • 这个想法是把句子拆分成单词,使用 re.split,同时保留分隔符重建以后的句子。然后,通过一个简单的 dict 查找完成替换。
    • 实现: (参见下面示例代码中的函数 replace3)。

    例如函数的计时:

    replace1:     0.62 sentences/s
    replace2:     7.43 sentences/s
    replace3: 48498.03 sentences/s
    replace4: 61374.97 sentences/s (...and 240,000/s with PyPy)
    

    还有密码:

    #! /bin/env python3
    # -*- coding: utf-8
    
    
    import time, random, re
    
    
    def replace1( sentences ):
    for n, sentence in enumerate( sentences ):
    for search, repl in patterns:
    sentence = re.sub( "\\b"+search+"\\b", repl, sentence )
    
    
    def replace2( sentences ):
    for n, sentence in enumerate( sentences ):
    for search, repl in patterns_comp:
    sentence = re.sub( search, repl, sentence )
    
    
    def replace3( sentences ):
    pd = patterns_dict.get
    for n, sentence in enumerate( sentences ):
    #~ print( n, sentence )
    # Split the sentence on non-word characters.
    # Note: () in split patterns ensure the non-word characters ARE kept
    # and returned in the result list, so we don't mangle the sentence.
    # If ALL separators are spaces, use string.split instead or something.
    # Example:
    #~ >>> re.split(r"([^\w]+)", "ab céé? . d2eéf")
    #~ ['ab', ' ', 'céé', '? . ', 'd2eéf']
    words = re.split(r"([^\w]+)", sentence)
            
    
    # and... done.
    sentence = "".join( pd(w,w) for w in words )
    
    
    #~ print( n, sentence )
    
    
    def replace4( sentences ):
    pd = patterns_dict.get
    def repl(m):
    w = m.group()
    return pd(w,w)
        
    
    for n, sentence in enumerate( sentences ):
    sentence = re.sub(r"\w+", repl, sentence)
    
    
    
    
    
    
    # Build test set
    test_words = [ ("word%d" % _) for _ in range(50000) ]
    test_sentences = [ " ".join( random.sample( test_words, 10 )) for _ in range(1000) ]
    
    
    # Create search and replace patterns
    patterns = [ (("word%d" % _), ("repl%d" % _)) for _ in range(20000) ]
    patterns_dict = dict( patterns )
    patterns_comp = [ (re.compile("\\b"+search+"\\b"), repl) for search, repl in patterns ]
    
    
    
    
    def test( func, num ):
    t = time.time()
    func( test_sentences[:num] )
    print( "%30s: %.02f sentences/s" % (func.__name__, num/(time.time()-t)))
    
    
    print( "Sentences", len(test_sentences) )
    print( "Words    ", len(test_words) )
    
    
    test( replace1, 1 )
    test( replace2, 10 )
    test( replace3, 1000 )
    test( replace4, 1000 )
    

    编辑: 你也可以忽略小写检查,如果你通过一个小写的句子列表和编辑回复

    def replace4( sentences ):
    pd = patterns_dict.get
    def repl(m):
    w = m.group()
    return pd(w.lower(),w)
    

    这样吧:

    #!/usr/bin/env python3
    
    
    from __future__ import unicode_literals, print_function
    import re
    import time
    import io
    
    
    def replace_sentences_1(sentences, banned_words):
    # faster on CPython, but does not use \b as the word separator
    # so result is slightly different than replace_sentences_2()
    def filter_sentence(sentence):
    words = WORD_SPLITTER.split(sentence)
    words_iter = iter(words)
    for word in words_iter:
    norm_word = word.lower()
    if norm_word not in banned_words:
    yield word
    yield next(words_iter) # yield the word separator
    
    
    WORD_SPLITTER = re.compile(r'(\W+)')
    banned_words = set(banned_words)
    for sentence in sentences:
    yield ''.join(filter_sentence(sentence))
    
    
    
    
    def replace_sentences_2(sentences, banned_words):
    # slower on CPython, uses \b as separator
    def filter_sentence(sentence):
    boundaries = WORD_BOUNDARY.finditer(sentence)
    current_boundary = 0
    while True:
    last_word_boundary, current_boundary = current_boundary, next(boundaries).start()
    yield sentence[last_word_boundary:current_boundary] # yield the separators
    last_word_boundary, current_boundary = current_boundary, next(boundaries).start()
    word = sentence[last_word_boundary:current_boundary]
    norm_word = word.lower()
    if norm_word not in banned_words:
    yield word
    
    
    WORD_BOUNDARY = re.compile(r'\b')
    banned_words = set(banned_words)
    for sentence in sentences:
    yield ''.join(filter_sentence(sentence))
    
    
    
    
    corpus = io.open('corpus2.txt').read()
    banned_words = [l.lower() for l in open('banned_words.txt').read().splitlines()]
    sentences = corpus.split('. ')
    output = io.open('output.txt', 'wb')
    print('number of sentences:', len(sentences))
    start = time.time()
    for sentence in replace_sentences_1(sentences, banned_words):
    output.write(sentence.encode('utf-8'))
    output.write(b' .')
    print('time:', time.time() - start)
    

    这些解决方案根据单词边界进行分割,并查找集合中的每个单词。它们应该比 re.sub 的 word Alternates (Liteyes 的解决方案)更快,因为这些解决方案是 O(n),其中 n 是由于 amortized O(1)集查找而产生的输入大小,而使用 regex Alternates 将导致 regex 引擎必须检查每个字符的单词匹配,而不仅仅是单词边界。我的解决方案特别注意保留原始文本中使用的空格(例如,它不压缩空格,并保留制表符、换行符和其他空格字符) ,但是如果你决定不关心它,从输出中删除它们应该相当简单。

    我在 corus.txt 上测试了一下,这是一个从 Gutenberg Project 下载的多本电子书的串联,而被禁用的 _ words.txt 是从 Ubuntu 的文字列表(/usr/share/dict/american-chinese)中随机挑选的20000个单词。处理862462个句子大约需要30秒(其中一半在 PyPy 上)。我把句子定义为任何被“分开”的东西.

    $ # replace_sentences_1()
    $ python3 filter_words.py
    number of sentences: 862462
    time: 24.46173644065857
    $ pypy filter_words.py
    number of sentences: 862462
    time: 15.9370770454
    
    
    $ # replace_sentences_2()
    $ python3 filter_words.py
    number of sentences: 862462
    time: 40.2742919921875
    $ pypy filter_words.py
    number of sentences: 862462
    time: 13.1190629005
    

    PyPy 尤其从第二种方法中获益更多,而 CPython 在第一种方法中表现得更好。上面的代码应该同时适用于 Python2和3。

    TLDR

    如果希望获得最快的基于正则表达式的解决方案,请使用此方法。对于一个类似于 OP 的数据集,它大约比公认的答案快1000倍。

    如果您不关心正则表达式,那么使用 这个基于集合的版本,它比正则表达式联合快2000倍。

    用 Trie 优化正则表达式

    由于正则表达式引擎 做得不是很好对模式进行了优化,因此 简单的正则表达式联合方法在使用许多禁用单词时会变得很慢。

    可以创建一个包含所有禁用单词的 试试并编写相应的正则表达式。生成的 try 或 regex 并不真正具有人类可读性,但它们确实支持非常快速的查找和匹配。

    例子

    ['foobar', 'foobah', 'fooxar', 'foozap', 'fooza']
    

    Regex union

    该列表被转换为一个 try:

    {
    'f': {
    'o': {
    'o': {
    'x': {
    'a': {
    'r': {
    '': 1
    }
    }
    },
    'b': {
    'a': {
    'r': {
    '': 1
    },
    'h': {
    '': 1
    }
    }
    },
    'z': {
    'a': {
    '': 1,
    'p': {
    '': 1
    }
    }
    }
    }
    }
    }
    }
    

    然后到这个 regex 模式:

    r"\bfoo(?:ba[hr]|xar|zap?)\b"
    

    Regex trie

    其巨大的优势在于,要测试 zoo是否匹配,正则表达式引擎只匹配 需要比较第一个字符(它不匹配) ,而不是 试试这五个词。对于5个单词来说,这是一个预处理过度,但是对于数千个单词来说,它显示了有希望的结果。

    请注意,使用 (?:)非捕获组的原因是:

    密码

    下面是略作修改的 大意,我们可以将其用作 trie.py库:

    import re
    
    
    
    
    class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""
    
    
    def __init__(self):
    self.data = {}
    
    
    def add(self, word):
    ref = self.data
    for char in word:
    ref[char] = char in ref and ref[char] or {}
    ref = ref[char]
    ref[''] = 1
    
    
    def dump(self):
    return self.data
    
    
    def quote(self, char):
    return re.escape(char)
    
    
    def _pattern(self, pData):
    data = pData
    if "" in data and len(data.keys()) == 1:
    return None
    
    
    alt = []
    cc = []
    q = 0
    for char in sorted(data.keys()):
    if isinstance(data[char], dict):
    try:
    recurse = self._pattern(data[char])
    alt.append(self.quote(char) + recurse)
    except:
    cc.append(self.quote(char))
    else:
    q = 1
    cconly = not len(alt) > 0
    
    
    if len(cc) > 0:
    if len(cc) == 1:
    alt.append(cc[0])
    else:
    alt.append('[' + ''.join(cc) + ']')
    
    
    if len(alt) == 1:
    result = alt[0]
    else:
    result = "(?:" + "|".join(alt) + ")"
    
    
    if q:
    if cconly:
    result += "?"
    else:
    result = "(?:%s)?" % result
    return result
    
    
    def pattern(self):
    return self._pattern(self.dump())
    

    测试

    下面是一个小测试(与 这个相同) :

    # Encoding: utf-8
    import re
    import timeit
    import random
    from trie import Trie
    
    
    with open('/usr/share/dict/american-english') as wordbook:
    banned_words = [word.strip().lower() for word in wordbook]
    random.shuffle(banned_words)
    
    
    test_words = [
    ("Surely not a word", "#surely_NöTäWORD_so_regex_engine_can_return_fast"),
    ("First word", banned_words[0]),
    ("Last word", banned_words[-1]),
    ("Almost a word", "couldbeaword")
    ]
    
    
    def trie_regex_from_words(words):
    trie = Trie()
    for word in words:
    trie.add(word)
    return re.compile(r"\b" + trie.pattern() + r"\b", re.IGNORECASE)
    
    
    def find(word):
    def fun():
    return union.match(word)
    return fun
    
    
    for exp in range(1, 6):
    print("\nTrieRegex of %d words" % 10**exp)
    union = trie_regex_from_words(banned_words[:10**exp])
    for description, test_word in test_words:
    time = timeit.timeit(find(test_word), number=1000) * 1000
    print("  %s : %.1fms" % (description, time))
    

    产出:

    TrieRegex of 10 words
    Surely not a word : 0.3ms
    First word : 0.4ms
    Last word : 0.5ms
    Almost a word : 0.5ms
    
    
    TrieRegex of 100 words
    Surely not a word : 0.3ms
    First word : 0.5ms
    Last word : 0.9ms
    Almost a word : 0.6ms
    
    
    TrieRegex of 1000 words
    Surely not a word : 0.3ms
    First word : 0.7ms
    Last word : 0.9ms
    Almost a word : 1.1ms
    
    
    TrieRegex of 10000 words
    Surely not a word : 0.1ms
    First word : 1.0ms
    Last word : 1.2ms
    Almost a word : 1.2ms
    
    
    TrieRegex of 100000 words
    Surely not a word : 0.3ms
    First word : 1.2ms
    Last word : 0.9ms
    Almost a word : 1.6ms
    

    对于信息,正则表达式的开头是这样的:

    | s (? : (? : (? : st | r) | ly | ness (? :’s) ?) ? | issa (? : (? :’s | e [ ds ]) ? | issa (? : (? : s | e | ing | s)) ? | en (? : (? : (? : s | ing | s)) ? | t (? : (? : s | s)) | ing | s) ? | i (? : (? : s | ly | st?)) ? | i (? : on (? :’s) ? | sm (? :’s) ?) | i (? : on (? :’s) ? | sm (? :’s) ? | ing)) | i (? : b (? : (? : e (? : n (? ? ? ?)) ? | t (? : (? : s | s) ? | d) | ing | s) ? | pti... 。

    它真的不可读,但是对于100000个禁止单词的列表,这个 Trieregex 比简单的 regex 联合快1000倍!

    下面是用 Trie-python-Graphviz和 Graphviz twopi导出的完整测试的示意图:

    Enter image description here

    把你所有的句子连接成一个文件。使用 Aho-Corasick 算法(这里有一个)的任何实现来定位你所有的“坏”字。遍历文件,替换每个坏单词,更新后面找到的单词的偏移量等等。