python mysqlに基づいて実装された単純なキューおよびスパンロックインスタンスの詳細
5923 ワード
通常、マルチプロセスアプリケーションの開発では、複数のプロセスが同じリソース(臨界リソース)にアクセスすることは避けられません.この場合、グローバルなロックを追加することで、リソースの同期アクセスを実現する必要があります(つまり、同じ時間に1つのプロセスしかリソースにアクセスできません).
例を挙げると次のようになります.
mysqlを使用してタスクキューを実装すると、次のようになります.
1.次のようにキュー・タスクを格納するためのJobテーブルをMysqlに作成します.
Messageはタスク情報を格納するために使用されますjob_statusは、タスクステータスを識別するために使用されます.0:キュー、1:キューが2つしかないと仮定します.生産者プロセスがあり、jobテーブルに新しいデータを配置し、キューを作ります.
3.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.
4.プロセスにまたがるロックがない場合、2つの消費者プロセスは同時に重複したメッセージを受け取り、1つのメッセージが複数回消費される可能性がある.このような状況は私たちが望んでいないので、プロセス間のロックを実現する必要があります.
==============================================
プロセスにまたがるロックの実装といえば、主にいくつかの実装方法があります.
(1)信号量(2)ファイルロックfcntl(3)socket(ポート番号バインド)(4)signalといういくつかの方法はそれぞれメリットとデメリットがあり、全体的に前の2つの方法が多いかもしれませんが、ここでは詳しくは言いません.資料を調べてみてください.資料を調べたところmysqlにはロックの実現があり、性能に対する要求があまり高くない応用シーンに適しており、大同時の分散アクセスにボトルネックがある可能性がある.これに対してpythonでdemoを実現した.ファイル名:glock.py
main関数:l.lock(lock_name,10)では、10はtimeoutを表す時間が10秒であり、10秒でロックが取れない場合は戻り、後の操作を実行します.このdemoでは,TODOがマークされている場所で,消費者がjobテーブルからメッセージを取り出す論理をここに置くことができる.すなわち分割線以上のものである.
2.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.
これにより,複数のプロセスが臨界リソースにアクセスする際の同期が保証され,データの一貫性が保証される.テスト時にglockを2つ起動します.py、結果は次のとおりです.
最初のglockが見えます.pyは17:08:50にロックを解除した、次のglockです.pyは17:08:50にロックを取得し,これが完全に実行可能であることを確認した.
例を挙げると次のようになります.
mysqlを使用してタスクキューを実装すると、次のようになります.
1.次のようにキュー・タスクを格納するためのJobテーブルをMysqlに作成します.
create table jobs(
id auto_increment not null primary key,
message text not null,
job_status not null default 0
);
Messageはタスク情報を格納するために使用されますjob_statusは、タスクステータスを識別するために使用されます.0:キュー、1:キューが2つしかないと仮定します.生産者プロセスがあり、jobテーブルに新しいデータを配置し、キューを作ります.
insert into jobs(message) values('msg1');
3.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.
select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id id
4.プロセスにまたがるロックがない場合、2つの消費者プロセスは同時に重複したメッセージを受け取り、1つのメッセージが複数回消費される可能性がある.このような状況は私たちが望んでいないので、プロセス間のロックを実現する必要があります.
==============================================
プロセスにまたがるロックの実装といえば、主にいくつかの実装方法があります.
(1)信号量(2)ファイルロックfcntl(3)socket(ポート番号バインド)(4)signalといういくつかの方法はそれぞれメリットとデメリットがあり、全体的に前の2つの方法が多いかもしれませんが、ここでは詳しくは言いません.資料を調べてみてください.資料を調べたところmysqlにはロックの実現があり、性能に対する要求があまり高くない応用シーンに適しており、大同時の分散アクセスにボトルネックがある可能性がある.これに対してpythonでdemoを実現した.ファイル名:glock.py
#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
# Desc :
#
import logging, time
import MySQLdb
class Glock:
def __init__(self, db):
self.db = db
def _execute(self, sql):
cursor = self.db.cursor()
try:
ret = None
cursor.execute(sql)
if cursor.rowcount != 1:
logging.error("Multiple rows returned in mysql lock function.")
ret = None
else:
ret = cursor.fetchone()
cursor.close()
return ret
except Exception, ex:
logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
cursor.close()
return None
def lock(self, lockstr, timeout):
sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)
ret = self._execute(sql)
if ret[0] == 0:
logging.debug("Another client has previously locked '%s'.", lockstr)
return False
elif ret[0] == 1:
logging.debug("The lock '%s' was obtained successfully.", lockstr)
return True
else:
logging.error("Error occurred!")
return None
def unlock(self, lockstr):
sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)
ret = self._execute(sql)
if ret[0] == 0:
logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)
return False
elif ret[0] == 1:
logging.debug("The lock '%s' the lock was released.", lockstr)
return True
else:
logging.error("The lock '%s' did not exist.", lockstr)
return None
#Init logging
def init_logging():
sh = logging.StreamHandler()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)
logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))
def main():
init_logging()
db = MySQLdb.connect(host='localhost', user='root', passwd='')
lock_name = 'queue'
l = Glock(db)
ret = l.lock(lock_name, 10)
if ret != True:
logging.error("Can't get lock! exit!")
quit()
time.sleep(10)
logging.info("You can do some synchronization work across processes!")
##TODO
## you can do something in here ##
l.unlock(lock_name)
if __name__ == "__main__":
main()
main関数:l.lock(lock_name,10)では、10はtimeoutを表す時間が10秒であり、10秒でロックが取れない場合は戻り、後の操作を実行します.このdemoでは,TODOがマークされている場所で,消費者がjobテーブルからメッセージを取り出す論理をここに置くことができる.すなわち分割線以上のものである.
2.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.
select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id id
これにより,複数のプロセスが臨界リソースにアクセスする際の同期が保証され,データの一貫性が保証される.テスト時にglockを2つ起動します.py、結果は次のとおりです.
[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
最初のglockが見えます.pyは17:08:50にロックを解除した、次のglockです.pyは17:08:50にロックを取得し,これが完全に実行可能であることを確認した.
[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#