3分鐘實踐:Python語言在Serverless架構下實現敏感詞過濾

前言

隨著各種社交論壇等的日益火爆,敏感詞過濾逐漸成了非常重要的也是值得重視的功能。那麼在Serverless架構下,透過Python語言,敏感詞過濾又有那些新的實現呢?我們能否是用最簡單的方法,實現一個敏感詞過濾的API呢?

敏感過濾入門

Replace方法

如果說敏感詞過濾,其實不如說是文字的替換,以Python為例,說到詞彙替換,不得不想到replace,我們可以準備一個敏感詞庫,然後透過replace進行敏感詞替換:

def check_filter(keywords, text):    for eve in keywords:        text = text。replace(eve, “***”)    return textkeywords = (“關鍵詞1”, “關鍵詞2”, “關鍵詞3”)content = “這是一個關鍵詞替換的例子,這裡涉及到了關鍵詞1還有關鍵詞2,最後還會有關鍵詞3。”print(check_filter(keywords, content))

但是動動腦大家就會發現,這種做法在文字和敏感詞庫非常龐大的前提下,會有很嚴重的效能問題。例如我將程式碼進行修改,進行基本的效能測試:

import timedef check_filter(keywords, text):    for eve in keywords:        text = text。replace(eve, “***”)    return textkeywords =[ “關鍵詞” + str(i) for i in range(0,10000)]startTime = time。time()content = “這是一個關鍵詞替換的例子,這裡涉及到了關鍵詞1還有關鍵詞2,最後還會有關鍵詞3。” * 10000check_filter(keywords, content)print(time。time()-startTime)

此時的輸出結果是:1。235044002532959,可以看到效能非常差。

正則表達方法

與其用replace,還不如透過正則表達re。sub來的更加快速。

def check_filter(keywords, text):     return re。sub(“|”。join(keywords), “***”, text)keywords = (“關鍵詞1”, “關鍵詞2”, “關鍵詞3”)content = “這是一個關鍵詞替換的例子,這裡涉及到了關鍵詞1還有關鍵詞2,最後還會有關鍵詞3。” print(check_filter(keywords, content))

我們同樣增加效能測試,按照上面的方法進行改造測試,輸出結果是0。47878289222717285。透過這樣的例子,我們可以發現,這種做法在效能層面變高了很多,這少可以說提升了幾倍,如果隨著詞庫的增加,這個倍數會成倍增加。

DFA過濾敏感詞

這種方法相對來說效率會更高一些。例如,我們認為壞人,壞孩子,壞蛋是敏感詞,則他們的樹關係可以表達:

3分鐘實踐:Python語言在Serverless架構下實現敏感詞過濾

用DFA字典來表示:

{    ‘壞’: {        ‘蛋’: {            ‘\x00’: 0        },         ‘人’: {            ‘\x00’: 0        },         ‘孩’: {            ‘子’: {                ‘\x00’: 0            }        }    }}

使用這種樹表示問題最大的好處就是可以降低檢索次數,提高檢索效率,基本程式碼實現:

import timeclass DFAFilter(object):    def __init__(self):        self。keyword_chains = {}  # 關鍵詞連結串列        self。delimit = ‘\x00’  # 限定    def add(self, keyword):        keyword = keyword。lower()  # 關鍵詞英文變為小寫        chars = keyword。strip()  # 關鍵字去除首尾空格和換行        if not chars:  # 如果關鍵詞為空直接返回            return        level = self。keyword_chains        # 遍歷關鍵字的每個字        for i in range(len(chars)):            # 如果這個字已經存在字元鏈的key中就進入其子字典            if chars[i] in level:                level = level[chars[i]]            else:                if not isinstance(level, dict):                    break                for j in range(i, len(chars)):                    level[chars[j]] = {}                    last_level, last_char = level, chars[j]                    level = level[chars[j]]                last_level[last_char] = {self。delimit: 0}                break        if i == len(chars) - 1:            level[self。delimit] = 0    def parse(self, path):        with open(path, encoding=‘utf-8’) as f:            for keyword in f:                self。add(str(keyword)。strip())    def filter(self, message, repl=“*”):        message = message。lower()        ret = []        start = 0        while start < len(message):            level = self。keyword_chains            step_ins = 0            for char in message[start:]:                if char in level:                    step_ins += 1                    if self。delimit not in level[char]:                        level = level[char]                    else:                        ret。append(repl * step_ins)                        start += step_ins - 1                        break                else:                    ret。append(message[start])                    break            else:                ret。append(message[start])            start += 1        return ‘’。join(ret)startTime = time。time()gfw = DFAFilter()gfw。parse( “。/sensitive_words。txt”)content = “這是一個關鍵詞替換的例子,這裡涉及到了關鍵詞1還有關鍵詞2,最後還會有關鍵詞3。” * 10000result = gfw。filter(content)print(time。time()-startTime)

