Redis分散ロック実装-luaスクリプトとAOPの2つの方法に基づく

20633 ワード

1、分布式ロックのよくある実現方式


分散ロックには一般的に3つの実現方式がある:1.データベース楽観ロック;2.Redisベースの分散ロック;3.ZooKeeperベースの分散ロック.本論文ではredis実装に基づいて,使いやすいようにspring boot starter方式にカプセル化されたredis分散ロックも提供した.

2、分布式ロックの実現に必要な条件


分散ロックが使用可能であることを確認するには、少なくともロックの実装が次の4つの条件を満たしていることを確認します.
1)反発性.任意の時点で、ロックを保持できるクライアントは1つしかありません.2)デッドロックが発生しない再入性.ロックを保持している間にクラッシュし、アクティブにロックを解除しなくても、後続の他のクライアントがロックを追加できることを保証します.3)許容誤差がある.ほとんどのRedisノードが正常に動作している限り、クライアントはロックとロック解除を行うことができます.4)同一性.ロックとロック解除は同じクライアントでなければなりません.クライアントは自分で他の人のロックを解除することはできません.

3、luaスクリプト方式による実現


ロックは、あるリソースに対して、アクセスの反発性を保証します.実際の使用では、このリソースは一般的に文字列です.Redisを使用してロックを実装するには、主にリソースをRedisに配置し、その原子性を利用して、他のスレッドがアクセスするときに、Redisにすでにこのリソースが存在する場合、その後の操作は許可されません.Spring bootがRedisを使用する操作は主にRedisTemplateによって実現され、一般的な手順は以下の通りである.
1)ロックリソースをRedisに入れる(キーが存在しない場合には成功するのでsetIfAbsentメソッドを使用することに注意):
redisTemplate.opsForValue().setIfAbsent("key", "value");

2)有効期限の設定
redisTemplate.expire("key", 30000, TimeUnit.MILLISECONDS);

3)ロック解除
redisTemplate.delete("key");

一般に、このような実装はロックの要件を満たすことができるが、setIfAbsentメソッドが呼び出された後にスレッドが削除されると、ロックされたリソースに期限切れが設定されず、デフォルトでは期限切れにならない場合、このロックは常に存在する.したがって,ロックの設定とその有効期限の2つの操作の原子性を保証する必要があり,spring dataのRedisTemplateにはこのような方法はない.
しかしjedisにはこのような原子操作の方法があり,RedisTemplateのexecute法によりjedisにおける操作命令の対象を取得する必要があり,コードは以下の通りである.
String result = redisTemplate.execute(new RedisCallback() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
        return commands.set(key, "     ", "NX", "PX", expire);
    }
});

