redisシリーズ-問題解決策の更新(データ整合性補正)


redisシリーズ-問題解決策の更新
Redisをキャッシュとして使うとMySQLデータベースの負荷を軽減できますが、その一方で、データの更新によって2つのデータストア(MySQLとRedis)の内容が一致しなくなる場合があります。ここで採用した方針は次のとおりです:
MySQLのデータを更新した後、Redisのキャッシュを削除します(多くの企業でも採用されている方法です)。ユーザーが次回アクセスしたときにはキャッシュにヒットしないため、MySQLにアクセスして新しいRedisキャッシュを生成します。
この方法はほとんどの場合は安全ですが、絶対に安全というわけではなく、不一致が生じることもあります。そのため、定期実行タスクによるデータ補正を行います。補正の方針:
まずMySQLからデータを取得する → Redisのデータを削除する → MySQLのデータをRedisに書き込む
1.1 定期実行タスクの設定。ここでは理解しやすいように、プロジェクト初期化のコード全体を載せています。定期実行タスクを登録している部分だけを見れば十分です。
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

from schedule.statistic import fix_statistic


def create_flask_app(config, enable_config_file=False):
    """
    Build a bare Flask application and load its configuration.

    :param config: configuration object handed to ``app.config.from_object``
    :param enable_config_file: when true, additionally load settings from the
        file named by the environment variable
        ``constants.GLOBAL_SETTING_ENV_NAME``
    :return: the configured Flask application
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config)

    if not enable_config_file:
        return flask_app

    # Imported lazily, only when the environment override is requested.
    from utils import constants
    # silent=True: do not fail when the environment variable/file is missing.
    flask_app.config.from_envvar(constants.GLOBAL_SETTING_ENV_NAME, silent=True)
    return flask_app


def create_app(config, enable_config_file=False):
    """
    Create the Flask application and attach everything the project needs.

    Besides the plain Flask object this wires up: converters, redis
    (sentinel master/slave and cluster clients), the database, logging,
    the rate limiter, the Snowflake ID worker, the background scheduler
    with the statistic-correction job, the JWT request hook and the
    blueprints.

    :param config: configuration object
    :param enable_config_file: whether settings may additionally be loaded
        from the file named by the configured environment variable
    :return: the fully initialised Flask application
    """
    app = create_flask_app(config, enable_config_file)

    # Register the converters provided by utils.converters
    from utils.converters import register_converters
    register_converters(app)

    # redis via Sentinel: one client for the master, one for the slave
    from redis.sentinel import Sentinel
    _sentinel = Sentinel(app.config['REDIS_SENTINELS'])
    app.redis_master = _sentinel.master_for(app.config['REDIS_SENTINEL_SERVICE_NAME'])
    app.redis_slave = _sentinel.slave_for(app.config['REDIS_SENTINEL_SERVICE_NAME'])

    # redis cluster client
    from rediscluster import StrictRedisCluster
    app.redis_cluster = StrictRedisCluster(startup_nodes=app.config['REDIS_CLUSTER'])

    # Database (SQLAlchemy) initialisation
    from models import db
    db.init_app(app)

    # Logging
    from utils.logging import create_logger
    create_logger(app)

    # Rate limiter
    from utils.limiter import limiter as lmt
    lmt.init_app(app)

    # Snowflake ID worker
    from utils.snowflake.id_worker import IdWorker
    app.id_worker = IdWorker(app.config['DATACENTER_ID'],
                             app.config['WORKER_ID'],
                             app.config['SEQUENCE'])

    # Background scheduler backed by a small thread pool
    executor = ThreadPoolExecutor(max_workers=3)
    executors = {
        'default': executor
    }
    app.scheduler = BackgroundScheduler(executors=executors)
    # fix_statistic(app) needs the application object, so it must be passed
    # through ``args``; without it the job cannot be called.
    # A nightly run would look like:
    #   app.scheduler.add_job(fix_statistic, 'cron', hour=3, args=[app])
    # The 'date' trigger without a run date fires once, right away, which is
    # convenient for trying the job out.
    app.scheduler.add_job(fix_statistic, 'date', args=[app])
    app.scheduler.start()

    # Request hook: run JWT authentication before every request
    from utils.middlewares import jwt_authentication
    app.before_request(jwt_authentication)

    # User blueprint
    from .resources.user import user_bp
    app.register_blueprint(user_bp)

    # News blueprint
    from .resources.news import news_bp
    app.register_blueprint(news_bp)

    # Search blueprint
    from .resources.search import search_bp
    app.register_blueprint(search_bp)

    return app


1.2タスク関数の定義

from flask import current_app
from sqlalchemy import func

from models import db
from models.news import Article


def fix_statistic(app):
    """Rebuild the per-user article counter in redis from the database.

    :param app: Flask application; its application context is pushed so that
        ``db`` and ``current_app`` are usable inside the scheduler thread
    """
    counts_key = 'count:user:arts'

    with app.app_context():
        # Count approved articles per user. Per the original author's note,
        # roughly:
        #   SELECT user_id, COUNT(id) FROM t_article
        #   WHERE status = 2 GROUP BY user_id
        per_user = db.session.query(Article.user_id, func.count(Article.id))
        approved_only = per_user.filter(Article.status == Article.STATUS.APPROVED)
        rows = approved_only.group_by(Article.user_id).all()

        # Queue "drop old data, write fresh data" on a non-transactional pipeline.
        pipeline = current_app.redis_master.pipeline(transaction=False)
        pipeline.delete(counts_key)
        for author_id, article_total in rows:
            pipeline.zadd(counts_key, article_total, author_id)

        # Send all queued commands to redis.
        pipeline.execute()

コードを再利用できるように、上記のコードを次のようにリファクタリングします。
from flask import current_app
from sqlalchemy import func

from cache.statistic import UserArticleCountStorage, UserFollowingsCountStorage, UserFansCountStorage
from models import db
from models.news import Article


def __fix_statistic(cls):
    """Re-sync one statistic: query the database, then overwrite redis.

    :param cls: storage class exposing the classmethods ``db_query()`` and
        ``reset(rows)``
    :raises Exception: whatever ``db_query`` or ``reset`` raised, re-raised
        unchanged after being logged
    """
    try:
        # Read the authoritative values from the database ...
        rows = cls.db_query()
        # ... and replace the redis data with them.
        cls.reset(rows)
    except Exception as e:
        # Only ordinary errors are logged here; KeyboardInterrupt/SystemExit
        # are not intercepted and propagate untouched.
        current_app.logger.error(e)
        # Bare raise re-raises the active exception as-is.
        raise


def fix_statistic(app):
    """Re-sync every per-user counter kept in redis from the database.

    :param app: Flask application; its application context is pushed while
        the corrections run
    """
    with app.app_context():
        # Processed in this order; an error in one stops the remaining ones.
        storages = (
            UserArticleCountStorage,     # per-user article counter
            UserFollowingsCountStorage,  # per-user followings counter
            UserFansCountStorage,        # per-user fans counter
        )
        for storage in storages:
            __fix_statistic(storage)

データ操作を抽出して対応するクラスにカプセル化し、クラスメソッドとして定義します。
from flask import current_app
from redis import StrictRedis, RedisError
from sqlalchemy import func

from models import db
from models.news import Article


class BaseCountStorage:
    """Shared logic for per-user counters kept in a redis sorted set.

    Subclasses set ``key`` to the name of their sorted set.
    """
    key = ''

    @classmethod
    def get(cls, user_id):
        """
        Read the counter stored for one user.

        :param user_id: member looked up in the sorted set ``cls.key``
        :return: the stored score as an int, never below 0; 0 when the
            lookup yields nothing (or a zero score)
        """
        replica = current_app.redis_slave  # type: StrictRedis
        try:
            # Reads go to the slave client.
            score = replica.zscore(cls.key, user_id)
        except RedisError as err:
            current_app.logger.error(err)
            raise err

        if not score:
            return 0
        return max(int(score), 0)

    @classmethod
    def incr(cls, user_id):
        """Increment the user's counter.

        Relies on the client's default increment amount (the original note
        says +1).
        """
        primary = current_app.redis_master  # type: StrictRedis
        try:
            primary.zincrby(cls.key, user_id)
        except RedisError as err:
            current_app.logger.error(err)
            raise err

    # Correction method: overwrite the redis data with database results.
    @classmethod
    def reset(cls, db_query_ret):
        """Replace the whole sorted set with freshly queried values.

        :param db_query_ret: iterable of ``(user_id, count)`` pairs
        """
        # Queue the delete and the rewrites on a non-transactional pipeline.
        pipeline = current_app.redis_master.pipeline(transaction=False)
        pipeline.delete(cls.key)

        for member_id, total in db_query_ret:
            pipeline.zadd(cls.key, total, member_id)

        # Send all queued commands to redis.
        pipeline.execute()


class UserArticleCountStorage(BaseCountStorage):
    """Per-user article counter.

    Kept in the redis zset ``count:user:arts``: member = user id,
    score = article count.
    """
    key = 'count:user:arts'  # name of the sorted set in redis

    # Database query feeding the correction job.
    @classmethod
    def db_query(cls):
        """Return ``(user_id, count)`` rows of approved articles per user."""
        grouped = db.session.query(Article.user_id, func.count(Article.id))
        approved = grouped.filter(Article.status == Article.STATUS.APPROVED)
        return approved.group_by(Article.user_id).all()


class UserFollowingsCountStorage(BaseCountStorage):
    """Per-user followings counter.

    Kept in the redis zset ``count:user:followings``: member = user id,
    score = count.
    """
    key = 'count:user:followings'  # name of the sorted set in redis

    # Database query feeding the correction job.
    @classmethod
    def db_query(cls):
        """Return ``(user_id, count)`` rows used to reset this counter.

        NOTE(review): this query is identical to the article counter's query
        (approved articles per user), so a reset would store article counts
        under the followings key. Presumably a placeholder; confirm and
        replace with the real followings query.
        """
        grouped = db.session.query(Article.user_id, func.count(Article.id))
        approved = grouped.filter(Article.status == Article.STATUS.APPROVED)
        return approved.group_by(Article.user_id).all()


class UserFansCountStorage(BaseCountStorage):
    """Per-user fans counter.

    Kept in the redis zset ``count:user:fans``: member = user id,
    score = count.
    """
    key = 'count:user:fans'  # name of the sorted set in redis

    # Database query feeding the correction job.
    @classmethod
    def db_query(cls):
        """Return ``(user_id, count)`` rows used to reset this counter.

        NOTE(review): this query is identical to the article counter's query
        (approved articles per user), so a reset would store article counts
        under the fans key. Presumably a placeholder; confirm and replace
        with the real fans query.
        """
        grouped = db.session.query(Article.user_id, func.count(Article.id))
        approved = grouped.filter(Article.status == Article.STATUS.APPROVED)
        return approved.group_by(Article.user_id).all()