Python原始碼學習Schedule

關於我

一個有思想的程式猿,終身學習實踐者,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。

Github:https://github。com/hylinux1024

微信公眾號:終身開發者(angrycode)

Python原始碼學習Schedule

上一篇《一個簡單的Python排程器》介紹了一個簡單的Python排程器的使用,後來我翻閱了一下它的原始碼,驚奇的發現核心庫才一個檔案,程式碼量短短700行不到。這是絕佳的學習材料。

讓我喜出望外的是這個庫的作者竟然就是我最近閱讀的一本書《Python Tricks》的作者!現在就讓我們看看大神的實現思路。

0x00 準備

專案地址

https://github。com/dbader/schedule

將程式碼checkout到本地

環境

PyCharm+venv+Python3

0x01 用法

這個在上一篇也介紹過了,非常簡單

import schedule# 定義需要執行的方法def job(): print(“a simple scheduler in python。”)# 設定排程的引數,這裡是每2秒執行一次schedule。every(2)。seconds。do(job)if __name__ == ‘__main__’: while True: schedule。run_pending()# 執行結果a simple scheduler in python。a simple scheduler in python。a simple scheduler in python。。。。

這個庫的文件也很詳細,可以瀏覽 https://schedule。readthedocs。io/ 瞭解庫的大概用法

0x02 專案結構

(venv) ➜ schedule git:(master) tree -L 2。。。。├── requirements-dev。txt├── schedule│ └── __init__。py├── setup。py├── test_schedule。py├── tox。ini└── venv ├── bin ├── include ├── lib ├── pip-selfcheck。json └── pyvenv。cfg8 directories, 18 files

schedule目錄下就一個__init__。py檔案,這是我們需要重點學習的地方。

setup。py檔案是釋出專案的配置檔案

test_schedule。py是單元測試檔案,一開始除了看文件外,也可以從單元測試中入手,瞭解這個庫的使用

requirements-dev。txt 開發環境的依賴庫檔案,如果核心的庫是不需要第三方的依賴的,但是單元測試需要

venv是我checkout後建立的,原本的專案是沒有的

0x03 schedule

我們知道__init__。py是定義Python包必需的檔案。在這個檔案中定義方法、類都可以在使用import命令時匯入到工程專案中,然後使用。

schedule 原始碼

以下是schedule會用到的模組,都是Python內部的模組。

import collectionsimport datetimeimport functoolsimport loggingimport randomimport reimport timelogger = logging。getLogger(‘schedule’)

然後定義了一個日誌列印工具例項

接著是定義了該模組的3個異常類的結構體系,是由Exception派生出來的,分別是ScheduleError、ScheduleValueError和IntervalError

class ScheduleError(Exception): “”“Base schedule exception”“” passclass ScheduleValueError(ScheduleError): “”“Base schedule value error”“” passclass IntervalError(ScheduleValueError): “”“An improper interval was used”“” pass

還定義了一個CancelJob的類,用於取消排程器的繼續執行

class CancelJob(object): “”“ Can be returned from a job to unschedule itself。 ”“” pass

例如在自定義的需要被排程方法中返回這個CancelJob類就可以實現一次性的任務

# 定義需要執行的方法def job(): print(“a simple scheduler in python。”) # 返回CancelJob可以停止排程器的後續執行 return schedule。CancelJob

接著就是這個庫的兩個核心類Scheduler和Job。

class Scheduler(object): “”“ Objects instantiated by the :class:`Scheduler ` are factories to create jobs, keep record of scheduled jobs and handle their execution。 ”“”class Job(object): “”“ A periodic job as used by :class:`Scheduler`。 :param interval: A quantity of a certain time unit :param scheduler: The :class:`Scheduler ` instance that this job will register itself with once it has been fully configured in :meth:`Job。do()`。 Every job runs at a given fixed time interval that is defined by: * a :meth:`time unit ` * a quantity of `time units` defined by `interval` A job is usually created and returned by :meth:`Scheduler。every` method, which also defines its `interval`。 ”“”

Scheduler是排程器的實現類,它負責排程任務(job)的建立和執行。

Job則是對需要執行任務的抽象。

這兩個類是這個庫的核心,後面我們還會看到詳細的分析。

接下來就是預設排程器default_scheduler和任務列表jobs的建立。

# The following methods are shortcuts for not having to# create a Scheduler instance:#: Default :class:`Scheduler ` objectdefault_scheduler = Scheduler()#: Default :class:`Jobs ` listjobs = default_scheduler。jobs # todo: should this be a copy, e。g。 jobs()?

在執行import schedule後,就預設建立了default_scheduler。而Scheduler的構造方法為

def __init__(self): self。jobs = []

在執行初始化時,排程器就建立了一個空的任務列表。

在檔案的最後定義了一些鏈式呼叫的方法,使用起來也是非常人性化的,值得學習。

這裡的方法都定義在模組下,而且都是封裝了default_scheduler例項的呼叫。

