python mysqlに基づいて実装された単純なキューおよびスパンロックインスタンスの詳細

5923 ワード

通常、マルチプロセスアプリケーションの開発では、複数のプロセスが同じリソース(臨界リソース)にアクセスすることは避けられません.この場合、グローバルなロックを追加することで、リソースの同期アクセスを実現する必要があります(つまり、同じ時間に1つのプロセスしかリソースにアクセスできません).
例を挙げると次のようになります.
mysqlを使用してタスクキューを実装すると、次のようになります.
1.次のようにキュー・タスクを格納するためのJobテーブルをMysqlに作成します.

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  job_status not null default 0
);


Messageはタスク情報を格納するために使用されますjob_statusは、タスクステータスを識別するために使用されます.0:キュー、1:キューが2つしかないと仮定します.生産者プロセスがあり、jobテーブルに新しいデータを配置し、キューを作ります.

insert into jobs(message) values('msg1');

3.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id        id

4.プロセスにまたがるロックがない場合、2つの消費者プロセスは同時に重複したメッセージを受け取り、1つのメッセージが複数回消費される可能性がある.このような状況は私たちが望んでいないので、プロセス間のロックを実現する必要があります.
==============================================
プロセスにまたがるロックの実装といえば、主にいくつかの実装方法があります.
(1)信号量(2)ファイルロックfcntl(3)socket(ポート番号バインド)(4)signalといういくつかの方法はそれぞれメリットとデメリットがあり、全体的に前の2つの方法が多いかもしれませんが、ここでは詳しくは言いません.資料を調べてみてください.資料を調べたところmysqlにはロックの実現があり、性能に対する要求があまり高くない応用シーンに適しており、大同時の分散アクセスにボトルネックがある可能性がある.これに対してpythonでdemoを実現した.ファイル名:glock.py

#!/usr/bin/env python2.7 
# 
# -*- coding:utf-8 -*- 
# 
#  Desc  : 
# 
import logging, time 
import MySQLdb 
class Glock: 
  def __init__(self, db): 
    self.db = db 
  def _execute(self, sql): 
    cursor = self.db.cursor() 
    try: 
      ret = None 
      cursor.execute(sql) 
      if cursor.rowcount != 1: 
        logging.error("Multiple rows returned in mysql lock function.") 
        ret = None 
      else: 
        ret = cursor.fetchone() 
      cursor.close() 
      return ret 
    except Exception, ex: 
      logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex)) 
      cursor.close() 
      return None 
  def lock(self, lockstr, timeout): 
    sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) 
    ret = self._execute(sql) 
 
    if ret[0] == 0: 
      logging.debug("Another client has previously locked '%s'.", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' was obtained successfully.", lockstr) 
      return True 
    else: 
      logging.error("Error occurred!") 
      return None 
  def unlock(self, lockstr): 
    sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) 
    ret = self._execute(sql) 
    if ret[0] == 0: 
      logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' the lock was released.", lockstr) 
      return True 
    else: 
      logging.error("The lock '%s' did not exist.", lockstr) 
      return None 
#Init logging 
def init_logging(): 
  sh = logging.StreamHandler() 
  logger = logging.getLogger() 
  logger.setLevel(logging.DEBUG) 
  formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') 
  sh.setFormatter(formatter) 
  logger.addHandler(sh) 
  logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) 
def main(): 
  init_logging() 
  db = MySQLdb.connect(host='localhost', user='root', passwd='') 
  lock_name = 'queue' 
 
  l = Glock(db) 
 
  ret = l.lock(lock_name, 10) 
  if ret != True: 
    logging.error("Can't get lock! exit!") 
    quit() 
  time.sleep(10) 
  logging.info("You can do some synchronization work across processes!") 
  ##TODO 
  ## you can do something in here ## 
  l.unlock(lock_name) 
if __name__ == "__main__": 
  main() 


main関数:l.lock(lock_name,10)では、10はtimeoutを表す時間が10秒であり、10秒でロックが取れない場合は戻り、後の操作を実行します.このdemoでは,TODOがマークされている場所で,消費者がjobテーブルからメッセージを取り出す論理をここに置くことができる.すなわち分割線以上のものである.
2.複数の消費者プロセスがあると仮定し、jobテーブルからキュー情報を取得し、次の操作を行います.

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id        id

これにより,複数のプロセスが臨界リソースにアクセスする際の同期が保証され,データの一貫性が保証される.テスト時にglockを2つ起動します.py、結果は次のとおりです.

[@tj-10-47 test]# ./glock.py  
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 


最初のglockが見えます.pyは17:08:50にロックを解除した、次のglockです.pyは17:08:50にロックを取得し,これが完全に実行可能であることを確認した.

[@tj-10-47 test]# ./glock.py 
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#