這裡我們的字典庫是:

with open(“。/sensitive_words”, ‘w’) as f:    f。write(“\n”。join( [ “關鍵詞” + str(i) for i in range(0,10000)]))

執行結果:

4。9114227294921875e-05

可以看到效能進一步提升。

AC自動機過濾敏感詞演算法

接下來,我們來看一下 AC自動機過濾敏感詞演算法:

AC自動機:一個常見的例子就是給出n個單詞,再給出一段包含m個字元的文章,讓你找出有多少個單詞在文章裡出現過。

簡單地講,AC自動機就是字典樹+kmp演算法+失配指標

程式碼實現:

# AC自動機演算法class node(object):    def __init__(self):        self。next = {}        self。fail = None        self。isWord = False        self。word = “”class ac_automation(object):    def __init__(self):        self。root = node()    # 新增敏感詞函式    def addword(self, word):        temp_root = self。root        for char in word:            if char not in temp_root。next:                temp_root。next[char] = node()            temp_root = temp_root。next[char]        temp_root。isWord = True        temp_root。word = word    # 失敗指標函式    def make_fail(self):        temp_que = []        temp_que。append(self。root)        while len(temp_que) != 0:            temp = temp_que。pop(0)            p = None            for key, value in temp。next。item():                if temp == self。root:                    temp。next[key]。fail = self。root                else:                    p = temp。fail                    while p is not None:                        if key in p。next:                            temp。next[key]。fail = p。fail                            break                        p = p。fail                    if p is None:                        temp。next[key]。fail = self。root                temp_que。append(temp。next[key])    # 查詢敏感詞函式    def search(self, content):        p = self。root        result = []        currentposition = 0        while currentposition < len(content):            word = content[currentposition]            while word in p。next == False and p != self。root:                p = p。fail            if word in p。next:                p = p。next[word]            else:                p = self。root            if p。isWord:                result。append(p。word)                p = self。root            currentposition += 1        return result    # 載入敏感詞庫函式    def parse(self, path):        with open(path, encoding=‘utf-8’) as f:            for keyword in f:                self。addword(str(keyword)。strip())    # 敏感詞替換函式    def words_replace(self, text):        “”“        :param ah: AC自動機        :param text: 文字        :return: 過濾敏感詞之後的文字        ”“”        result = list(set(self。search(text)))        for x in result:            m = text。replace(x, ‘*’ * len(x))            text = m        return textah = ac_automation()path = ‘。/sensitive_words’ah。parse(path)content = “這是一個關鍵詞替換的例子,這裡涉及到了關鍵詞1還有關鍵詞2,最後還會有關鍵詞3。”print(ah。words_replace(content))

詞庫同樣是:

with open(“。/sensitive_words”, ‘w’) as f:    f。write(“\n”。join( [ “關鍵詞” + str(i) for i in range(0,10000)]))

使用上面的方法,將content*10000測試結果為0。1727597713470459。

小結

可以看到這個所有演算法中,在上述的基本演算法中DFA過濾敏感詞效能最高,但是實際上,對於後兩者演算法,並沒有誰一定更好,可能某些時候,AC自動機過濾敏感詞演算法會得到更高的效能,所以在生產生活中,推薦時候用兩者,可以根據自己的具體業務需要來做。

如何部署在Serverless架構下

很簡單,以AC自動機過濾敏感詞演算法為例:我們只需要增加是幾行程式碼就好,完整程式碼如下:

