Flumeの失敗選択マシンとBACKOFFの仕組み
6368 ワード
OrderSelectorでJAvaで
デフォルトはround_robin、すなわちポーリング.
backoffアルゴリズムは指数関数ロールバックであり、コードは以下の通りである.
一度失敗すると、restoreTimeはmaxTimeoutと左に移動します.
使用可能なマシンに戻る
RoundRobinOrderSelector.java
転載先:https://www.cnblogs.com/briller/p/3741003.html
デフォルトはround_robin、すなわちポーリング.
backoffアルゴリズムは指数関数ロールバックであり、コードは以下の通りである.
public void informFailure(T failedObject) {
//If there is no backoff this method is a no-op.
if (!shouldBackOff) {
return;
}
FailureState state = stateMap.get(failedObject);
long now = System.currentTimeMillis();
long delta = now - state.lastFail;
/*
* When do we increase the backoff period?
* We basically calculate the time difference between the last failure
* and the current one. If this failure happened within one hour of the
* last backoff period getting over, then we increase the timeout,
* since the object did not recover yet. Else we assume this is a fresh
* failure and reset the count.
*/
long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));
long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
if (allowableDiff > delta) {
if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) {
state.sequentialFails++;
}
} else {
state.sequentialFails = 1;
}
state.lastFail = now;
//Depending on the number of sequential failures this component had, delay
//its restore time. Each time it fails, delay the restore by 1000 ms,
//until the maxTimeOut is reached.
state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));
}
一度失敗すると、restoreTimeはmaxTimeoutと左に移動します.
/**
*
* @return - List of indices currently active objects
*/
protected List getIndexList() {
long now = System.currentTimeMillis();
List indexList = new ArrayList();
int i = 0;
for (T obj : stateMap.keySet()) {
if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
indexList.add(i);
}
i++;
}
return indexList;
}
使用可能なマシンに戻る
RoundRobinOrderSelector.java
public class RoundRobinOrderSelector extends OrderSelector {
private int nextHead = 0;
public RoundRobinOrderSelector(boolean shouldBackOff) {
super(shouldBackOff);
}
@Override
public Iterator createIterator() {
List activeIndices = getIndexList();
int size = activeIndices.size();
// possible that the size has shrunk so gotta adjust nextHead for that
if (nextHead >= size) {
nextHead = 0;
}
int begin = nextHead++;
if (nextHead == activeIndices.size()) {
nextHead = 0;
}
int[] indexOrder = new int[size];
for (int i = 0; i < size; i++) {
indexOrder[i] = activeIndices.get((begin + i) % size);
}
return new SpecificOrderIterator(indexOrder, getObjects());
}
}
転載先:https://www.cnblogs.com/briller/p/3741003.html