redisシリーズ-問題解決策の更新(データ整合性補正)
30366 ワード
redisシリーズ-問題解決策の更新
redisはキャッシュをしてmysqlデータベースの圧力を軽減すると同時に、更新によって二重ライブラリのデータが一致しない場合があります.私がここで取った案は:
mysqlはデータを更新後、redisキャッシュ(これも多くの企業で採用されている)を削除し、ユーザーが次回アクセス時にキャッシュにヒットせず、mysqlにアクセスし、新しいredisキャッシュを生成する.
ほとんどの場合は安全であるが、絶対安全ではない、不一致も生じるため、タイミングデータ補正を行う.ポリシー:
先にmysqlでクエリーデータredisのデータを削除mysqlのデータをredisに書き込む
1.1タイミングタスクを設定ここでは皆さんの理解を容易にするために、私が置いたプロジェクトの初期化のコードは、タイミングタスクの登録を見るだけでいいです.
1.2タスク関数の定義
コード多重化を可能にするために,以上のコードを改造する.
データ操作を抽出して対応クラスにカプセル化し,クラスメソッドとして定義する
redisはキャッシュをしてmysqlデータベースの圧力を軽減すると同時に、更新によって二重ライブラリのデータが一致しない場合があります.私がここで取った案は:
mysqlはデータを更新後、redisキャッシュ(これも多くの企業で採用されている)を削除し、ユーザーが次回アクセス時にキャッシュにヒットせず、mysqlにアクセスし、新しいredisキャッシュを生成する.
ほとんどの場合は安全であるが、絶対安全ではない、不一致も生じるため、タイミングデータ補正を行う.ポリシー:
先にmysqlでクエリーデータredisのデータを削除mysqlのデータをredisに書き込む
1.1タイミングタスクを設定ここでは皆さんの理解を容易にするために、私が置いたプロジェクトの初期化のコードは、タイミングタスクの登録を見るだけでいいです.
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from schedule.statistic import fix_statistic
def create_flask_app(config, enable_config_file=False):
"""
Flask
"""
app = Flask(__name__)
app.config.from_object(config)
if enable_config_file:
from utils import constants
#
app.config.from_envvar(constants.GLOBAL_SETTING_ENV_NAME, silent=True)
return app
def create_app(config, enable_config_file=False):
"""
flask
:param config:
:param enable_config_file:
:return: flask
"""
app = create_flask_app(config, enable_config_file)
#
from utils.converters import register_converters
register_converters(app)
# redis
from redis.sentinel import Sentinel
_sentinel = Sentinel(app.config['REDIS_SENTINELS'])
# redis
app.redis_master = _sentinel.master_for(app.config['REDIS_SENTINEL_SERVICE_NAME'])
app.redis_slave = _sentinel.slave_for(app.config['REDIS_SENTINEL_SERVICE_NAME'])
# redis
from rediscluster import StrictRedisCluster
app.redis_cluster = StrictRedisCluster(startup_nodes=app.config['REDIS_CLUSTER'])
# myql
from models import db
db.init_app(app)
#
from utils.logging import create_logger
create_logger(app)
#
from utils.limiter import limiter as lmt
lmt.init_app(app)
# Snowflake ID worker
from utils.snowflake.id_worker import IdWorker
app.id_worker = IdWorker(app.config['DATACENTER_ID'],
app.config['WORKER_ID'],
app.config['SEQUENCE'])
#
executor = ThreadPoolExecutor(max_workers=3)
executors = {
'default':executor
}
app.scheduler = BackgroundScheduler(executors=executors)
# 3
# app.scheduler.add_job(fix_statistic, 'cron', hour=3)
# date
app.scheduler.add_job(fix_statistic, 'date')
app.scheduler.start()
#
from utils.middlewares import jwt_authentication
app.before_request(jwt_authentication)
#
from .resources.user import user_bp
app.register_blueprint(user_bp)
#
from .resources.news import news_bp
app.register_blueprint(news_bp)
#
from .resources.search import search_bp
app.register_blueprint(search_bp)
return app
1.2タスク関数の定義
from flask import current_app
from sqlalchemy import func
from models import db
from models.news import Article
def fix_statistic(app):
""" """
with app.app_context(): # init_app , /db,
# :
# mysql select user_id, count(id) from t_article group by user_id where status = 2
ret = db.session.query(Article.user_id, func.count(Article.id)).filter(Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()
# redis
pipe = current_app.redis_master.pipeline(transaction=False)
pipe.delete('count:user:arts')
# mysql redis
for user_id, count in ret:
pipe.zadd('count:user:arts', count, user_id)
pipe.execute() # redis
コード多重化を可能にするために,以上のコードを改造する.
from flask import current_app
from sqlalchemy import func
from cache.statistic import UserArticleCountStorage, UserFollowingsCountStorage, UserFansCountStorage
from models import db
from models.news import Article
def __fix_statistic(cls):
try:
# mysql
ret = cls.db_query()
# redis
cls.reset(ret)
except BaseException as e:
current_app.logger.error(e)
raise e
def fix_statistic(app):
""" """
with app.app_context():
#
__fix_statistic(UserArticleCountStorage)
#
__fix_statistic(UserFollowingsCountStorage)
#
__fix_statistic(UserFansCountStorage)
データ操作を抽出して対応クラスにカプセル化し,クラスメソッドとして定義する
from flask import current_app
from redis import StrictRedis, RedisError
from sqlalchemy import func
from models import db
from models.news import Article
class BaseCountStorage:
""" """
key = ''
@classmethod
def get(cls, user_id):
"""
:param user_id:
:return:
"""
redis = current_app.redis_slave # type: StrictRedis
try:
#
count = redis.zscore(cls.key, user_id)
except RedisError as e:
current_app.logger.error(e)
raise e
if count:
return int(count) if int(count) > 0 else 0
else:
return 0
@classmethod
def incr(cls, user_id):
""" +1"""
redis = current_app.redis_master # type: StrictRedis
try:
redis.zincrby(cls.key, user_id)
except RedisError as e:
current_app.logger.error(e)
raise e
# ( 1)
@classmethod
def reset(cls, db_query_ret):
""" """
# redis
pipe = current_app.redis_master.pipeline(transaction=False)
pipe.delete(cls.key)
# mysql redis
for user_id, count in db_query_ret:
pipe.zadd(cls.key, count, user_id)
pipe.execute() # redis
class UserArticleCountStorage(BaseCountStorage):
"""
count:user:arts zset [{value: id, score: }, {}]
"""
key = 'count:user:arts' #
# mysql ( 2)
@classmethod
def db_query(cls):
return db.session.query(Article.user_id, func.count(Article.id)).filter(
Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()
class UserFollowingsCountStorage(BaseCountStorage):
"""
count:user:followings zset [{value: id, score: }, {}]
"""
key = 'count:user:followings' #
# mysql ( 3)
@classmethod
def db_query(cls):
return db.session.query(Article.user_id, func.count(Article.id)).filter(
Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()
class UserFansCountStorage(BaseCountStorage):
"""
count:user:fans zset [{value: id, score: }, {}]
"""
key = 'count:user:fans' #
# mysql ( 4)
@classmethod
def db_query(cls):
return db.session.query(Article.user_id, func.count(Article.id)).filter(
Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()