詳細はredisキャッシュとデータベースの整合性の問題を解決します。
データベースとキャッシュの読み書きモードポリシー
データベースを書き終わったら、キャッシュをすぐ更新しますか?それともキャッシュを直接削除しますか?
(1)データベースを書く値がキャッシュ値に更新されるのと同じであれば、何の計算も必要なく、すぐにキャッシュを更新することができますが、頻繁にデータを書いて、データを読むシーンが少ない場合は、この解決策には適していません。まだ照会していないかもしれませんので、削除または修正されます。時間とリソースが無駄になります。
(2)データベースを書く値とキャッシュを更新する値が一致していない場合、キャッシュに書き込まれたデータはいくつかのテーブルの関連計算を経て得られた結果をキャッシュに挿入する必要があります。キャッシュをすぐに更新する必要はありません。キャッシュを削除すればいいです。クエリーのときに計算した結果をキャッシュに挿入してください。
したがって、一般的なポリシーは、データを更新するときに、キャッシュデータを削除して、キャッシュを更新するのではなく、データベースを更新することです。検索するときには、最新のデータをキャッシュに更新します。
データベースとキャッシュがダブルで書かれている場合、データが一致しない問題があります。
シーン1
データを更新する時、ある商品の在庫を更新すると、現在の商品の在庫は100です。今は99に更新して、まずデータベースを99に変更して、キャッシュを削除して、キャッシュを削除しました。これはデータ在庫の99を意味します。キャッシュは100です。これはデータベースとキャッシュが一致しません。
シーン一解決案
この場合はまずキャッシュを削除して、データベースを更新します。キャッシュを削除して失敗したら、データベースを更新しないでください。キャッシュの削除に成功したと言えば、データベースの更新に失敗しました。データベースから古いデータを調べただけです。データベースとキャッシュの整合性を維持できます。
シーン2
高合併の場合、キャッシュを削除したら、データベースを更新しますが、まだ更新されていません。もう一つの要求でデータを調べてみます。キャッシュにないことが分かりました。データベースを更新するスレッドがデータベースを99に更新し、データベースとキャッシュが一致しない場合があります。
シーン二解決案
このような場合は、20個の列を作って、商品のIDによってhash値を作って、列の個数を調べて、データ更新要求がある時は、先に列に捨てて、更新後は列から削除します。更新中に上記の場面に遭遇したら、キャッシュに行ってデータがあるかどうかを確認してください。もしないなら、先に列に行って同じ商品IDが更新されているかどうかを確認してもいいです。もし問い合わせの要求があれば、キューに送ってください。そして同時にキャッシュの更新が完了するのを待ちます。
ここには最適化点があります。もし列の中に検索要求があると見つけたら、新しいクエリ操作を入れないでください。一つのwhileでキャッシュを調べて、200 MSぐらい循環します。キャッシュにまだないなら、直接にデータベースの古いデータを取ります。
高いところに降りてシーンを解決する上で注意すべき問題
(1)読み込み要求時に長い渋滞
読む要求は非常に軽度な非同期化を行っていますので、必ずタイムアウトの問題に注意してください。各読み取り要求は時間を超えて返さなければなりません。この解決策の最大のリスクは、データ更新が頻繁に行われる可能性があり、キューの中に大量の更新操作が押し込まれてしまい、その後、大量のタイムアウトが発生します。このような場合は、十分なストレステストを行います。圧力が大きすぎると、実際の状況に応じてマシンを追加する必要があります。
(2)合併要求量が高すぎる
ここではやはり圧力テストを行います。真実の場面を多くシミュレーションして、同時に量が一番高い時にはQPSはいくらですか?耐えられないなら、機械を追加してください。また、読みと書きの割合はいくらですか?
(3)マルチサービスのインスタンス展開の要求ルート
このサービスは複数の例を展開しているかもしれないが、データ更新動作の実行、キャッシュ更新動作の実行要求は、同じサービスインスタンスにnginxサーバを介してルーティングされることを保証しなければならない。
(4)ホットスポット商品のルート問題により、請求の傾斜が生じる
いくつかの商品の読み取り要求がとても高くて、全部同じマシンの同じ列に打ち込んでしまいました。サーバーの圧力が大きすぎるかもしれません。商品データの更新の時だけキャッシュを空にしてしまうので、読み書きを同時にしてしまいます。更新頻度があまり高くないと、この問題の影響は大きくありません。しかし、いくつかのサーバーの負荷が高いかもしれません。
データベースとキャッシュデータの整合性ソリューションフロー図
データベースとキャッシュデータの整合性ソリューション対応コード
商品在庫の実体
データベースを書き終わったら、キャッシュをすぐ更新しますか?それともキャッシュを直接削除しますか?
(1)データベースを書く値がキャッシュ値に更新されるのと同じであれば、何の計算も必要なく、すぐにキャッシュを更新することができますが、頻繁にデータを書いて、データを読むシーンが少ない場合は、この解決策には適していません。まだ照会していないかもしれませんので、削除または修正されます。時間とリソースが無駄になります。
(2)データベースを書く値とキャッシュを更新する値が一致していない場合、キャッシュに書き込まれたデータはいくつかのテーブルの関連計算を経て得られた結果をキャッシュに挿入する必要があります。キャッシュをすぐに更新する必要はありません。キャッシュを削除すればいいです。クエリーのときに計算した結果をキャッシュに挿入してください。
したがって、一般的なポリシーは、データを更新するときに、キャッシュデータを削除して、キャッシュを更新するのではなく、データベースを更新することです。検索するときには、最新のデータをキャッシュに更新します。
データベースとキャッシュがダブルで書かれている場合、データが一致しない問題があります。
シーン1
データを更新する時、ある商品の在庫を更新すると、現在の商品の在庫は100です。今は99に更新して、まずデータベースを99に変更して、キャッシュを削除して、キャッシュを削除しました。これはデータ在庫の99を意味します。キャッシュは100です。これはデータベースとキャッシュが一致しません。
シーン一解決案
この場合はまずキャッシュを削除して、データベースを更新します。キャッシュを削除して失敗したら、データベースを更新しないでください。キャッシュの削除に成功したと言えば、データベースの更新に失敗しました。データベースから古いデータを調べただけです。データベースとキャッシュの整合性を維持できます。
シーン2
高合併の場合、キャッシュを削除したら、データベースを更新しますが、まだ更新されていません。もう一つの要求でデータを調べてみます。キャッシュにないことが分かりました。データベースを更新するスレッドがデータベースを99に更新し、データベースとキャッシュが一致しない場合があります。
シーン二解決案
このような場合は、20個の列を作って、商品のIDによってhash値を作って、列の個数を調べて、データ更新要求がある時は、先に列に捨てて、更新後は列から削除します。更新中に上記の場面に遭遇したら、キャッシュに行ってデータがあるかどうかを確認してください。もしないなら、先に列に行って同じ商品IDが更新されているかどうかを確認してもいいです。もし問い合わせの要求があれば、キューに送ってください。そして同時にキャッシュの更新が完了するのを待ちます。
ここには最適化点があります。もし列の中に検索要求があると見つけたら、新しいクエリ操作を入れないでください。一つのwhileでキャッシュを調べて、200 MSぐらい循環します。キャッシュにまだないなら、直接にデータベースの古いデータを取ります。
高いところに降りてシーンを解決する上で注意すべき問題
(1)読み込み要求時に長い渋滞
読む要求は非常に軽度な非同期化を行っていますので、必ずタイムアウトの問題に注意してください。各読み取り要求は時間を超えて返さなければなりません。この解決策の最大のリスクは、データ更新が頻繁に行われる可能性があり、キューの中に大量の更新操作が押し込まれてしまい、その後、大量のタイムアウトが発生します。このような場合は、十分なストレステストを行います。圧力が大きすぎると、実際の状況に応じてマシンを追加する必要があります。
(2)合併要求量が高すぎる
ここではやはり圧力テストを行います。真実の場面を多くシミュレーションして、同時に量が一番高い時にはQPSはいくらですか?耐えられないなら、機械を追加してください。また、読みと書きの割合はいくらですか?
(3)マルチサービスのインスタンス展開の要求ルート
このサービスは複数の例を展開しているかもしれないが、データ更新動作の実行、キャッシュ更新動作の実行要求は、同じサービスインスタンスにnginxサーバを介してルーティングされることを保証しなければならない。
(4)ホットスポット商品のルート問題により、請求の傾斜が生じる
いくつかの商品の読み取り要求がとても高くて、全部同じマシンの同じ列に打ち込んでしまいました。サーバーの圧力が大きすぎるかもしれません。商品データの更新の時だけキャッシュを空にしてしまうので、読み書きを同時にしてしまいます。更新頻度があまり高くないと、この問題の影響は大きくありません。しかし、いくつかのサーバーの負荷が高いかもしれません。
データベースとキャッシュデータの整合性ソリューションフロー図
データベースとキャッシュデータの整合性ソリューション対応コード
商品在庫の実体
package com.shux.inventory.entity;
/**
**********************************************
* :
* Simba.Hua
* 2017 8 30
**********************************************
**/
public class InventoryProduct {
private Integer productId;
private Long InventoryCnt;
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public Long getInventoryCnt() {
return InventoryCnt;
}
public void setInventoryCnt(Long inventoryCnt) {
InventoryCnt = inventoryCnt;
}
}
要求インターフェース
/**
**********************************************
* :
* Simba.Hua
* 2017 8 27
**********************************************
**/
public interface Request {
public void process();
public Integer getProductId();
public boolean isForceFefresh();
}
データ更新要求
package com.shux.inventory.request;
import org.springframework.transaction.annotation.Transactional;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
/**
**********************************************
* :
* 1、
* 2、
* Simba.Hua
* 2017 8 30
**********************************************
**/
public class InventoryUpdateDBRequest implements Request{
private InventoryProductBiz inventoryProductBiz;
private InventoryProduct inventoryProduct;
public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){
this.inventoryProduct = inventoryProduct;
this.inventoryProductBiz = inventoryProductBiz;
}
@Override
@Transactional
public void process() {
inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId());
inventoryProductBiz.updateInventoryProduct(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return inventoryProduct.getProductId();
}
@Override
public boolean isForceFefresh() {
// TODO Auto-generated method stub
return false;
}
}
クエリ要求
package com.shux.inventory.request;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
/**
**********************************************
* :
* 1、
* 2、
* Simba.Hua
* 2017 8 30
**********************************************
**/
public class InventoryQueryCacheRequest implements Request {
private InventoryProductBiz inventoryProductBiz;
private Integer productId;
private boolean isForceFefresh;
public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) {
this.productId = productId;
this.inventoryProductBiz = inventoryProductBiz;
this.isForceFefresh = isForceFefresh;
}
@Override
public void process() {
InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
inventoryProductBiz.setInventoryProductCache(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return productId;
}
public boolean isForceFefresh() {
return isForceFefresh;
}
public void setForceFefresh(boolean isForceFefresh) {
this.isForceFefresh = isForceFefresh;
}
}
spring起動時に列のプログラムを初期化します。
package com.shux.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
import com.shux.utils.other.SysConfigUtil;
/**
**********************************************
* : ,
* Simba.Hua
* 2017 8 27
**********************************************
**/
public class RequestProcessorThreadPool {
private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString());
private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString());
private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum);
private RequestProcessorThreadPool(){
for(int i=0;i<blockingQueueNum;i++){//
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);// 100
RequestQueue.getInstance().addQueue(queue);
threadPool.submit(new RequestProcessorThread(queue));// queue , queue
}
}
public static class Singleton{
private static RequestProcessorThreadPool instance;
static{
instance = new RequestProcessorThreadPool();
}
public static RequestProcessorThreadPool getInstance(){
return instance;
}
}
public static RequestProcessorThreadPool getInstance(){
return Singleton.getInstance();
}
/**
*
*/
public static void init(){
getInstance();
}
}
要求処理スレッド
package com.shux.inventory.thread;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
/**
**********************************************
* :
* Simba.Hua
* 2017 8 27
**********************************************
**/
public class RequestProcessorThread implements Callable<Boolean>{
private ArrayBlockingQueue<Request> queue;
public RequestProcessorThread(ArrayBlockingQueue<Request> queue){
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
Request request = queue.take();
Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap();
// ,
if (!request.isForceFefresh()){
if (request instanceof InventoryUpdateDBRequest) {// , false
flagMap.put(request.getProductId(), true);
} else {
Boolean flag = flagMap.get(request.getProductId());
/**
* ,
* 1、
* 2、
* 3、
* , , ,
* redis ,redis LRU ,
* , , false,
*/
if ( flag == null) {
flagMap.put(request.getProductId(), false);
}
/**
* , flag true, , ( ),
* , , false
*/
if ( flag != null && flag) {
flagMap.put(request.getProductId(), false);
}
/**
* , , ,
*/
if (flag != null && !flag) {
flagMap.put(request.getProductId(), false);
return true;
}
}
}
request.process();
return true;
}
}
要求待ち行列
package com.shux.inventory.request;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
**********************************************
* :
* Simba.Hua
* 2017 8 27
**********************************************
**/
public class RequestQueue {
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();
private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>();
private RequestQueue(){
}
private static class Singleton{
private static RequestQueue queue;
static{
queue = new RequestQueue();
}
public static RequestQueue getInstance() {
return queue;
}
}
public static RequestQueue getInstance(){
return Singleton.getInstance();
}
public void addQueue(ArrayBlockingQueue<Request> queue) {
queues.add(queue);
}
public int getQueueSize(){
return queues.size();
}
public ArrayBlockingQueue<Request> getQueueByIndex(int index) {
return queues.get(index);
}
public Map<Integer,Boolean> getFlagMap() {
return this.flagMap;
}
}
スプリングスタート初期化スレッドクラス
package com.shux.inventory.listener;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import com.shux.inventory.thread.RequestProcessorThreadPool;
/**
**********************************************
* :spring
* Simba.Hua
* 2017 8 27
**********************************************
**/
public class InitListener implements ApplicationListener<ContextRefreshedEvent>{
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// TODO Auto-generated method stub
if(event.getApplicationContext().getParent() != null){
return;
}
RequestProcessorThreadPool.init();
}
}
非同期処理要求インターフェース
package com.shux.inventory.biz;
import com.shux.inventory.request.Request;
/**
**********************************************
* : ,
* Simba.Hua
* 2017 8 30
**********************************************
**/
public interface IRequestAsyncProcessBiz {
void process(Request request);
}
非同期処理要求インターフェース実装
package com.shux.inventory.biz.impl;
import java.util.concurrent.ArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
/**
**********************************************
* : ,
* Simba.Hua
* 2017 8 30
**********************************************
**/
@Service("requestAsyncProcessService")
public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void process(Request request) {
// , productId
ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId());
try {
queue.put(request);
} catch (InterruptedException e) {
logger.error(" ID{} ",request.getProductId(),e);
}
}
private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) {
RequestQueue requestQueue = RequestQueue.getInstance();
String key = String.valueOf(productId);
int hashcode;
int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16);
// hashcode
int index = (requestQueue.getQueueSize()-1) & hash;
return requestQueue.getQueueByIndex(index);
}
}
package com.shux.inventory.biz.impl;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.mapper.InventoryProductMapper;
import com.shux.redis.biz.IRedisBiz;
/**
**********************************************
*
* Simba.Hua
* 2017 8 30
**********************************************
**/
@Service("inventoryProductBiz")
public class InventoryProductBizImpl implements InventoryProductBiz {
private @Autowired IRedisBiz<InventoryProduct> redisBiz;
private @Resource InventoryProductMapper mapper;
@Override
public void updateInventoryProduct(InventoryProduct inventoryProduct) {
// TODO Auto-generated method stub
mapper.updateInventoryProduct(inventoryProduct);
}
@Override
public InventoryProduct loadInventoryProductByProductId(Integer productId) {
// TODO Auto-generated method stub
return mapper.loadInventoryProductByProductId(productId);
}
@Override
public void setInventoryProductCache(InventoryProduct inventoryProduct) {
redisBiz.set("inventoryProduct:"+inventoryProduct.getProductId(), inventoryProduct);
}
@Override
public void removeInventoryProductCache(Integer productId) {
redisBiz.delete("inventoryProduct:"+productId);
}
@Override
public InventoryProduct loadInventoryProductCache(Integer productId) {
// TODO Auto-generated method stub
return redisBiz.get("inventoryProduct:"+productId);
}
}
データ更新要求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.utils.other.Response;
/**
**********************************************
* :
* Simba.Hua
* 2017 9 1
**********************************************
**/
@Controller("/inventory")
public class InventoryUpdateDBController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/updateDBInventoryProduct")
@ResponseBody
public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){
Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz);
requestAsyncProcessBiz.process(request);
return new Response(Response.SUCCESS," ");
}
}
データクエリ要求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryQueryCacheRequest;
import com.shux.inventory.request.Request;
/**
**********************************************
* :
* 1、
* 2、 ,
* 3、 , 20 , , 200 , 200 , ,
* Simba.Hua
* 2017 9 1
**********************************************
**/
@Controller("/inventory")
public class InventoryQueryCacheController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/queryInventoryProduct")
public InventoryProduct queryInventoryProduct(Integer productId) {
Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false);
requestAsyncProcessBiz.process(request);//
long startTime = System.currentTimeMillis();
long allTime = 0L;
long endTime = 0L;
InventoryProduct inventoryProduct = null;
while (true) {
if (allTime > 200){// 200ms, ,
break;
}
try {
inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId);
if (inventoryProduct != null) {
return inventoryProduct;
} else {
Thread.sleep(20);// 20
}
endTime = System.currentTimeMillis();
allTime = endTime - startTime;
} catch (Exception e) {
}
}
/**
* ,
* 1、 , redis ,redis LRU ,
* 2、 , ,
* 3、 , , , , , ,
*/
inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
if (inventoryProduct != null) {
Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true);
requestAsyncProcessBiz.process(forcRrequest);// ,
return inventoryProduct;
}
return null;
}
}
この記事では、レディキャッシュとデータベースの整合性に関する詳細な問題を解決するための文章を紹介します。これに関連するレディキャッシュとデータベースの整合性については、以前の文章を検索したり、下記の関連記事を引き続き閲覧したりしてください。これからもよろしくお願いします。