Cassandra失効検査原理
6820 ワード
Cassandra失効検査原理
一、伝統失効検査とその不足
従来の故障検出方法
分散システムでは常に心拍数を使ってServerの健康状態を測定していますが、理論的には、相手がcrashかどうかを本当に検出することができません。従来の測定方法は、タイムアウト時間Tを設定していますが、T内に相手の心拍カバンを受け取っていない限り、相手のあたまだと思っています。方法は簡単で乱暴ですが、広く使われています。
従来のエラー検出に存在する欠陥
上述のように、ターゲットホストは、従来の方法では、間隔t秒ごとに心拍が開始され、受信者は、タイムアウト時間T(t<T)を用いて、ターゲットがあたごであるかどうかを判断し、受信者は、まずターゲットの心拍規則(周期tの間隔)を明確にしてから、タイムアウト時間Tを正確に設定することができ、Tの選択は現在のネットワーク状況、ターゲットホストの処理能力など多くの不確定要素があるので、実際にはテストまたは推定によってTに上限値が与えられます。上限値の設定が大きすぎると「遅延」と判断されますが、判断の正確性が高まります。小さすぎると判断効率が上がりますが、誤審の可能性が高まります。しかし、次のいくつかのシーンでは、従来の検出方法は使用できません。
1.ゴシップ通信
しかし、実際のアプリケーションでは、例えば、Gossip通信アプリケーションに基づいて、ランダム通信のために、二つのServerの間に規則的な心拍が存在しないため、Tを非常に大きく設定しない限り、適切なタイムアウト時間Tを見つけることは困難であるが、このように検出する過程は「遅延」して耐えられない。
2.ネットワーク負荷のダイナミック変化
もう一つの場合、ネットワーク負荷が大きくなるにつれて、Server心拍の受信時間は上限値Tより大きくなる可能性があります。しかし、ネットワークの圧力が減少すると、心拍受信時間がT以下になります。変わらないTで心拍状況を反映すると、判断が遅くなります。
3.心拍検出と結果の分離
各アプリケーションが一つの目的の本体のあたごの有無を知るだけでなく、自分で動悸の結果を説明して、異なる処理動作をするアプリケーションが多いです。例えば、目標の本体3 s内に動悸がない場合は、Aを使ってあたごと読み返し、Bを使って解読することを目標とします。活発ではありません。つまり、ターゲットホストが「あたご」かどうかは業務ロジックで決めるべきで、タイムアウト時間Tで決めるのではなく、心拍検出プロセスと結果の解釈を分離して、より良い柔軟性を提供するべきです。
二Gossiperで採用されているΦ故障検出方法
失効検査による古典論文The Phi accrual failure detector(http://vsedach.googlepages.com/HDY04.pdf)で証明されたように、分散環境においてホストの心拍数は、従来の心拍間隔の経験値に基づいて、ホストがあたごかどうかを次のように判断することができます。
1.与えられたしきい値Φ
2.一定時間、各心拍間隔を記録する
3.心拍の間隔値に指数分布(Exponentialdistribution)確率を求める:
P=E^(-1*(now-lastTimeStation)/mean)(Eは対数2.1828...で、メンはその前の間隔時間平均値)
これは、前回の統計以来、心臓の到達時間がnow-lastTimeStationの確率を超えるということです。
4.計算φ= - ロゴ10 P
5.当φ> Φ その時、本体はすでにあたまになっていると考えられます。
もちろんこれは誤審の可能性があります。誤審の可能性は以下の通りです。
Φ= 1,1%
Φ= 2,0.1%
Φ= 3,0.01%
……
このことから分かるように、当Φ= 8時、誤審率はすでに小さいです。cassandraではデフォルトで採用されます。Φ= 8です
以下、Phiの故障検出アルゴリズムに関するjava実装があります。Cassandraでは、これに類似したものが実現されます。
一、伝統失効検査とその不足
従来の故障検出方法
分散システムでは常に心拍数を使ってServerの健康状態を測定していますが、理論的には、相手がcrashかどうかを本当に検出することができません。従来の測定方法は、タイムアウト時間Tを設定していますが、T内に相手の心拍カバンを受け取っていない限り、相手のあたまだと思っています。方法は簡単で乱暴ですが、広く使われています。
従来のエラー検出に存在する欠陥
上述のように、ターゲットホストは、従来の方法では、間隔t秒ごとに心拍が開始され、受信者は、タイムアウト時間T(t<T)を用いて、ターゲットがあたごであるかどうかを判断し、受信者は、まずターゲットの心拍規則(周期tの間隔)を明確にしてから、タイムアウト時間Tを正確に設定することができ、Tの選択は現在のネットワーク状況、ターゲットホストの処理能力など多くの不確定要素があるので、実際にはテストまたは推定によってTに上限値が与えられます。上限値の設定が大きすぎると「遅延」と判断されますが、判断の正確性が高まります。小さすぎると判断効率が上がりますが、誤審の可能性が高まります。しかし、次のいくつかのシーンでは、従来の検出方法は使用できません。
1.ゴシップ通信
しかし、実際のアプリケーションでは、例えば、Gossip通信アプリケーションに基づいて、ランダム通信のために、二つのServerの間に規則的な心拍が存在しないため、Tを非常に大きく設定しない限り、適切なタイムアウト時間Tを見つけることは困難であるが、このように検出する過程は「遅延」して耐えられない。
2.ネットワーク負荷のダイナミック変化
もう一つの場合、ネットワーク負荷が大きくなるにつれて、Server心拍の受信時間は上限値Tより大きくなる可能性があります。しかし、ネットワークの圧力が減少すると、心拍受信時間がT以下になります。変わらないTで心拍状況を反映すると、判断が遅くなります。
3.心拍検出と結果の分離
各アプリケーションが一つの目的の本体のあたごの有無を知るだけでなく、自分で動悸の結果を説明して、異なる処理動作をするアプリケーションが多いです。例えば、目標の本体3 s内に動悸がない場合は、Aを使ってあたごと読み返し、Bを使って解読することを目標とします。活発ではありません。つまり、ターゲットホストが「あたご」かどうかは業務ロジックで決めるべきで、タイムアウト時間Tで決めるのではなく、心拍検出プロセスと結果の解釈を分離して、より良い柔軟性を提供するべきです。
二Gossiperで採用されているΦ故障検出方法
失効検査による古典論文The Phi accrual failure detector(http://vsedach.googlepages.com/HDY04.pdf)で証明されたように、分散環境においてホストの心拍数は、従来の心拍間隔の経験値に基づいて、ホストがあたごかどうかを次のように判断することができます。
1.与えられたしきい値Φ
2.一定時間、各心拍間隔を記録する
3.心拍の間隔値に指数分布(Exponentialdistribution)確率を求める:
P=E^(-1*(now-lastTimeStation)/mean)(Eは対数2.1828...で、メンはその前の間隔時間平均値)
これは、前回の統計以来、心臓の到達時間がnow-lastTimeStationの確率を超えるということです。
4.計算φ= - ロゴ10 P
5.当φ> Φ その時、本体はすでにあたまになっていると考えられます。
もちろんこれは誤審の可能性があります。誤審の可能性は以下の通りです。
Φ= 1,1%
Φ= 2,0.1%
Φ= 3,0.01%
……
このことから分かるように、当Φ= 8時、誤審率はすでに小さいです。cassandraではデフォルトで採用されます。Φ= 8です
以下、Phiの故障検出アルゴリズムに関するjava実装があります。Cassandraでは、これに類似したものが実現されます。
/**
java demo for phi failure detector
*/
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PhiAccrualFailureDetector {
private static final int sampleWindowSize = 1000;
private static int phiSuspectThreshold = 8;
private SamplingWindow simpleingWindow = new SamplingWindow(sampleWindowSize);
public PhiAccrualFailureDetector() {
}
public void addSample() {
simpleingWindow.add(System.currentTimeMillis());
}
public void addSample(double sample) {
simpleingWindow.add(sample);
}
public void interpret() {
double phi = simpleingWindow.phi(System.currentTimeMillis());
System.out.println("PHI = " + phi);
if (phi > phiSuspectThreshold) {
System.out.println("We are assuming the moniored machine is down!");
} else {
System.out.println("We are assuming the moniored machine is still running!");
}
}
/**
* @param args
* the command line arguments
*/
public static void main(String[] args) {
PhiAccrualFailureDetector pafd = new PhiAccrualFailureDetector();
// first try with phi < phiSuspectThreshold
for (int i = 0; i < 10; i++) {
pafd.addSample();
try {
Thread.sleep(10L);
} catch (InterruptedException ex) {
// no op
}
}
try {
Thread.sleep(500L);
} catch (InterruptedException ex) {
// no op
}
System.out.println(pafd.simpleingWindow.toString());
pafd.interpret();
// second try result phi > phiSuspectThreshold
for (int i = 0; i < 10; i++) {
pafd.addSample();
try {
Thread.sleep(10L);
} catch (InterruptedException ex) {
// no op
}
}
try {
Thread.sleep(1500L);
} catch (InterruptedException ex) {
// no op
}
System.out.println(pafd.simpleingWindow.toString());
pafd.interpret();
}
static class SamplingWindow {
private final Lock lock = new ReentrantLock();
private double lastTimeStamp = 0L;
private StatisticDeque arrivalIntervals;
SamplingWindow(int size) {
arrivalIntervals = new StatisticDeque(size);
}
void add(double value) {
lock.lock();
try {
double interval;
if (lastTimeStamp > 0L) {
interval = (value - lastTimeStamp);
} else {
interval = 1000 / 2;
}
lastTimeStamp = value;
arrivalIntervals.add(interval);
} finally {
lock.unlock();
}
}
double sum() {
lock.lock();
try {
return arrivalIntervals.sum();
} finally {
lock.unlock();
}
}
double sumOfDeviations() {
lock.lock();
try {
return arrivalIntervals.sumOfDeviations();
} finally {
lock.unlock();
}
}
double mean() {
lock.lock();
try {
return arrivalIntervals.mean();
} finally {
lock.unlock();
}
}
double variance() {
lock.lock();
try {
return arrivalIntervals.variance();
} finally {
lock.unlock();
}
}
double stdev() {
lock.lock();
try {
return arrivalIntervals.stdev();
} finally {
lock.unlock();
}
}
void clear() {
lock.lock();
try {
arrivalIntervals.clear();
} finally {
lock.unlock();
}
}
/**
*
* p = E ^ (-1 * (tnow - lastTimeStamp) / mean)
*/
double p(double t) {
double mean = mean();
double exponent = (-1) * (t) / mean;
return Math.pow(Math.E, exponent);
}
double phi(long tnow) {
int size = arrivalIntervals.size();
double log = 0d;
if (size > 0) {
double t = tnow - lastTimeStamp;
double probability = p(t);
log = (-1) * Math.log10(probability);
}
return log;
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
for (Iterator<Double> it = arrivalIntervals.iterator(); it.hasNext();) {
s.append(it.next()).append(" ");
}
return s.toString();
}
}
static class StatisticDeque implements Iterable<Double> {
private final int size;
protected final ArrayDeque<Double> queue;
public StatisticDeque(int size) {
this.size = size;
queue = new ArrayDeque<Double>(size);
}
public Iterator<Double> iterator() {
return queue.iterator();
}
public int size() {
return queue.size();
}
public void clear() {
queue.clear();
}
public void add(double o) {
if (size == queue.size()) {
queue.remove();
}
queue.add(o);
}
public double sum() {
double sum = 0D;
for (Double interval : this) {
sum += interval;
}
return sum;
}
public double sumOfDeviations() {
double sumOfDeviations = 0D;
double mean = mean();
for (Double interval : this) {
double d = interval - mean;
sumOfDeviations += d * d;
}
return sumOfDeviations;
}
public double mean() {
return sum() / size();
}
public double variance() {
return sumOfDeviations() / size();
}
public double stdev() {
return Math.sqrt(variance());
}
}
}