Google Sitemaps用XML自動生成ツール

Pythonで定時実行処理を実装する

Posted by ごうじん, on Sunday, January 19, 2014 01:19 PM JST
Python 2.7.9

cronなどの定時実行機構を使わず、Pythonで定時実行スケジューリングと処理の実行を、crontabモジュールを使って実装してみました。

crontabは、crontab書式をパースして、次回実行までの待ち時間(秒)を教えてくれるモジュールです。
この機能を使って、定時実行処理を実装していきます。

crontabのインストール

easy_install または pip で、crontabをインストールするだけ。とっても簡単。
$ sudo easy_install -U crontab
Password:
Searching for crontab
Reading http://pypi.python.org/simple/crontab/
Best match: crontab 0.18
Downloading https://pypi.python.org/packages/source/c/crontab/crontab-0.18.tar.gz#md5=e0306b3654d8cf6bd41d8ee7e51516fb
Processing crontab-0.18.tar.gz
Running crontab-0.18/setup.py -q bdist_egg --dist-dir /tmp/easy_install-NiTm5I/crontab-0.18/egg-dist-tmp-RKukqr
zip_safe flag not set; analyzing archive contents...
Adding crontab 0.18 to easy-install.pth file

Installed /Library/Python/2.7/site-packages/crontab-0.18-py2.7.egg
Processing dependencies for crontab
Finished processing dependencies for crontab

まずは、軽く実装してみる。

scheduled_processing.py
# -*- coding: utf-8 -*-
"""
定時実行する
"""
from crontab import CronTab
from datetime import datetime, timedelta
import logging
import math
from multiprocessing import Pool
import time

class JobConfig(object):
	"""
	処理設定
	"""

	def __init__(self, crontab, job):
		"""
		:type crontab: crontab.CronTab
		:param crontab: 実行時間設定
		:type job: function
		:param job: 実行する関数
		"""

		self._crontab = crontab
		self.job = job


	def schedule(self):
		"""
		次回実行日時を取得する。
		:rtype: datetime.datetime
		:return: 次回実行日時を
		"""

		crontab = self._crontab
		return datetime.now() + timedelta(seconds=math.ceil(crontab.next()))

	def next(self):
		"""
		次回実行時刻まで待機する時間を取得する。
		:rtype: long
		:retuen: 待機時間(秒)
		"""

		crontab = self._crontab
		return math.ceil(crontab.next())


def job_controller(jobConfig):
	""" 
	処理コントローラ
	:type crontab: crontab.CronTab
	:param crontab: 実行設定
	"""

	logging.info("->- 処理を開始しました。")

	while True:

		try:

			# 次実行日時を表示
			logging.info("-?- 次回実行日時\tschedule:%s" %
				jobConfig.schedule().strftime("%Y-%m-%d %H:%M:%S"))

			# 次実行時刻まで待機
			time.sleep(jobConfig.next())

			logging.info("-!> 処理を実行します。")

			# 処理を実行する。
			jobConfig.job()

			logging.info("-!< 処理を実行しました。")

		except KeyboardInterrupt:
			break

	logging.info("-<- 処理を終了終了しました。")


def job1():
	"""
	処理1
	"""
	# TODO:実行したい処理を書く。
	logging.debug("処理1")


def job2():
	"""
	処理2
	"""
	# TODO:実行したい処理を書く。
	logging.debug("処理2")


def main():
	"""
	"""

	# ログ設定
	logging.basicConfig(level=logging.DEBUG,
		format="time:%(asctime)s.%(msecs)03d\tprocess:%(process)d" +
			"\tmessage:%(message)s",
		datefmt="%Y-%m-%d %H:%M:%S")

	# 毎分実行設定
	jobConfigs = [

		# 処理1を1分毎に実行する。
		JobConfig(CronTab("* * * * *"), job1),

		# 処理2を2分毎に実行する。
		JobConfig(CronTab("*/2 * * * *"), job2)
	]

	# 処理を並列に実行
	p = Pool(len(jobConfigs))
	try:
		p.map(job_controller, jobConfigs)
	except KeyboardInterrupt:
		pass


if __name__ == "__main__":

	main()

ポイントは、

  1. スケジュール設定と処理関数を内包するクラスを作成する。(12〜47,113,116行目)
  2. 処理コントローラにクラスを渡す。(122行目)
  3. 処理コントローラ内で、時間が来るまで待機し、処理関数を実行する。(68,73行目)
  4. crontabから教えてもらった待ち時間を、そのままsleepすると、定時より何マイクロ秒か前に起こされるので、切り上げた待ち時間を正とする。(37,47行目)