# -*- coding:utf-8 -*-import json, uuid# AC自動機演算法class node(object):    def __init__(self):        self。next = {}        self。fail = None        self。isWord = False        self。word = “”class ac_automation(object):    def __init__(self):        self。root = node()    # 新增敏感詞函式    def addword(self, word):        temp_root = self。root        for char in word:            if char not in temp_root。next:                temp_root。next[char] = node()            temp_root = temp_root。next[char]        temp_root。isWord = True        temp_root。word = word    # 失敗指標函式    def make_fail(self):        temp_que = []        temp_que。append(self。root)        while len(temp_que) != 0:            temp = temp_que。pop(0)            p = None            for key, value in temp。next。item():                if temp == self。root:                    temp。next[key]。fail = self。root                else:                    p = temp。fail                    while p is not None:                        if key in p。next:                            temp。next[key]。fail = p。fail                            break                        p = p。fail                    if p is None:                        temp。next[key]。fail = self。root                temp_que。append(temp。next[key])    # 查詢敏感詞函式    def search(self, content):        p = self。root        result = []        currentposition = 0        while currentposition < len(content):            word = content[currentposition]            while word in p。next == False and p != self。root:                p = p。fail            if word in p。next:                p = p。next[word]            else:                p = self。root            if p。isWord:                result。append(p。word)                p = self。root            currentposition += 1        return result    # 載入敏感詞庫函式    def parse(self, path):        with open(path, encoding=‘utf-8’) as f:            for keyword in f:                self。addword(str(keyword)。strip())    # 敏感詞替換函式    def words_replace(self, text):        “”“        :param ah: AC自動機        :param text: 文字        :return: 過濾敏感詞之後的文字        ”“”        result = list(set(self。search(text)))        for x in result:            m = text。replace(x, ‘*’ * len(x))            text = m        return textdef response(msg, error=False):    return_data = {        “uuid”: str(uuid。uuid1()),        “error”: error,        “message”: msg    }    print(return_data)    return return_dataah = ac_automation()path = ‘。/sensitive_words’ah。parse(path)def main_handler(event, context):    try:        sourceContent = json。loads(event[“body”])[“content”]        return response({            “sourceContent”: sourceContent,            “filtedContent”: ah。words_replace(sourceContent)        })    except Exception as e:        return response(str(e), True)

最後,為了方便本地測試,我們可以增加:

def test():    event = {        “requestContext”: {            “serviceId”: “service-f94sy04v”,            “path”: “/test/{path}”,            “httpMethod”: “POST”,            “requestId”: “c6af9ac6-7b61-11e6-9a41-93e8deadbeef”,            “identity”: {                “secretId”: “abdcdxxxxxxxsdfs”            },            “sourceIp”: “14。17。22。34”,            “stage”: “release”        },        “headers”: {            “Accept-Language”: “en-US,en,cn”,            “Accept”: “text/html,application/xml,application/json”,            “Host”: “********”,            “User-Agent”: “User Agent String”        },        “body”: “{\”content\“:\”這是一個測試的文字,我也就呵呵了\“}”,        “pathParameters”: {            “path”: “value”        },        “queryStringParameters”: {            “foo”: “bar”        },        “headerParameters”: {            “Refer”: “10。0。2。14”        },        “stageVariables”: {            “stage”: “release”        },        “path”: “/test/value”,        “queryString”: {            “foo”: “bar”,            “bob”: “alice”        },        “httpMethod”: “POST”    }    print(main_handler(event, None))if __name__ == “__main__”:    test()

完成之後,我們就可以測試執行一下,例如我的字典是:

呵呵測試

執行之後結果:

{‘uuid’: ‘9961ae2a-5cfc-11ea-a7c2-acde48001122’, ‘error’: False, ‘message’: {‘sourceContent’: ‘這是一個測試的文字,我也就呵呵了’, ‘filtedContent’: ‘這是一個**的文字,我也就**了’}}

接下來,我們將程式碼部署到雲端,新建serverless。yaml:

sensitive_word_filtering:  component: “@serverless/tencent-scf”  inputs:    name: sensitive_word_filtering    codeUri: 。/    exclude:      - 。gitignore      - 。git/**      - 。serverless      - 。env    handler: index。main_handler    runtime: Python3。6    region: ap-beijing    description: 敏感詞過濾    memorySize: 64    timeout: 2    events:      - apigw:          name: serverless          parameters:            environment: release            endpoints:              - path: /sensitive_word_filtering                description: 敏感詞過濾                method: POST                enableCORS: true                param:                  - name: content                    position: BODY                    required: ‘FALSE’                    type: string                    desc: 待過濾的句子

然後透過sls ——debug進行部署,部署結果:

3分鐘實踐:Python語言在Serverless架構下實現敏感詞過濾

最後,透過POSTMan進行測試:

3分鐘實踐:Python語言在Serverless架構下實現敏感詞過濾

額外的話

對於敏感詞庫去那裡獲得問題,github上有很多,可以自己搜一下,中文的英文的都有。我這裡只是一個例子;

這個API使用場景,完全可以放在我們的社群跟帖系統/留言評論系統/部落格釋出系統中,防止出現敏感詞彙,導致不必要的麻煩。

3分鐘實踐:Python語言在Serverless架構下實現敏感詞過濾