3つの方法で分散ロックを実現
43288 ワード
方案一:数据库乐观锁
乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。
异常实现流程
-- -- 1 , left_count 1, select * from t_bonus where id = 10001 and left_count > 0 -- 2 , left_count 1, select * from t_bonus where id = 10001 and left_count > 0 -- 1 , left_count 0, update t_bonus set left_count = left_count - 1 where id = 10001 -- 2 , left_count -1, update t_bonus set left_count = left_count - 1 where id = 10001
楽観的ロックによる実現 -- ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus; -- 1 , left_count 1, , 1234 select left_count, version from t_bonus where id = 10001 and left_count > 0 -- 2 , left_count 1, , 1234 select left_count, version from t_bonus where id = 10001 and left_count > 0 -- 1, version 1235,update 1, update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234 -- 2, version 1235,udpate 0, , update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
シナリオ2:Redisベースの分散ロック
SETNXコマンド(SET if Not eXists)構文:SETNX key value機能:原子的操作、keyが存在しない場合のみ、keyの値をvalueに設定し、1を返します.与えられたkeyが既に存在する場合、SETNXは何もせず、0を返す.Expireコマンド構文:expire(key,expireTime)機能:key設定期限切れGETSETコマンド構文:GETSET key value機能:所与のkeyの値をvalueに設定し、keyの古い値(old value)を返し、keyが存在するが文字列タイプではない場合、エラーを返し、keyが存在しない場合、nilを返します.GETコマンド構文:GET key機能:keyに関連付けられた文字列値を返し、keyが存在しない場合は特殊値nilを返します.DELコマンド構文:DELキー[KEY...]機能:指定された1つ以上のキーを削除すると、存在しないキーは無視されます.
1つ目:redisのsetnx()、expire()メソッドを使用して、分散ロック setnx(lockkey,1)が0を返すと、占有失敗を示す.1を返すと、占有成功 となります. expire()コマンドは、デッドロックの問題を回避するためにlockkeyにタイムアウト時間を設定します. ビジネスコードを実行した後、deleteコマンドでkeyを削除できます.
この案は実際には日常の仕事の需要を解決することができるが、技術案の検討から言えば、まだいくつかの改善できるところがあるかもしれない.たとえば、第1ステップsetnxの実行に成功した後、expire()コマンドの実行に成功する前にダウンタイムが発生した場合、デッドロックの問題が発生します.
2つ目は、redisのsetnx()、get()、getset()メソッドを使用して、分散ロックに使用し、デッドロックの問題を解決する setnx(lockkey、現在時間+期限切れタイムアウト時間)は、1を返すとロックを取得できます.0を返すとロックが取得されず、2に移行します. get(lockkey)は、値oldExpireTimeを取得し、このvalue値を現在のシステム時間と比較し、現在のシステム時間よりも小さい場合、このロックがタイムアウトしたと判断し、他のリクエストの再取得を許可し、3に移行することができる. newExpireTime=現在時刻+期限切れタイムアウト時間を計算し、getset(lockkey,newExpireTime)は現在のlockkeyの値currentExpireTimeを返します. currentExpireTimeとoldExpireTimeが等しいかどうかを判断し、等しい場合は現在のgetset設定に成功し、ロックが取得されたことを示します.等しくない場合、このロックがまた別のリクエストによって取得されたことを示す場合、現在のリクエストは直接失敗に戻るか、再試行を続けることができます. ロックを取得した後、現在のスレッドは自分の業務処理を開始することができ、処理が完了した後、自分の処理時間とロック設定に対するタイムアウト時間を比較し、ロック設定のタイムアウト時間より小さい場合、delete解放ロックを直接実行する.ロック設定のタイムアウト時間より大きい場合は、再ロックして処理する必要はありません. import cn.com.tpig.cache.redis.RedisService; import cn.com.tpig.utils.SpringUtils; /** * Created by IDEA * User: shma1664 * Date: 2016-08-16 14:01 * Desc: redis */ public final class RedisLockUtil { private static final int defaultExpire = 60; private RedisLockUtil() { // } /** * * @param key redis key * @param expire , * @return true: ,false, */ public static boolean lock(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long status = redisService.setnx(key, "1"); if(status == 1) { redisService.expire(key, expire); return true; } return false; } public static boolean lock(String key) { return lock2(key, defaultExpire); } /** * * @param key redis key * @param expire , * @return true: ,false, */ public static boolean lock2(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long value = System.currentTimeMillis() + expire; long status = redisService.setnx(key, String.valueOf(value)); if(status == 1) { return true; } long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime < System.currentTimeMillis()) { // long newExpireTime = System.currentTimeMillis() + expire; long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime))); if(currentExpireTime == oldExpireTime) { return true; } } return false; } public static void unLock1(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); redisService.del(key); } public static void unLock2(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime > System.currentTimeMillis()) { redisService.del(key); } } }
public void drawRedPacket(long userId) { String key = "draw.redpacket.userid:" + userId; boolean lock = RedisLockUtil.lock2(key, 60); if(lock) { try { // } finally { // RedisLockUtil.unLock(key); } } else { new RuntimeException(" "); } }
Spring AOPは注釈方式とSpELに基づいて開梱即用のredis分布式ロック戦略を実現するimport java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * RUNTIME * * , VM , 。 * @author shma1664 * */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RedisLockable { String[] key() default ""; long expiration() default 60; }
import javax.annotation.Resource; import java.lang.reflect.Method; import com.autohome.api.dealer.util.cache.RedisClient; import com.google.common.base.Joiner; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; /** * Created by IDEA * User: mashaohua * Date: 2016-09-28 18:08 * Desc: */ @Aspect @Component public class RedisLockAop { @Resource private RedisClient redisClient; @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))") public void pointcut(){} @Around("pointcut()") public Object doAround(ProceedingJoinPoint point) throws Throwable{ Signature signature = point.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); String targetName = point.getTarget().getClass().getName(); String methodName = point.getSignature().getName(); Object[] arguments = point.getArgs(); if (method != null && method.isAnnotationPresent(RedisLockable.class)) { RedisLockable redisLock = method.getAnnotation(RedisLockable.class); long expire = redisLock.expiration(); String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments); boolean isLock = RedisLockUtil.lock2(redisKey, expire); if(!isLock) { try { return point.proceed(); } finally { unLock2(redisKey); } } else { throw new RuntimeException(" , "); } } return point.proceed(); } private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) { StringBuilder sb = new StringBuilder(); sb.append("lock.").append(targetName).append(".").append(methodName); if(keys != null) { String keyStr = Joiner.on(".").skipNulls().join(keys); String[] parameters = ReflectParamNames.getNames(targetName, methodName); ExpressionParser parser = new SpelExpressionParser(); Expression expression = parser.parseExpression(keyStr); EvaluationContext context = new StandardEvaluationContext(); int length = parameters.length; if (length > 0) { for (int i = 0; i < length; i++) { context.setVariable(parameters[i], arguments[i]); } } String keysValue = expression.getValue(context, String.class); sb.append("#").append(keysValue); } return sb.toString(); }
<dependency>
<groupId>org.javassistgroupId>
<artifactId>javassistartifactId>
<version>3.18.1-GAversion>
dependency>
import javassist.*; import javassist.bytecode.CodeAttribute; import javassist.bytecode.LocalVariableAttribute; import javassist.bytecode.MethodInfo; import org.apache.log4j.Logger; /** * Created by IDEA * User: mashaohua * Date: 2016-09-28 18:39 * Desc: */ public class ReflectParamNames { private static Logger log = Logger.getLogger(ReflectParamNames.class); private static ClassPool pool = ClassPool.getDefault(); static{ ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class); pool.insertClassPath(classPath); } public static String[] getNames(String className,String methodName) { CtClass cc = null; try { cc = pool.get(className); CtMethod cm = cc.getDeclaredMethod(methodName); // javaassist MethodInfo methodInfo = cm.getMethodInfo(); CodeAttribute codeAttribute = methodInfo.getCodeAttribute(); LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag); if (attr == null) return new String[0]; int begin = 0; String[] paramNames = new String[cm.getParameterTypes().length]; int count = 0; int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1; for (int i = 0; i < attr.tableLength(); i++){ // , windows linux , , , this if (attr.variableName(i).equals("this")){ begin = i; break; } } for (int i = begin+1; i <= begin+paramNames.length; i++){ paramNames[count] = attr.variableName(i); count++; } return paramNames; } catch (Exception e) { e.printStackTrace(); }finally{ try { if(cc != null) cc.detach(); } catch (Exception e2) { log.error(e2.getMessage()); } } return new String[0]; } }
分散ロックを使用する必要がある場所に注記を追加する/** * * redis , , SpEL * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId * @param orderId id * @return */ @RedisLockable(key = {"#orderId"}, expiration = 120) @Override public BonusConvertBean drawBonus(Integer orderId) throws BonusException{ // }
第3のスキーム:Zookeeperベースの分散ロック
ノード名の一意性による排他ロック
ZooKeeperメカニズムは、同じディレクトリの下に一意のファイル名しかないことを規定しており、zookeeper上のznodeはロックと見なされ、createznodeによって実現される.すべてのクライアントは/lock/${lock_name}_を作成します.lockノードは、最終的に作成に成功したクライアントがこのロックを所有しており、作成に失敗した場合は、リスニングを継続して待つか、異常を投げ出して独占ロックを実現するかを選択することができます.package com.shma.example.zookeeper.lock; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; /** * Created by IDEA * User: mashaohua * Date: 2016-09-30 16:09 * Desc: */ public class ZookeeperLock implements Lock, Watcher { private ZooKeeper zk; private String root = "/locks";// private String lockName;// private String myZnode;// private int sessionTimeout = 30000; private List exception = new ArrayList(); /** * , config zookeeper * @param config 127.0.0.1:2181 * @param lockName ,lockName lock */ public ZookeeperLock(String config, String lockName){ this.lockName = lockName; // try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } @Override public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } if(!tryLock()) { throw new LockException(" , "); } } @Override public void lockInterruptibly() throws InterruptedException { this.lock(); } @Override public boolean tryLock() { try { myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryLock(); } @Override public void unlock() { try { zk.delete(myZnode, -1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null; } @Override public void process(WatchedEvent watchedEvent) { // } }
ZookeeperLock lock = null; try { lock = new ZookeeperLock("127.0.0.1:2182","test1"); lock.lock(); // } catch (LockException e) { throw e; } finally { if(lock != null) lock.unlock(); }
一時シーケンスノード制御シーケンスによる実現
/lockはすでに存在しており、すべてのクライアントがその下に一時的な順序番号付けディレクトリノードを作成し、masterを選択するのと同じように、番号が最小のロックを取得し、削除を終了し、順次便利である.アルゴリズムの考え方:ロック操作では、すべてのクライアントが/lockディレクトリの下で一時的な順序ノードを作成することができ、作成したクライアントが自分の作成ノードのシリアル番号が/lock/ディレクトリの下で最も小さいノードであることを発見した場合、ロックを取得します.そうでなければ、自分が作成したノードのシリアル番号より小さいノード(自分が作成したノードより小さい最大ノード)を監視し、待機に入ります.ロック解除操作では、作成したノードを削除するだけです.package com.shma.example.zookeeper.lock; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * Created by IDEA * User: mashaohua * Date: 2016-09-30 16:09 * Desc: */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";// private String lockName;// private String waitNode;// private String myZnode;// private CountDownLatch latch;// private int sessionTimeout = 30000; private List exception = new ArrayList(); /** * , config zookeeper * @param config 127.0.0.1:2181 * @param lockName ,lockName lock */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);// } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); // myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); // List subNodes = zk.getChildren(root, false); // lockName List lockObjNodes = new ArrayList(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ // , return true; } // , 1 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); // , , if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
https://github.com/shawntime/shawn-common-utils/tree/master/src/main/java/com/shawntime/common/lock
作者:ベンジャミン警官
リンク:https://www.jianshu.com/p/535efcab356d
出典:簡書
著作権は作者の所有である.商業転載は著者に連絡して許可を得てください.非商業転載は出典を明記してください.