Rippleでの高同時要求の処理方法
5882 ワード
1.役割と制限
次のコードは、要求を送信したクライアントipに基づいてクライアントのロールを判断し、ロールによって異なる処理を行い、adminロールのユーザーは同時制限がなく、一般ユーザーは現在接続を切断すべきかどうかを判断します.func:ServerHandlerImp::processRequest
...
Resource::Consumer usage;
if (isUnlimited(role))
{
usage = m_resourceManager.newUnlimitedEndpoint(
remoteIPAddress.to_string());
}
else
{
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect())
{
HTTPReply(503, "Server is overloaded", output, rpcJ);
return;
}
}
adminユーザー
adminユーザーはプロファイルで構成できます.[port_ws_public]
port = 6006
ip = 0.0.0.0
admin = 192.168.0.133,192.168.0.144
protocol = ws
adminに構成されたipは同時制約を受けないため、既知のクライアントがノードに大量の要求を送信する場合は、接続が切断されないように、このクライアントのipをadminリストに追加できます.
一般ユーザー
Rippleでは、クライアントごとに1つの同時限度額があり、各要求は要求タイプによって対応する同時費用を受け取り、同時費用が限度額(15000*32)を超えると、次回の要求は接続を切断します.
同時費用の計算:送信要求費用が増加し、要求を送信しない場合、費用が減少し、ルールが減少します.// A span larger than four times the window decays the
// value to an insignificant amount so just reset it.
//
if (elapsed > 4 * Window)
{
m_value = value_type();
}
else
{
while (elapsed--)
m_value -= (m_value + Window - 1) / Window;
}
上のコード:
func:ServerHandlerImp::processRequest
...
Resource::Consumer usage;
if (isUnlimited(role))
{
usage = m_resourceManager.newUnlimitedEndpoint(
remoteIPAddress.to_string());
}
else
{
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect())
{
HTTPReply(503, "Server is overloaded", output, rpcJ);
return;
}
}
[port_ws_public]
port = 6006
ip = 0.0.0.0
admin = 192.168.0.133,192.168.0.144
protocol = ws
// A span larger than four times the window decays the
// value to an insignificant amount so just reset it.
//
if (elapsed > 4 * Window)
{
m_value = value_type();
}
else
{
while (elapsed--)
m_value -= (m_value + Window - 1) / Window;
}
2.FeeEscalation特性
オープンモード
FeeEscalationプロパティを開くには、2つの方法があります.プロファイルで直接構成できます.[features]
FeeEscalation
チェーン起動2週間で自動的に開くこともできます
機能
FeeEscalationプロパティをオンにすると、ノードが現在の共通認識の取引数が多いことを発見すると、新しい取引が並んでterQUEUEDに戻り、このエラーコードを返すと、取引は依然として成功する可能性が高いので、tesSUCCESSに戻るように共通認識結果を確認する必要があります.
キューのパラメータについては、次のいずれかが重要です.[transaction_queue]
minimum_txn_in_ledger = 200
この配置は、現在のブロックで共通認識できる取引数を示しています.つまり、現在共通認識されている取引量が200に達した場合、後の取引はキューに並ぶ必要があります.
実装方法:
1つの取引が送られてくると、次の流れが通ります.
[features]
FeeEscalation
[transaction_queue]
minimum_txn_in_ledger = 200
// We may need the base fee for multiple transactions
// or transaction replacement, so just pull it up now.
// TODO: Do we want to avoid doing it again during
// preclaim?
auto const baseFee = calculateBaseFee(app, view, *tx, j);
auto const feeLevelPaid = getFeeLevelPaid(*tx,
baseLevel, baseFee, setup_);
auto const requiredFeeLevel = [&]()
{
auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
if ((flags & tapPREFER_QUEUE) && byFee_.size())
{
return std::max(feeLevel, byFee_.begin()->feeLevel);
}
return feeLevel;
}();
scaleFeeLevelは重要です.次は実装です.
std::uint64_t
TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot,
OpenView const& view, std::uint32_t txCountPadding)
{
// Transactions in the open ledger so far
auto const current = view.txCount() + txCountPadding;
auto const target = snapshot.txnsExpected;
auto const multiplier = snapshot.escalationMultiplier;
// Once the open ledger bypasses the target,
// escalate the fee quickly.
if (current > target)
{
// Compute escalated fee level
// Don't care about the overflow flag
return mulDiv(multiplier, current * current,
target * target).second;
}
return baseLevel;
}
std::uint32_t minimumEscalationMultiplier = baseLevel * 500;
中のtxnsExpectedの値は私たちが構成したminimumです.txn_in_ledgerここでmultiplierのデフォルト値は128000で、currentとtargetの差が大きいほど、最終的な費用が高くなることがわかります.
FeeMultiSet byFee_;
AccountMap byAccount_;
byFeeは、費用順の取引セットbyAccount_です.口座別mapです
RCLConsensus::Adaptor::doAccept
...
// Build new open ledger
auto lock = make_lock(app_.getMasterMutex(), std::defer_lock);
auto sl = make_lock(ledgerMaster_.peekMutex(), std::defer_lock);
std::lock(lock, sl);
auto const lastVal = ledgerMaster_.getValidatedLedger();
boost::optional rules;
if (lastVal)
rules.emplace(*lastVal, app_.config().features);
else
rules.emplace(app_.config().features);
app_.openLedger().accept(
app_,
*rules,
sharedLCL.ledger_,
localTxs_.getTxSet(),
anyDisputes,
retriableTxs,
tapNONE,
"consensus",
[&](OpenView& view, beast::Journal j) {
// Stuff the ledger with transactions from the queue.
return app_.getTxQ().accept(app_, view);
});
...
TxQ::acceptではまだ料金の制限があります
TxQ::accept
...
auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
metricSnapshot, view);
auto const feeLevelPaid = candidateIter->feeLevel;
if (feeLevelPaid >= requiredFeeLevel)
{
auto firstTxn = candidateIter->txn;
JLOG(j_.trace()) << "Applying queued transaction " <<
candidateIter->txID << " to open ledger.";
TER txnResult;
bool didApply;
std::tie(txnResult, didApply) = candidateIter->apply(app, view);
...
TxQ::acceptで反発量が使用されている
std::lock_guard<:mutex> lock(mutex_);
TxQ::applyと同時反発