といった所です。シーケンス図にすると、以下のような感じになります。

シーケンス図

実行結果

$ python scheduled_processing.py
time:2014-01-19 13:22:04.816	process:18525	message:->- 処理を開始しました。
time:2014-01-19 13:22:04.816	process:18526	message:->- 処理を開始しました。
time:2014-01-19 13:22:04.818	process:18525	message:-?- 次回実行日時	schedule:2014-01-19 13:23:00
time:2014-01-19 13:22:04.818	process:18526	message:-?- 次回実行日時	schedule:2014-01-19 13:24:00
time:2014-01-19 13:23:00.822	process:18525	message:-!> 処理を実行します。
time:2014-01-19 13:23:00.823	process:18525	message:処理1
time:2014-01-19 13:23:00.824	process:18525	message:-!< 処理を実行しました。
time:2014-01-19 13:23:00.825	process:18525	message:-?- 次回実行日時	schedule:2014-01-19 13:24:00
time:2014-01-19 13:24:00.826	process:18526	message:-!> 処理を実行します。
time:2014-01-19 13:24:00.827	process:18526	message:処理2
time:2014-01-19 13:24:00.828	process:18526	message:-!< 処理を実行しました。
time:2014-01-19 13:24:00.829	process:18526	message:-?- 次回実行日時	schedule:2014-01-19 13:26:00
                                 :
                                 :
^C
time:2014-01-19 13:27:43.650	process:18525	message:-<- 処理を終了終了しました。
time:2014-01-19 13:27:43.650	process:18526	message:-<- 処理を終了終了しました。

所感

crontabモジュールは小さな機能のみを提供している所が気に入っています。得られたデータを、どう処理するかは利用者にまかされているので、工夫のしがいがあります。

処理をコントロールする部分をデコレーターにすると、もう少しすっきりすると思いますので、リファクタリングして効率化したいと思います。

スケジュール設定をデコレータ化

--- scheduled_processing.py.bak	2014-01-19 13:19:00.000000000 +0900
+++ scheduled_processing.py	2014-01-19 16:41:03.000000000 +0900
@@ -14,16 +14,13 @@
 	処理設定
 	"""
 
-	def __init__(self, crontab, job):
+	def __init__(self, crontab):
 		"""
 		:type crontab: crontab.CronTab
 		:param crontab: 実行時間設定
-		:type job: function
-		:param job: 実行する関数
 		"""
 
 		self._crontab = crontab
-		self.job = job
 
 
 	def schedule(self):
@@ -47,39 +44,48 @@
 		return math.ceil(crontab.next())
 
 
-def job_controller(jobConfig):
+def job_controller(crontab):
 	""" 
 	処理コントローラ
-	:type crontab: crontab.CronTab
+	:type crontab: str
 	:param crontab: 実行設定
 	"""
+	def receive_func(job):
+		import functools
+		@functools.wraps(job)
+		def wrapper():
 
-	logging.info("->- 処理を開始しました。")
+			jobConfig = JobConfig(CronTab(crontab))
+			logging.info("->- 処理を開始しました。")
 
-	while True:
+			while True:
 
-		try:
+				try:
 
-			# 次実行日時を表示
-			logging.info("-?- 次回実行日時\tschedule:%s" %
-				jobConfig.schedule().strftime("%Y-%m-%d %H:%M:%S"))
+					# 次実行日時を表示
+					logging.info("-?- 次回実行日時\tschedule:%s" %
+						jobConfig.schedule().strftime("%Y-%m-%d %H:%M:%S"))
 
-			# 次実行時刻まで待機
-			time.sleep(jobConfig.next())
+					# 次実行時刻まで待機
+					time.sleep(jobConfig.next())
 
-			logging.info("-!> 処理を実行します。")
+					logging.info("-!> 処理を実行します。")
 
-			# 処理を実行する。
-			jobConfig.job()
+					# 処理を実行する。
+					job()
 
-			logging.info("-!< 処理を実行しました。")
+					logging.info("-!< 処理を実行しました。")
 
-		except KeyboardInterrupt:
-			break
+				except KeyboardInterrupt:
+					break
 
-	logging.info("-<- 処理を終了終了しました。")
+			logging.info("-<- 処理を終了終了しました。")
+		return wrapper
+	return receive_func
 
 
+# 1分毎に実行する。
+@job_controller("* * * * *")
 def job1():
 	"""
 	処理1
