關於我
一個有思想的程式猿,終身學習實踐者,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。
Github:https://github。com/hylinux1024
微信公眾號:終身開發者(angrycode)
上一篇《一個簡單的Python排程器》介紹了一個簡單的Python排程器的使用,後來我翻閱了一下它的原始碼,驚奇的發現核心庫才一個檔案,程式碼量短短700行不到。這是絕佳的學習材料。
讓我喜出望外的是這個庫的作者竟然就是我最近閱讀的一本書《Python Tricks》的作者!現在就讓我們看看大神的實現思路。
0x00 準備
專案地址
https://github。com/dbader/schedule
將程式碼checkout到本地
環境
PyCharm+venv+Python3
0x01 用法
這個在上一篇也介紹過了,非常簡單
import schedule# 定義需要執行的方法def job(): print(“a simple scheduler in python。”)# 設定排程的引數,這裡是每2秒執行一次schedule。every(2)。seconds。do(job)if __name__ == ‘__main__’: while True: schedule。run_pending()# 執行結果a simple scheduler in python。a simple scheduler in python。a simple scheduler in python。。。。
這個庫的文件也很詳細,可以瀏覽 https://schedule。readthedocs。io/ 瞭解庫的大概用法
0x02 專案結構
(venv) ➜ schedule git:(master) tree -L 2。。。。├── requirements-dev。txt├── schedule│ └── __init__。py├── setup。py├── test_schedule。py├── tox。ini└── venv ├── bin ├── include ├── lib ├── pip-selfcheck。json └── pyvenv。cfg8 directories, 18 files
schedule目錄下就一個__init__。py檔案,這是我們需要重點學習的地方。
setup。py檔案是釋出專案的配置檔案
test_schedule。py是單元測試檔案,一開始除了看文件外,也可以從單元測試中入手,瞭解這個庫的使用
requirements-dev。txt 開發環境的依賴庫檔案,如果核心的庫是不需要第三方的依賴的,但是單元測試需要
venv是我checkout後建立的,原本的專案是沒有的
0x03 schedule
我們知道__init__。py是定義Python包必需的檔案。在這個檔案中定義方法、類都可以在使用import命令時匯入到工程專案中,然後使用。
schedule 原始碼
以下是schedule會用到的模組,都是Python內部的模組。
import collectionsimport datetimeimport functoolsimport loggingimport randomimport reimport timelogger = logging。getLogger(‘schedule’)
然後定義了一個日誌列印工具例項
接著是定義了該模組的3個異常類的結構體系,是由Exception派生出來的,分別是ScheduleError、ScheduleValueError和IntervalError
class ScheduleError(Exception): “”“Base schedule exception”“” passclass ScheduleValueError(ScheduleError): “”“Base schedule value error”“” passclass IntervalError(ScheduleValueError): “”“An improper interval was used”“” pass
還定義了一個CancelJob的類,用於取消排程器的繼續執行
class CancelJob(object): “”“ Can be returned from a job to unschedule itself。 ”“” pass
例如在自定義的需要被排程方法中返回這個CancelJob類就可以實現一次性的任務
# 定義需要執行的方法def job(): print(“a simple scheduler in python。”) # 返回CancelJob可以停止排程器的後續執行 return schedule。CancelJob
接著就是這個庫的兩個核心類Scheduler和Job。
class Scheduler(object): “”“ Objects instantiated by the :class:`Scheduler
Scheduler是排程器的實現類,它負責排程任務(job)的建立和執行。
Job則是對需要執行任務的抽象。
這兩個類是這個庫的核心,後面我們還會看到詳細的分析。
接下來就是預設排程器default_scheduler和任務列表jobs的建立。
# The following methods are shortcuts for not having to# create a Scheduler instance:#: Default :class:`Scheduler
在執行import schedule後,就預設建立了default_scheduler。而Scheduler的構造方法為
def __init__(self): self。jobs = []
在執行初始化時,排程器就建立了一個空的任務列表。
在檔案的最後定義了一些鏈式呼叫的方法,使用起來也是非常人性化的,值得學習。
這裡的方法都定義在模組下,而且都是封裝了default_scheduler例項的呼叫。
def every(interval=1): “”“Calls :meth:`every
我們看下入口方法run_pending(),從本文一開頭的Demo可以知道這個是啟動排程器的方法。這裡它執行了default_scheduler中的方法。
default_scheduler。run_pending()
所以我們就把目光定位到Scheduler類的相應方法
def run_pending(self): “”“ Run all jobs that are scheduled to run。 Please note that it is *intended behavior that run_pending() does not run missed jobs*。 For example, if you‘ve registered a job that should run every minute and you only call run_pending() in one hour increments then your job won’t be run 60 times in between but only once。 ”“” runnable_jobs = (job for job in self。jobs if job。should_run) for job in sorted(runnable_jobs): self。_run_job(job)
這個方法中首先從jobs列表將需要執行的任務過濾後放在runnable_jobs列表,然後將其排序後順序執行內部的_run_job(job)方法
def _run_job(self, job): ret = job。run() if isinstance(ret, CancelJob) or ret is CancelJob: self。cancel_job(job)
在_run_job方法中就呼叫了job類中的run方法,並根據返回值判斷是否需要取消任務。
這時候我們要看下Job類的實現邏輯。
首先我們要看下Job是什麼時候建立的。還是從Demo中的程式碼入手
schedule。every(2)。seconds。do(job)
這裡先執行了schedule。every()方法
def every(interval=1): “”“Calls :meth:`every
這個方法就是scheduler類中的every方法
def every(self, interval=1): “”“ Schedule a new periodic job。 :param interval: A quantity of a certain time unit :return: An unconfigured :class:`Job
在這裡建立了一個任務job,並將引數interval和scheduler例項傳入到構造方法中,最後返回job例項用於實現鏈式呼叫。
跳轉到Job的構造方法
def __init__(self, interval, scheduler=None): self。interval = interval # pause interval * unit between runs self。latest = None # upper limit to the interval self。job_func = None # the job job_func to run self。unit = None # time units, e。g。 ‘minutes’, ‘hours’, 。。。 self。at_time = None # optional time at which this job runs self。last_run = None # datetime of the last run self。next_run = None # datetime of the next run self。period = None # timedelta between runs, only valid for self。start_day = None # Specific day of the week to start on self。tags = set() # unique set of tags for the job self。scheduler = scheduler # scheduler to register with
主要初始化了間隔時間配置、需要執行的方法、排程器各種時間單位等。
執行every方法之後又呼叫了seconds這個屬性方法
@propertydef seconds(self): self。unit = ‘seconds’ return self
設定了時間單位,這個設定秒,當然還有其它類似的屬性方法minutes、hours、days等等。
最後就是執行了do方法
def do(self, job_func, *args, **kwargs): “”“ Specifies the job_func that should be called every time the job runs。 Any additional arguments are passed on to job_func when the job runs。 :param job_func: The function to be scheduled :return: The invoked job instance ”“” self。job_func = functools。partial(job_func, *args, **kwargs) try: functools。update_wrapper(self。job_func, job_func) except AttributeError: # job_funcs already wrapped by functools。partial won‘t have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail。 pass self。_schedule_next_run() self。scheduler。jobs。append(self) return self
在這裡使用functools工具的中的偏函式partial將我們自定義的方法封裝成可呼叫的物件
然後就呼叫_schedule_next_run方法,它主要是對時間的解析,按照時間對job排序,我覺得這個方法是本專案中的技術點,邏輯也是稍微複雜一丟丟,仔細閱讀就可以看懂,主要是對時間datetime的使用。由於篇幅,這裡就不再貼出程式碼。
這裡就完成了任務job的新增。然後在呼叫run_pending方法中就可以讓任務執行。
0x04 總結一下
schedule庫定義兩個核心類Scheduler和Job。在匯入包時就預設建立一個Scheduler物件,並初始化任務列表。
schedule模組提供了鏈式呼叫的介面,在配置schedule引數時,就會建立任務物件job,並會將job新增到任務列表中,最後在執行run_pending方法時,就會呼叫我們自定義的方法。
這個庫的核心思想是使用面向物件方法,對事物能夠準確地抽象,它總體的邏輯並不複雜,是學習原始碼很不錯的範例。
0x05 學習資料
https://github。com/dbader/schedule
https://schedule。readthedocs。io