注意:Redisはバージョン2.6.12からsetコマンドがNX、PXなどのパラメータをサポートし、setnx、setex、psetexコマンドの効果を達成します.ドキュメントを参照:http://doc.redisfans.com/string/set.html
NX:ロックリソースが存在しない場合にのみSETに成功することを示します.Redisの原子性を利用して,ロックを取得できるのは最初の要求スレッドのみであり,その後のすべてのスレッドはロックリソースが解放されるまでロックを取得できないことを保証した.
PX:expireは、ロックされたリソースの自動有効期限をミリ秒単位で表します.具体的な有効期限は実際のシーンによって異なります.
これにより、ロックを取得する際に、Redis値の設定と期限切れ時間の原子性を保証することができ、前述した2回のRedis操作中に予期せぬロックが解放されないという問題を回避することができる.しかし、次のようなシーンの順序を考慮すると、問題が発生する可能性があります.
  • スレッドT 1は、ロック
  • を取得する.
  • スレッドT 1は、いくつかの理由により
  • が長時間ブロックするトラフィック動作を実行する.
  • ロックは自動的に期限切れになり、すなわちロックは自動的に
  • を解放する.
  • スレッドT 2取得ロック
  • スレッドT 1の業務操作が完了し、リリースロック(実はリリーススレッドT 2のロック)
  • このようなシナリオ順では、スレッドT 2のトラフィック動作は、実際にはロックされて保護機構を提供しない.したがって、各スレッドがロックを解放するときは、自分のロックしか解放できません.すなわち、ロックには所有者のマークが必要であり、ロックを解放する原子的な操作も保証する必要があります.
    したがって,ロックを取得する際に,現在のスレッドにランダムで一意でない列を生成してRedisを入れることができる.ロックを解除するときは、ロックに対応する値がスレッドの値と同じかどうかを判断し、同じ場合に削除操作を行います.
    Redisは2.6.0からLuaインタプリタを内蔵し、EVALコマンドを使用してLuaスクリプトを評価できます.http://doc.redisfans.com/script/eval.html
    したがって、Luaスクリプトによってロックを解除する原子操作を達成することができ、Luaスクリプトを以下のように定義することができます.
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    RedisTemplateを使用して実行されるコードは次のとおりです.
    //   Lua    Redis   value key,               redis                  
    // spring          ,                  ,       redis connection     
    Long result = redisTemplate.execute(new RedisCallback() {
        public Long doInRedis(RedisConnection connection) throws DataAccessException {
            Object nativeConnection = connection.getNativeConnection();
            //                     ,         ,        
            //     
            if (nativeConnection instanceof JedisCluster) {
                return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
            }
    
            //     
            else if (nativeConnection instanceof Jedis) {
                return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
            }
            return 0L;
        }
    });
    

    コードにはクラスタモードとスタンドアロンモードがあり、どちらの方法、パラメータも同じである.なぜならspringパッケージの実行スクリプトの方法(RedisConnectionインタフェースがRedisScriptingCommandsインタフェースのevalメソッドに継承されている)では、クラスタモードの方法は実行スクリプトをサポートしない異常(実際にはサポートされているが)を直接投げ出しているため、Redisのconnectionを取得してスクリプトを実行するしかない.JedisClusterとJedisのメソッドは共通のインタフェースを実現していないので、別々に呼び出すしかありません.
    Springパッケージのクラスタモード実行スクリプトメソッドソース:
    # JedisClusterConnection.java
    /**
     * (non-Javadoc)
     * @see org.springframework.data.redis.connection.RedisScriptingCommands#eval(byte[], org.springframework.data.redis.connection.ReturnType, int, byte[][])
     */
    @Override
    public  T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
        throw new InvalidDataAccessApiUsageException("Eval is not supported in cluster environment.");
    }
    

    これで、比較的信頼性の高いRedis分散ロックを完了しましたが、クラスタモードの極端な状況では、次のようなシーン順序(本明細書では詳細に説明しません):
  • スレッドT 1取得ロック成功
  • Redisのmasterノードを切り、slaveは自動的に
  • を突き上げる.
  • スレッドT 2がロックを取得すると、slaveノードからロックが存在するか否かが判断されるが、Redisのmaster slaveレプリケーションは非同期であるため、このときスレッドT 2がロック
  • に成功する可能性がある.
    コードは次のように実装されます.
    3.1コンポーネント依存まず我々はMavenを通じてJedisオープンソースコンポーネントを導入し、pom.xmlファイルには、次のコードが追加されます.
    
        redis.clients
        jedis
        2.9.0
    
    

    3.2具体コード
    1つ目の方法:
    import com.alibaba.fastjson.JSON;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisCommands;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @description redis       
     * @date 2019/5/15 9:29
     */
    @Component
    public class RedisDistributedLock {
        private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        /**
         *  key    value,    key   ,   SETNX
         */
        private static final String SET_IF_NOT_EXIST = "NX";
    
        /**
         *         key     ,   EXPIRE key milliseconds
         */
        private static String SET_WITH_EXPIRE_TIME = "PX";
    
        /**
         *        key     ,   EXPIRE key seconds
         */
        private static String SET_WITH_EXPIRE_TIME_E = "EX";
    
        /**
         *                
         */
        public static final String OK = "OK";
    
        /**
         *   lua    redis   value key
         *   lua    :        
         */
        private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    
        /**
         *         
         * @param key    
         * @param value      id(         uuid,              )
         * @param expire     
         */
        public boolean lock(String key, String value, long expire) {
            try {
                RedisCallback callback = (connection) -> {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    return commands.set(key, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expire);
                };
                String result = redisTemplate.execute(callback);
                //      OK,   null
                return OK.equals(result);
            } catch (Exception e) {
                logger.error("redis    ,key :" + key, e);
            }
            return false;
        }
    
        /**
         *    key  
         */
        public String get(String key) {
            try {
                RedisCallback callback = (connection) -> {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    return commands.get(key);
                };
                String result = redisTemplate.execute(callback);
                return result;
            } catch (Exception e) {
                logger.error("redis     ,key :" + key, e);
            }
            return "";
        }
    
        /**
         *    
         * @param key    
         * @param value      id(         uuid,              )
         */
        public boolean unLock(String key,String value) {
            //       ,                      ,                 ,        
            try {
                List keys = new ArrayList<>();
                keys.add(key);
                List values = new ArrayList<>();
                values.add(value);
    
                //   lua    redis   value key,               redis                  
                // spring          ,                  ,       redis connection     
                RedisCallback callback = (connection) -> {
                    Object nativeConnection = connection.getNativeConnection();
                    //                     ,         ,        
                    //     
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }
                    //     
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }
                    return 0L;
                };
                Long result = redisTemplate.execute(callback);
                return result != null && result >= 0;
            } catch (Exception e) {
                logger.error("redis    ,key :" + key, e);
            }
            return false;
        }
    
        public boolean unLockList(List keys,List values) {
            //       ,                      ,                 ,        
            try {
    
                //   lua    redis   value key,               redis                  
                // spring          ,                  ,       redis connection     
                RedisCallback callback = (connection) -> {
                    Object nativeConnection = connection.getNativeConnection();
                    //                     ,         ,        
                    //     
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }
                    //     
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }
                    return 0L;
                };
                Long result = redisTemplate.execute(callback);
                return result != null && result > 0;
            } catch (Exception e) {
                logger.error("redis    ,key :" + JSON.toJSONString(keys), e);
            }
            return false;
        }
    
    }
    

    第2の方法:後で他の方法で分散ロックを実現するために拡張できるように、インタフェースと抽象クラスを定義し、コードは以下の通りです.
    /**
     *       
     * @Author xf
     * @Date 2019/5/22 20:39
     */
    public interface DistributedLock {
        public static final long TIMEOUT_MILLIS = 30000;
    
        public static final int RETRY_TIMES = Integer.MAX_VALUE;
    
        public static final long SLEEP_MILLIS = 500;
    
        public boolean lock(String key);
    
        public boolean lock(String key, int retryTimes);
    
        public boolean lock(String key, int retryTimes, long sleepMillis);
    
        public boolean lock(String key, long expire);
    
        public boolean lock(String key, long expire, int retryTimes);
    
        public boolean lock(String key, long expire, int retryTimes, long sleepMillis);
    
        public boolean releaseLock(String key);
    }
    
    /**
     *        ,       ,          
     * @Author xf
     * @Date 2019/5/22 20:41
     */
    public abstract class AbstractDistributedLock implements DistributedLock{
        @Override
        public boolean lock(String key) {
            return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
        }
    
        @Override
        public boolean lock(String key, int retryTimes) {
            return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
        }
    
        @Override
        public boolean lock(String key, int retryTimes, long sleepMillis) {
            return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
        }
    
        @Override
        public boolean lock(String key, long expire) {
            return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
        }
    
        @Override
        public boolean lock(String key, long expire, int retryTimes) {
            return lock(key, expire, retryTimes, SLEEP_MILLIS);
        }
    }
    
    /**
     *        
     * @Author xf
     * @Date 2019/5/22 20:43
     */
    public class RedisDistributedLock extends AbstractDistributedLock{
    
        private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
    
        private RedisTemplate redisTemplate;
    
        private ThreadLocal lockFlag = new ThreadLocal();
    
        public static final String UNLOCK_LUA;
    
        static {
            StringBuilder sb = new StringBuilder();
            sb.append("if redis.call('get',KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call('del',KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
            UNLOCK_LUA = sb.toString();
        }
    
        public RedisDistributedLock(RedisTemplate redisTemplate) {
            super();
            this.redisTemplate = redisTemplate;
        }
    
        @Override
        public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
            boolean result = setRedis(key, expire);
            //        ,             
            while((!result) && retryTimes-- > 0){
                try {
                    logger.debug("lock failed, retrying..." + retryTimes);
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    return false;
                }
                result = setRedis(key, expire);
            }
            return result;
        }
    
        private boolean setRedis(String key, long expire) {
            try {
                String result = redisTemplate.execute(new RedisCallback() {
                    @Override
                    public String doInRedis(RedisConnection connection) throws DataAccessException {
                        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                        String uuid = UUID.randomUUID().toString();
                        lockFlag.set(uuid);
                        return commands.set(key, uuid, "NX", "PX", expire);
                    }
                });
                return !StringUtils.isEmpty(result);
            } catch (Exception e) {
                logger.error("set redis occured an exception", e);
            }
            return false;
        }
    
        @Override
        public boolean releaseLock(String key) {
            //       ,                      ,                 ,        
            try {
                List keys = new ArrayList();
                keys.add(key);
                List args = new ArrayList();
                args.add(lockFlag.get());
    
                //   lua    redis   value key,               redis                  
                // spring          ,                  ,       redis connection     
    
                Long result = redisTemplate.execute(new RedisCallback() {
                    public Long doInRedis(RedisConnection connection) throws DataAccessException {
                        Object nativeConnection = connection.getNativeConnection();
                        //                     ,         ,        
                        //     
                        if (nativeConnection instanceof JedisCluster) {
                            return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
    
                        //     
                        else if (nativeConnection instanceof Jedis) {
                            return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
                        return 0L;
                    }
                });
    
                return result != null && result > 0;
            } catch (Exception e) {
                logger.error("release lock occured an exception", e);
            }
            return false;
        }
    }
    
    

    4、AOP方式による実現


    実際の使用では、分散ロックをカプセル化してメソッドレベルで使用できます.これにより、ロックをどこからでも取得したり解放したりする必要がなくなり、より便利に使用できます.
    まず注釈を定義します.
    /**
     * @Author xf
     * @Date 2019/5/22 20:50
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    public @interface RedisLock {
    
        /**     ,redis key*/
        String value() default "default";
    
        /**     ,    */
        long keepMills() default 30000;
    
        /**          */
        LockFailAction action() default LockFailAction.CONTINUE;
    
        public enum LockFailAction{
            /**    */
            GIVEUP,
            /**    */
            CONTINUE;
        }
    
        /**        ,  GIVEUP    */
        long sleepMills() default 200;
    
        /**     */
        int retryTimes() default 5;
    
    }
    

    分散ロックをアセンブリするbean
    /**
     * @Author xf
     * @Date 2019/5/22 20:55
     */
    @Configuration
    @AutoConfigureAfter(RedisAutoConfiguration.class)
    public class DistributedLockAutoConfiguration {
    
        @Bean
        @ConditionalOnBean(RedisTemplate.class)
        public DistributedLock redisDistributedLock(RedisTemplate redisTemplate){
            return new RedisDistributedLock(redisTemplate);
        }
    
    }
    

    断面の定義(spring boot構成方法)
    /**
     * @Author xf
     * @Date 2019/5/22 20:57
     */
    @Aspect
    @Configuration
    @ConditionalOnClass(DistributedLock.class)
    @AutoConfigureAfter(DistributedLockAutoConfiguration.class)
    public class DistributedLockAspectConfiguration {
    	
    	private final Logger logger = LoggerFactory.getLogger(DistributedLockAspectConfiguration.class);
    	
    	@Autowired
    	private DistributedLock distributedLock;
    	
    	private ExpressionParser parser = new SpelExpressionParser();
    
    	private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
    
    	@Pointcut("@annotation(com.itcode.lock.redis.spring.boot.autoconfigure.annotations.LockAction)")
    	private void lockPoint(){
    		
    	}
    	
    	@Around("lockPoint()")
    	public Object around(ProceedingJoinPoint pjp) throws Throwable{
    		Method method = ((MethodSignature) pjp.getSignature()).getMethod();
    		LockAction lockAction = method.getAnnotation(LockAction.class);
    		String key = lockAction.value();
    		Object[] args = pjp.getArgs();
    		key = parse(key, method, args);
    		
    		
    		int retryTimes = lockAction.action().equals(LockFailAction.CONTINUE) ? lockAction.retryTimes() : 0;
    		boolean lock = distributedLock.lock(key, lockAction.keepMills(), retryTimes, lockAction.sleepMills());
    		if(!lock) {
    			logger.debug("get lock failed : " + key);
    			return null;
    		}
    		
    		//   ,    ,   
    		logger.debug("get lock success : " + key);
    		try {
    			return pjp.proceed();
    		} catch (Exception e) {
    			logger.error("execute locked method occured an exception", e);
    			throw e;
    		} finally {
    			boolean releaseResult = distributedLock.releaseLock(key);
    			logger.debug("release lock : " + key + (releaseResult ? " success" : " failed"));
    		}
    	}
    	
    	/**
    	 * @description   spring EL   
    	 * @author xf
    	 * @date 2019/5/22 20:57
    	 * @param key    
    	 * @param method   
    	 * @param args     
    	 * @return
    	 */
    	private String parse(String key, Method method, Object[] args) {
    		String[] params = discoverer.getParameterNames(method);
    		EvaluationContext context = new StandardEvaluationContext();
    		for (int i = 0; i < params.length; i ++) {
    			context.setVariable(params[i], args[i]);
    		}
    		return parser.parseExpression(key).getValue(context, String.class);
    	}
    }
    
    

    Spring boot starterはresources/META-INFにspringを追加する必要がある.factoriesファイル
    # Auto Configure
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.itcode.lock.redis.spring.boot.autoconfigure.DistributedLockAutoConfiguration,\
    com.itcode.lock.redis.spring.boot.autoconfigure.DistributedLockAspectConfiguration
    

    このようにパッケージングした後にspring bootを用いて開発したプロジェクトは,このstarterに直接依存して,方法にRedisLock注釈を付けて分散ロックの機能を実現することができ,もちろん自分で制御する必要があれば,直接分散ロックのbeanに注入すればよい.
    @Autowired
    private DistributedLock distributedLock;
    

    他の分散ロック実装を使用する必要がある場合は、AbstractDistributedLockを継承してロックの取得とロックの解放の方法を実装すればよい.
    参照先:https://www.cnblogs.com/linjiqin/p/8003838.html http://www.cnblogs.com/number7/p/8320259.html