@@ -88,6 +94,8 @@
 	logging.debug("処理1")
 
 
+# 2分毎に実行する。
+@job_controller("*/2 * * * *")
 def job2():
 	"""
 	処理2
@@ -106,20 +114,16 @@
 			"\tmessage:%(message)s",
 		datefmt="%Y-%m-%d %H:%M:%S")
 
-	# 毎分実行設定
-	jobConfigs = [
-
-		# 処理1を1分毎に実行する。
-		JobConfig(CronTab("* * * * *"), job1),
-
-		# 処理2を2分毎に実行する。
-		JobConfig(CronTab("*/2 * * * *"), job2)
-	]
+	# 処理リスト作成
+	jobs = [job1, job2]
 
 	# 処理を並列に実行
-	p = Pool(len(jobConfigs))
+	p = Pool(len(jobs))
 	try:
-		p.map(job_controller, jobConfigs)
+		for job in jobs:
+			p.apply_async(job)
+		p.close()
+		p.join()
 	except KeyboardInterrupt:
 		pass
scheduled_processing.py
# -*- coding: utf-8 -*-
"""
定時実行する
"""
from crontab import CronTab
from datetime import datetime, timedelta
import logging
import math
from multiprocessing import Pool
import time

class JobConfig(object):
	"""
	処理設定
	"""

	def __init__(self, crontab):
		"""
		:type crontab: crontab.CronTab
		:param crontab: 実行時間設定
		"""

		self._crontab = crontab


	def schedule(self):
		"""
		次回実行日時を取得する。
		:rtype: datetime.datetime
		:return: 次回実行日時を
		"""

		crontab = self._crontab
		return datetime.now() + timedelta(seconds=math.ceil(crontab.next()))

	def next(self):
		"""
		次回実行時刻まで待機する時間を取得する。
		:rtype: long
		:retuen: 待機時間(秒)
		"""

		crontab = self._crontab
		return math.ceil(crontab.next())


def job_controller(crontab):
	""" 
	処理コントローラ
	:type crontab: str
	:param crontab: 実行設定
	"""
	def receive_func(job):
		import functools
		@functools.wraps(job)
		def wrapper():

			jobConfig = JobConfig(CronTab(crontab))
			logging.info("->- 処理を開始しました。")

			while True:

				try:

					# 次実行日時を表示
					logging.info("-?- 次回実行日時\tschedule:%s" %
						jobConfig.schedule().strftime("%Y-%m-%d %H:%M:%S"))

					# 次実行時刻まで待機
					time.sleep(jobConfig.next())

					logging.info("-!> 処理を実行します。")

					# 処理を実行する。
					job()

					logging.info("-!< 処理を実行しました。")

				except KeyboardInterrupt:
					break

			logging.info("-<- 処理を終了終了しました。")
		return wrapper
	return receive_func


# 1分毎に実行する。
@job_controller("* * * * *")
def job1():
	"""
	処理1
	"""
	# TODO:実行したい処理を書く。
	logging.debug("処理1")


# 2分毎に実行する。
@job_controller("*/2 * * * *")
def job2():
	"""
	処理2
	"""
	# TODO:実行したい処理を書く。
	logging.debug("処理2")


def main():
	"""
	"""

	# ログ設定
	logging.basicConfig(level=logging.DEBUG,
		format="time:%(asctime)s.%(msecs)03d\tprocess:%(process)d" +
			"\tmessage:%(message)s",
		datefmt="%Y-%m-%d %H:%M:%S")

	# 処理リスト作成
	jobs = [job1, job2]

	# 処理を並列に実行
	p = Pool(len(jobs))
	try:
		for job in jobs:
			p.apply_async(job)
		p.close()
		p.join()
	except KeyboardInterrupt:
		pass


if __name__ == "__main__":

	main()

ポイントは、

  1. 処理コントローラ(job_controller)を関数からデコレータに変更した。(47〜84行目)
  2. いつ、処理1(job1)・処理2(job2)が実行されるのか、デコレータにより直感的になった。(88,98行目)
  3. デコレータで関数が置き換わってしまわないようにする処理で、ネストが深くなった。(47〜84行目)
  4. シーケンス図でどう表現すればいいのか分からない。(^^;)
でしょうか。全体的には、見通しがよくなりました。

参考サイト