python併發2:使用asyncio處理併發

asyncio

asyncio

是Python3。4 之後引入的標準庫的,這個包使用事件迴圈驅動的協程實現併發。

asyncio 包在引入標準庫之前代號

“Tulip”(鬱金香)

,所以在網上搜索資料時,會經常看到這種花的名字。

什麼是事件迴圈?

wiki 上說:

事件迴圈是”一種等待程式分配事件或者訊息的程式設計架構“。基本上來說事件迴圈就是:

”當A發生時,執行B"

。或者用最簡單的例子來解釋這一概念就是每個瀏覽器中都存在的JavaScript事件迴圈。當你點選了某個東西(“當A發生時”),這一點選動作會發送給JavaScript的事件迴圈,並檢查是否存在註冊過的onclick 回撥來處理這一點選(執行B)。只要有註冊過的回撥函式就會伴隨點選動作的細節資訊被執行。事件迴圈被認為是一種虛幻是因為它不停的手機事件並透過迴圈來發如何應對這些事件。

對 Python 來說,用來提供事件迴圈的 asyncio 被加入標準庫中。asyncio 重點解決網路服務中的問題,事件迴圈在這裡將來自套接字(socket)的 I/O 已經準備好讀和/或寫作為“當A發生時”(透過selectors模組)。除了 GUI 和 I/O,事件迴圈也經常用於在別的執行緒或子程序中執行程式碼,並將事件迴圈作為調節機制(例如,合作式多工)。如果你恰好理解 Python 的 GIL,事件迴圈對於需要釋放 GIL 的地方很有用。

執行緒與協程

我們先看兩斷程式碼,分別用 threading 模組和asyncio 包實現的一段程式碼。

python併發2:使用asyncio處理併發

執行一下,結果大致是這個樣子:

python併發2:使用asyncio處理併發

這是一個動圖,“thinking“ 前的 \ 線是會動的(為了錄屏,我把sleep 的時間調大了)

python 並沒有提供終止執行緒的API,所以若想關閉執行緒,必須給執行緒傳送訊息。這裡我們使用signal。go 屬性:在主執行緒中把它設定為False後,spinner 執行緒會接收到,然後退出

現在我們再看下使用 asyncio 包的版本:

python併發2:使用asyncio處理併發

除非想阻塞主執行緒,從而凍結事件迴圈或整個應用,否則不要再 asyncio 協程中使用 time。sleep()。

如果協程需要在一段時間內什麼都不做,應該使用 yield from asyncio。sleep(DELAY)

使用 @asyncio。coroutine 裝飾器不是強制要求,但建議這麼做因為這樣能在程式碼中突顯協程,如果還沒從中產出值,協程就把垃圾回收了(意味著操作未完成,可能有缺陷),可以發出警告。這個裝飾器不會預激協程。

這兩段程式碼的執行結果基本相同,現在我們看一下兩段程式碼的核心程式碼 supervisor 主要區別:

asyncio。Task 物件差不多與 threading。Thread 物件等效(Task 物件像是實現寫作時多工的庫中的綠色執行緒

Task 物件用於驅動協程,Thread 物件用於呼叫可呼叫的物件

Task 物件不由自己動手例項化,而是透過把協程傳給 asyncio。async(。。。) 函式或 loop。create_task(。。。) 方法獲取

獲取的Task 物件已經排定了執行時間;Thread 例項必須呼叫start方法,明確告知它執行

線上程版supervisor函式中,slow_function 是普通的函式,由執行緒直接呼叫,而非同步版的slow_function 函式是協程,由yield from 驅動。

沒有API能從外部終止執行緒,因為執行緒隨時可能被中斷。而如果想終止任務,可以使用Task。cancel() 例項方法,在

協程

內部丟擲CancelledError 異常。協程可以在暫停的yield 處捕獲這個異常,處理終止請求

supervisor

協程

必須在main 函式中由loop。run_until_complete 方法執行。

協程和

執行緒相比關鍵的一個優點是,

執行緒必須記住保留鎖,去保護程式中的重要部分,防止多步操作再執行的過程中中斷,防止山水處於於曉狀態

協程預設會做好保護,我們必須顯式產出(使用yield 或 yield from 交出控制權)才能讓程式的餘下部分執行。

asyncio.Future:故意不阻塞

asynci。Future 類與 concurrent。futures。Future 類的介面基本一致,不過實現方式不同,不可互換。

上一篇python併發 1:使用 futures 處理併發我們介紹過 concurrent。futures。Future 的 future,在 concurrent。futures。Future 中,future只是排程執行某物的結果。在 asyncio 包中,BaseEventLoop。create_task(。。。) 方法接收一個協程,排定它的執行時間,然後返回一個asyncio。Task 例項(也是asyncio。Future 類的例項,因為 Task 是 Future 的子類,用於包裝協程。(在 concurrent。futures。Future 中,類似的操作是Executor。submit(。。。))。

與concurrent。futures。Future 類似,asyncio。Future 類也提供了

。done() 返回布林值,表示Future 是否已經執行

。add_done_callback() 這個方法只有一個引數,型別是可呼叫物件,Future執行結束後會回撥這個物件。

。result() 這個方法沒有引數,因此不能指定超時時間。 如果呼叫 。result() 方法時期還沒有執行完畢,會丟擲 asyncio。InvalidStateError 異常。

對應的 concurrent。futures。Future 類中的 Future 執行結束後呼叫result(), 會返回可呼叫物件的結果或者丟擲執行可呼叫物件時丟擲的異常,如果是 Future 沒有執行結束時呼叫 f。result()方法,這時會阻塞呼叫方所在的執行緒,直到有結果返回。此時result 方法還可以接收 timeout 引數,如果在指定的時間內 Future 沒有執行完畢,會丟擲 TimeoutError 異常。

我們使用asyncio。Future 時, 通常使用yield from,從中獲取結果,而不是使用 result()方法

yield from 表示式在暫停的協程中生成返回值,回覆執行過程。

asyncio。Future 類的目的是與 yield from 一起使用,所以通常不需要使用以下方法:

不需呼叫 my_future。add_down_callback(。。。), 因為可以直接把想在 future 執行結束後的操作放在協程中 yield from my_future 表示式的後邊。(因為協程可以暫停和恢復函式)

無需呼叫 my_future。result(), 因為 yield from 產生的結果就是(result = yield from my_future)

在 asyncio 包中,可以使用yield from 從asyncio。Future 物件中產出結果。這也就意味著我們可以這麼寫:

python併發2:使用asyncio處理併發

asyncio.async(...) 函式

python併發2:使用asyncio處理併發

這個函式統一了協程和Future: 第一個引數可以是二者中的任意一個。如果是Future 或者 Task 物件,就直接返回,如果是協程,那麼async 函式會自動呼叫 loop。create_task(。。。) 方法建立 Task 物件。 loop 引數是可選的,用於傳入事件迴圈; 如果沒有傳入,那麼async函式會透過呼叫asyncio。get_event_loop() 函式獲取迴圈物件。

BaseEventLoop.create_task(coro)

這個方法排定協程的執行時間,返回一個 asyncio。Task 物件。如果在自定義的BaseEventLoop 子類上呼叫,返回的物件可能是外部庫中與Task類相容的某個類的例項。

BaseEventLoop。create_task() 方法只在Python3。4。2 及以上版本可用。 Python3。3 只能使用 asyncio。async(。。。)函式。

如果想在Python控制檯或者小型測試指令碼中實驗future和協程,可以使用下面的片段:

python併發2:使用asyncio處理併發

使用asyncio 和 aiohttp 包下載

現在,我們瞭解了asyncio 的基礎知識,是時候使用asyncio 來重寫我們 上一篇 python併發 1:使用 futures 處理併發 下載國旗的指令碼了。

先看一下程式碼:

python併發2:使用asyncio處理併發

這段程式碼的執行簡述如下:

在download_many 函式獲取一個事件迴圈,處理呼叫download_one 函式生成的幾個協程物件

asyncio 事件迴圈一次啟用各個協程

客戶程式碼中的協程(get_flag)使用 yield from 把指責委託給庫裡的協程(aiohttp。request)時,控制權交還給事件迴圈,執行之前排定的協程

事件迴圈透過基於回撥的底層API,在阻塞的操作執行完畢後獲得通知。

獲得通知後,主迴圈把結果發給暫停的協程

協程向前執行到下一個yield from 表示式,例如 get_flag 函式的yield from resp。read()。事件迴圈再次得到控制權,重複第4~6步,直到迴圈終止。

download_many 函式中,我們使用了 asyncio。wait(。。。) 函式,這個函式是一個協程,協程的引數是一個由future或者協程構成的可迭代物件;wait 會分別把各個協程包裝進一個Task物件。最終的結果是,wait 處理的所有物件都透過某種方式變成Future 類的例項。

wait 是協程函式,因此,返回的是一個協程或者生成器物件;waite_coro 變數中儲存的就是這種物件

loop。run_until_complete 方法的引數是一個future 或協程。如果是協程,run_until_complete 方法與 wait 函式一樣,把協程包裝進一個Task 物件中。這裡 run_until_complete 方法把 wait_coro 包裝進一個Task 物件中,由yield from 驅動。wait_coro 執行結束後返回兩個引數,第一個引數是結束的future 第二個引數是未結束的future。

wait

有兩個命名引數,timeout 和 return_when 如果設定了可能會返回未結束的future。

有一點你可能也注意到了,我們重寫了get_flags 函式,是因為之前用到的 requests 庫執行的是阻塞型I/O操作。為了使用 asyncio 包,我們必須把函式改成非同步版。

小技巧

如果你覺得 使用了協程後代碼難以理解,可以採用 Python之父(Guido van Rossum)的建議,假裝沒有yield from。

已上邊這段程式碼為例:

python併發2:使用asyncio處理併發

知識點

在asyncio 包的API中使用 yield from 時,有個細節要注意:

使用asyncio包時,我們編寫的非同步程式碼中包含由asyncio本身驅動的協程(委派生成器),而生成器最終把指責委託給asyncio包或者第三方庫中的協程。這種處理方式相當於架起了管道,讓asyncio事件迴圈驅動執行底層非同步I/O的庫函式。

避免阻塞型呼叫

我們先看一個圖,這個圖顯示了電腦從不同儲存介質中讀取資料的延遲情況:

python併發2:使用asyncio處理併發

透過這個圖,我們可以看到,阻塞型呼叫對於CPU來說是巨大的浪費。有什麼辦法可以避免阻塞型呼叫中止整個應用程式麼?

有兩種方法:

在單獨的執行緒中執行各個阻塞型操作

把每個阻塞型操作轉化成非阻塞的非同步呼叫使用

當然我們推薦第二種方案,因為第一種方案中如果每個連線都使用一個執行緒,成本太高。

第二種我們可以使用把生成器當做協程使用的方式實現非同步程式設計。對事件迴圈來說,呼叫回撥與在暫停的協程上呼叫 。send() 方法效果差不多。各個暫停的協程消耗的記憶體比執行緒小的多。

現在,你應該能理解為什麼 flags_asyncio。py 指令碼比 flags。py 快的多了吧。

因為flags。py 是依次同步下載,每次下載都要用幾十億個CPU週期等待結果。而在flags_asyncio。py中,在download_many 函式中呼叫loop。run_until_complete 方法時,事件迴圈驅動各個download_one 協程,執行到yield from 表示式出,那個表示式又驅動各個 get_flag 協程,執行到第一個yield from 表示式處,呼叫 aiohttp。request()函式。這些呼叫不會阻塞,因此在零點幾秒內所有請求都可以全部開始。

改進 asyncio 下載指令碼

現在我們改進一下上邊的 flags_asyncio。py,在其中新增上異常處理,計數器

python併發2:使用asyncio處理併發

由於協程發起的請求速度較快,為了防止向伺服器發起太多的併發請求,使伺服器過載,我們在download_coro 函式中建立一個asyncio。Semaphore 例項,然後把它傳給download_one 函式。

Semaphore

物件維護著一個內部計數器,若在物件上呼叫

.acquire()

協程方法,計數器則遞減;若在物件上呼叫

.release()

協程方法,計數器則遞增。計數器的值是在初始化的時候設定。

如果計數器大於0,那麼呼叫

.acquire()

方法不會阻塞,如果計數器為0,

.acquire()

方法會阻塞呼叫這個方法的協程,直到其他協程在同一個 Semaphore 物件上呼叫

.release()

方法,讓計數器遞增。

在上邊的程式碼中,我們並沒有手動呼叫 。acquire() 或 。release() 方法,而是在 download_one 函式中 把 semaphore 當做上下文管理器使用:

python併發2:使用asyncio處理併發

這段程式碼保證,任何時候都不會有超過 MAX_CONCUR_REQ 個 get_flag 協程啟動。

使用 asyncio.as_completed 函式

因為要使用 yield from 獲取 asyncio。as_completed 函式產出的future的結果,所以 as_completed 函式秩序在協程中呼叫。由於 download_many 要作為引數傳給非協程的main 函式,我已我們添加了一個新的 downloader_coro 協程,讓download_many 函式只用於設定事件迴圈。

使用Executor 物件,防止阻塞事件迴圈

現在我們回去看下上邊

關於電腦從不同儲存介質讀取資料的延遲情況圖

,有一個實時需要注意,那就是訪問本地檔案系統也會阻塞。

上邊的程式碼中,save_flag 函式阻塞了客戶程式碼與 asyncio 事件迴圈公用的唯一執行緒,因此儲存檔案時,整個應用程式都會暫停。為了避免這個問題,可以使用事件迴圈物件的 run_in_executor 方法。

asyncio 的事件迴圈在後臺維護著一個ThreadPoolExecutor 物件,我們可以呼叫 run_in_executor 方法,把可呼叫的物件發給它執行。

下邊是我們改動後的程式碼:

python併發2:使用asyncio處理併發

run_in_executor 方法的第一個引數是Executor 例項;如果設為None,使用事件迴圈的預設 ThreadPoolExecutor 例項。

從回撥到future到協程

在接觸協程之前,我們可能對回撥有一定的認識,那麼和回撥相比,協程有什麼改進呢?

python中的回撥程式碼樣式:

python併發2:使用asyncio處理併發

上邊的程式碼的缺陷:

容易出現回撥地獄

程式碼難以閱讀

在這個問題上,協程能發揮很大的作用。如果換成協程和yield from 結果做的非同步程式碼,程式碼示例如下:

python併發2:使用asyncio處理併發

和之前的程式碼相比,這個程式碼就容易理解多了。如果非同步呼叫 api_call1,api_call2,api_call3 會丟擲異常,那麼可以把相應的 yield from 表示式放在 try/except 塊中處理異常。

使用協程必須習慣 yield from 表示式,並且協程不能直接呼叫,必須顯式的排定協程的執行時間,或在其他排定了執行時間的協程中使用yield from 表示式吧它啟用。如果不使用 loop。create_task(three_stages(request1)),那麼什麼都不會發生。

下面我們用一個實際的例子來演示一下:

每次下載發起多次請求

我們修改一下上邊下載國旗的程式碼,使在下載國旗的同時還可以獲取國家名稱在儲存圖片的時候使用。

我們使用協程和yield from 解決這個問題:

python併發2:使用asyncio處理併發

在這段程式碼中,我們在download_one 函式中分別在 semaphore 控制的兩個with 塊中呼叫get_flag 和 get_country,是為了節約時間。

get_flag 的return 語句在外層加上括號,是因為() 的運算子優先順序高,會先執行括號內的yield from 語句 返回的結果。如果不加 會報句法錯誤

加() ,相當於

python併發2:使用asyncio處理併發

如果不加(),那麼程式會在 yield from 處中斷,交出控制權,這時使用return 會報句法錯誤。

總結

這一篇我們討論了:

對比了一個多執行緒程式和asyncio版,說明了多執行緒和非同步任務之間的關係

比較了 asyncio。Future 類 和 concurrent。futures。Future 類的區別

如何使用非同步程式設計管理網路應用中的高併發

在非同步程式設計中,與回撥相比,協程顯著提升效能的方式

下一篇,我們將介紹如何使用asyncio包編寫伺服器

參考連結

[1] python併發 1:使用 futures 處理併發:

http://blog。gusibi。site/post/python-concurrency-with-futures/

[2] python併發 1:使用 futures 處理併發:

http://blog。gusibi。site/post/python-concurrency-with-futures/

[3] class asyncio。Semaphore:

https://docs。python。org/3/library/asyncio-sync。html#asyncio。Semaphore

[4] asyncio — Asynchronous I/O, event loop, coroutines and tasks:

https://docs。python。org/3/library/asyncio。html

[5] 【譯】 Python 3。5 協程究竟是個啥:

http://blog。rainy。im/2016/03/10/how-the-heck-does-async-await-work-in-python-3-5/

[6] PEP 0492 Coroutines with async and await syntax:

https://www。python。org/dev/peps/pep-0492/

[7] Python 之 asyncio:

https://juejin。im/entry/57b138e1165abd00542ab1fa

[8] 我所不能理解的Python中的Asyncio模組:

https://python。freelycode。com/contribution/detail/515