Cassandra失効検査原理

6820 ワード

Cassandra失効検査原理
一、伝統失効検査とその不足
従来の故障検出方法
分散システムでは常に心拍数を使ってServerの健康状態を測定していますが、理論的には、相手がcrashかどうかを本当に検出することができません。従来の測定方法は、タイムアウト時間Tを設定していますが、T内に相手の心拍カバンを受け取っていない限り、相手のあたまだと思っています。方法は簡単で乱暴ですが、広く使われています。
従来のエラー検出に存在する欠陥
上述のように、ターゲットホストは、従来の方法では、間隔t秒ごとに心拍が開始され、受信者は、タイムアウト時間T(t<T)を用いて、ターゲットがあたごであるかどうかを判断し、受信者は、まずターゲットの心拍規則(周期tの間隔)を明確にしてから、タイムアウト時間Tを正確に設定することができ、Tの選択は現在のネットワーク状況、ターゲットホストの処理能力など多くの不確定要素があるので、実際にはテストまたは推定によってTに上限値が与えられます。上限値の設定が大きすぎると「遅延」と判断されますが、判断の正確性が高まります。小さすぎると判断効率が上がりますが、誤審の可能性が高まります。しかし、次のいくつかのシーンでは、従来の検出方法は使用できません。
1.ゴシップ通信
しかし、実際のアプリケーションでは、例えば、Gossip通信アプリケーションに基づいて、ランダム通信のために、二つのServerの間に規則的な心拍が存在しないため、Tを非常に大きく設定しない限り、適切なタイムアウト時間Tを見つけることは困難であるが、このように検出する過程は「遅延」して耐えられない。
2.ネットワーク負荷のダイナミック変化
もう一つの場合、ネットワーク負荷が大きくなるにつれて、Server心拍の受信時間は上限値Tより大きくなる可能性があります。しかし、ネットワークの圧力が減少すると、心拍受信時間がT以下になります。変わらないTで心拍状況を反映すると、判断が遅くなります。
3.心拍検出と結果の分離
各アプリケーションが一つの目的の本体のあたごの有無を知るだけでなく、自分で動悸の結果を説明して、異なる処理動作をするアプリケーションが多いです。例えば、目標の本体3 s内に動悸がない場合は、Aを使ってあたごと読み返し、Bを使って解読することを目標とします。活発ではありません。つまり、ターゲットホストが「あたご」かどうかは業務ロジックで決めるべきで、タイムアウト時間Tで決めるのではなく、心拍検出プロセスと結果の解釈を分離して、より良い柔軟性を提供するべきです。
二Gossiperで採用されているΦ故障検出方法
失効検査による古典論文The Phi accrual failure detector(http://vsedach.googlepages.com/HDY04.pdf)で証明されたように、分散環境においてホストの心拍数は、従来の心拍間隔の経験値に基づいて、ホストがあたごかどうかを次のように判断することができます。
1.与えられたしきい値Φ
2.一定時間、各心拍間隔を記録する
3.心拍の間隔値に指数分布(Exponentialdistribution)確率を求める:
P=E^(-1*(now-lastTimeStation)/mean)(Eは対数2.1828...で、メンはその前の間隔時間平均値)
これは、前回の統計以来、心臓の到達時間がnow-lastTimeStationの確率を超えるということです。
4.計算φ= - ロゴ10 P
5.当φ> Φ その時、本体はすでにあたまになっていると考えられます。
もちろんこれは誤審の可能性があります。誤審の可能性は以下の通りです。
Φ= 1,1%
Φ= 2,0.1%
Φ= 3,0.01%
……
このことから分かるように、当Φ= 8時、誤審率はすでに小さいです。cassandraではデフォルトで採用されます。Φ= 8です
以下、Phiの故障検出アルゴリズムに関するjava実装があります。Cassandraでは、これに類似したものが実現されます。
/**
       java demo for phi failure detector
*/
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PhiAccrualFailureDetector {
	private static final int sampleWindowSize = 1000;
	private static int phiSuspectThreshold = 8;
	private SamplingWindow simpleingWindow = new SamplingWindow(sampleWindowSize);

	public PhiAccrualFailureDetector() {
	}

	public void addSample() {
		simpleingWindow.add(System.currentTimeMillis());
	}

	public void addSample(double sample) {
		simpleingWindow.add(sample);
	}

	public void interpret() {
		double phi = simpleingWindow.phi(System.currentTimeMillis());
		System.out.println("PHI = " + phi);
		if (phi > phiSuspectThreshold) {
			System.out.println("We are assuming the moniored machine is down!");
		} else {
			System.out.println("We are assuming the moniored machine is still running!");
		}
	}

	/**
	 * @param args
	 *                the command line arguments
	 */
	public static void main(String[] args) {
		PhiAccrualFailureDetector pafd = new PhiAccrualFailureDetector();
		// first try with phi < phiSuspectThreshold
		for (int i = 0; i < 10; i++) {
			pafd.addSample();
			try {
				Thread.sleep(10L);
			} catch (InterruptedException ex) {
				// no op
			}
		}
		try {
			Thread.sleep(500L);
		} catch (InterruptedException ex) {
			// no op
		}
		System.out.println(pafd.simpleingWindow.toString());
		pafd.interpret();
		// second try result phi > phiSuspectThreshold
		for (int i = 0; i < 10; i++) {
			pafd.addSample();
			try {
				Thread.sleep(10L);
			} catch (InterruptedException ex) {
				// no op
			}
		}
		try {
			Thread.sleep(1500L);
		} catch (InterruptedException ex) {
			// no op
		}
		System.out.println(pafd.simpleingWindow.toString());
		pafd.interpret();
	}

	static class SamplingWindow {
		private final Lock lock = new ReentrantLock();
		private double lastTimeStamp = 0L;
		private StatisticDeque arrivalIntervals;

		SamplingWindow(int size) {
			arrivalIntervals = new StatisticDeque(size);
		}

		void add(double value) {
			lock.lock();
			try {
				double interval;
				if (lastTimeStamp > 0L) {
					interval = (value - lastTimeStamp);
				} else {
					interval = 1000 / 2;
				}
				lastTimeStamp = value;
				arrivalIntervals.add(interval);
			} finally {
				lock.unlock();
			}
		}

		double sum() {
			lock.lock();
			try {
				return arrivalIntervals.sum();
			} finally {
				lock.unlock();
			}
		}

		double sumOfDeviations() {
			lock.lock();
			try {
				return arrivalIntervals.sumOfDeviations();
			} finally {
				lock.unlock();
			}
		}

		double mean() {
			lock.lock();
			try {
				return arrivalIntervals.mean();
			} finally {
				lock.unlock();
			}
		}

		double variance() {
			lock.lock();
			try {
				return arrivalIntervals.variance();
			} finally {
				lock.unlock();
			}
		}

		double stdev() {
			lock.lock();
			try {
				return arrivalIntervals.stdev();
			} finally {
				lock.unlock();
			}
		}

		void clear() {
			lock.lock();
			try {
				arrivalIntervals.clear();
			} finally {
				lock.unlock();
			}
		}

		/**
		 * 
		 * p = E ^ (-1 * (tnow - lastTimeStamp) / mean)
		 */
		double p(double t) {
			double mean = mean();
			double exponent = (-1) * (t) / mean;
			return Math.pow(Math.E, exponent);
		}

		double phi(long tnow) {
			int size = arrivalIntervals.size();
			double log = 0d;
			if (size > 0) {
				double t = tnow - lastTimeStamp;
				double probability = p(t);
				log = (-1) * Math.log10(probability);
			}
			return log;
		}

		@Override
		public String toString() {
			StringBuilder s = new StringBuilder();
			for (Iterator<Double> it = arrivalIntervals.iterator(); it.hasNext();) {
				s.append(it.next()).append(" ");
			}
			return s.toString();
		}
	}

	static class StatisticDeque implements Iterable<Double> {
		private final int size;
		protected final ArrayDeque<Double> queue;

		public StatisticDeque(int size) {
			this.size = size;
			queue = new ArrayDeque<Double>(size);
		}

		public Iterator<Double> iterator() {
			return queue.iterator();
		}

		public int size() {
			return queue.size();
		}

		public void clear() {
			queue.clear();
		}

		public void add(double o) {
			if (size == queue.size()) {
				queue.remove();
			}
			queue.add(o);
		}

		public double sum() {
			double sum = 0D;
			for (Double interval : this) {
				sum += interval;
			}
			return sum;
		}

		public double sumOfDeviations() {
			double sumOfDeviations = 0D;
			double mean = mean();
			for (Double interval : this) {
				double d = interval - mean;
				sumOfDeviations += d * d;
			}
			return sumOfDeviations;
		}

		public double mean() {
			return sum() / size();
		}

		public double variance() {
			return sumOfDeviations() / size();
		}

		public double stdev() {
			return Math.sqrt(variance());
		}
	}
}