def every(interval=1): “”“Calls :meth:`every ` on the :data:`default scheduler instance `。 ”“” return default_scheduler。every(interval)def run_pending(): “”“Calls :meth:`run_pending ` on the :data:`default scheduler instance `。 ”“” default_scheduler。run_pending()def run_all(delay_seconds=0): “”“Calls :meth:`run_all ` on the :data:`default scheduler instance `。 ”“” default_scheduler。run_all(delay_seconds=delay_seconds)def clear(tag=None): “”“Calls :meth:`clear ` on the :data:`default scheduler instance `。 ”“” default_scheduler。clear(tag)def cancel_job(job): “”“Calls :meth:`cancel_job ` on the :data:`default scheduler instance `。 ”“” default_scheduler。cancel_job(job)def next_run(): “”“Calls :meth:`next_run ` on the :data:`default scheduler instance `。 ”“” return default_scheduler。next_rundef idle_seconds(): “”“Calls :meth:`idle_seconds ` on the :data:`default scheduler instance `。 ”“” return default_scheduler。idle_seconds

我們看下入口方法run_pending(),從本文一開頭的Demo可以知道這個是啟動排程器的方法。這裡它執行了default_scheduler中的方法。

default_scheduler。run_pending()

所以我們就把目光定位到Scheduler類的相應方法

def run_pending(self): “”“ Run all jobs that are scheduled to run。 Please note that it is *intended behavior that run_pending() does not run missed jobs*。 For example, if you‘ve registered a job that should run every minute and you only call run_pending() in one hour increments then your job won’t be run 60 times in between but only once。 ”“” runnable_jobs = (job for job in self。jobs if job。should_run) for job in sorted(runnable_jobs): self。_run_job(job)

這個方法中首先從jobs列表將需要執行的任務過濾後放在runnable_jobs列表,然後將其排序後順序執行內部的_run_job(job)方法

def _run_job(self, job): ret = job。run() if isinstance(ret, CancelJob) or ret is CancelJob: self。cancel_job(job)

在_run_job方法中就呼叫了job類中的run方法,並根據返回值判斷是否需要取消任務。

這時候我們要看下Job類的實現邏輯。

首先我們要看下Job是什麼時候建立的。還是從Demo中的程式碼入手

schedule。every(2)。seconds。do(job)

這裡先執行了schedule。every()方法

def every(interval=1): “”“Calls :meth:`every ` on the :data:`default scheduler instance `。 ”“” return default_scheduler。every(interval)

這個方法就是scheduler類中的every方法

def every(self, interval=1): “”“ Schedule a new periodic job。 :param interval: A quantity of a certain time unit :return: An unconfigured :class:`Job ` ”“” job = Job(interval, self) return job

在這裡建立了一個任務job,並將引數interval和scheduler例項傳入到構造方法中,最後返回job例項用於實現鏈式呼叫。

跳轉到Job的構造方法

def __init__(self, interval, scheduler=None): self。interval = interval # pause interval * unit between runs self。latest = None # upper limit to the interval self。job_func = None # the job job_func to run self。unit = None # time units, e。g。 ‘minutes’, ‘hours’, 。。。 self。at_time = None # optional time at which this job runs self。last_run = None # datetime of the last run self。next_run = None # datetime of the next run self。period = None # timedelta between runs, only valid for self。start_day = None # Specific day of the week to start on self。tags = set() # unique set of tags for the job self。scheduler = scheduler # scheduler to register with

主要初始化了間隔時間配置、需要執行的方法、排程器各種時間單位等。

執行every方法之後又呼叫了seconds這個屬性方法

@propertydef seconds(self): self。unit = ‘seconds’ return self

設定了時間單位,這個設定秒,當然還有其它類似的屬性方法minutes、hours、days等等。

最後就是執行了do方法

def do(self, job_func, *args, **kwargs): “”“ Specifies the job_func that should be called every time the job runs。 Any additional arguments are passed on to job_func when the job runs。 :param job_func: The function to be scheduled :return: The invoked job instance ”“” self。job_func = functools。partial(job_func, *args, **kwargs) try: functools。update_wrapper(self。job_func, job_func) except AttributeError: # job_funcs already wrapped by functools。partial won‘t have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail。 pass self。_schedule_next_run() self。scheduler。jobs。append(self) return self

在這裡使用functools工具的中的偏函式partial將我們自定義的方法封裝成可呼叫的物件

然後就呼叫_schedule_next_run方法,它主要是對時間的解析,按照時間對job排序,我覺得這個方法是本專案中的技術點,邏輯也是稍微複雜一丟丟,仔細閱讀就可以看懂,主要是對時間datetime的使用。由於篇幅,這裡就不再貼出程式碼。

這裡就完成了任務job的新增。然後在呼叫run_pending方法中就可以讓任務執行。

0x04 總結一下

schedule庫定義兩個核心類Scheduler和Job。在匯入包時就預設建立一個Scheduler物件,並初始化任務列表。

schedule模組提供了鏈式呼叫的介面,在配置schedule引數時,就會建立任務物件job,並會將job新增到任務列表中,最後在執行run_pending方法時,就會呼叫我們自定義的方法。

這個庫的核心思想是使用面向物件方法,對事物能夠準確地抽象,它總體的邏輯並不複雜,是學習原始碼很不錯的範例。

0x05 學習資料

https://github。com/dbader/schedule

https://schedule。readthedocs。io