Kafkaソース分析(3)
13492 ワード
三API Layer
1、KafkaApis
このクラスは各種APIをまとめてカプセル化したものであり、受信したリクエストのrequestIdに応じて、どのAPI(ハンドラ)を呼び出すかを決定します。最も重要なhandle()メソッドは以下のとおりです。
def handle(request: RequestChannel.Request) {
try{
trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request.requestObj.handleError(e, requestChannel, request)
error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
このバージョンでは、handleProducerRequestとhandleOffsetCommitRequestの2つのメソッドが、handleProducerOrOffsetCommitRequestという1つのメソッドに統合されています(0.8以前は別々でした)。offset commitもproduce requestに変換されたうえでローカルログに書き込まれるため、2つの操作のコードはほとんど共通しているからです。以下のコードから分かるように、Kafkaはrequest.requestIdに応じてrequestオブジェクトをラップし、produceRequest.requiredAcksの値(requestに確認応答が必要かどうか、すなわち同期か非同期か)に応じてデータを処理し、応答を生成します。
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
val (produceRequest, offsetCommitRequestOpt) =
if (request.requestId == RequestKeys.OffsetCommitKey) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
(producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
} else {
(request.requestObj.asInstanceOf[ProducerRequest], None)
}
if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) {
warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
"be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
"and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks))
}
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
if(produceRequest.requiredAcks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
// no response is expected by the producer the handler will send a close connection response to the socket server
// to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
if (numPartitionsInError != 0) {
info(("Send the close connection response due to error handling produce request " +
"[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
.format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
requestChannel.closeConnection(request.processor, request)
} else {
if (firstErrorCode == ErrorMapping.NoError)
offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
if (offsetCommitRequestOpt.isDefined) {
val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else
requestChannel.noOperation(request.processor, request)
}
} else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
numPartitionsInError == produceRequest.numPartitions) {
if (firstErrorCode == ErrorMapping.NoError) {
offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
}
val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
.getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.toSeq
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedRequest = new DelayedProduce(
producerRequestKeys,
request,
produceRequest.ackTimeoutMs.toLong,
produceRequest,
statuses,
offsetCommitRequestOpt)
// add the produce request for watch if it's not satisfied, otherwise send the response back
val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
if (satisfiedByMe)
producerRequestPurgatory.respond(delayedRequest)
}
// we do not need the data anymore
produceRequest.emptyData()
}
もう1つ注目すべきメソッドはhandleOffsetCommitRequestです。consumerはpartitionのデータを消費した後、offsetを自動的にコミットします(もちろん手動でコミットを呼び出すこともできます)。ただし、ここには注意点があります。たとえばconsumer 1とconsumer 2がpartition 1のデータを消費していて、consumer 1が先にoffsetをコミットし、その後consumer 2がクラッシュした場合、コミット済みのoffsetより前にあってconsumer 2がまだ処理していなかったデータは失われます。これは典型的なat most onceの動作であり、一定のデータが失われるリスクがあるということです。このメソッドはoffsetCommitRequest.versionIdの値を判定し、0(旧バージョンからの呼び出し)の場合はoffsetをZooKeeperに格納し、1以上の場合は前述のhandleProducerOrOffsetCommitRequestメソッドを呼び出して、offsetを専用のtopicに格納します(この処理はOffsetManagerクラスが担当します)。
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
if (offsetCommitRequest.versionId == 0) {
// version 0 stores the offsets in ZK
val responseInfo = offsetCommitRequest.requestInfo.map{
case (topicAndPartition, metaAndError) => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
ensureTopicExists(topicAndPartition.topic)
if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
} else {
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
topicAndPartition.partition, metaAndError.offset.toString)
(topicAndPartition, ErrorMapping.NoError)
}
} catch {
case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
}
}
val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else {
// version 1 and above store the offsets in a special Kafka topic
handleProducerOrOffsetCommitRequest(request)
}
}
残りのメソッドは旧バージョンからコードが大きく変わっていないため、ここでは詳しく説明しません。
2、****Request/****Response
****の部分に入る名前には、OffsetCommit、LeaderAndIsr、StopReplica、UpdateMetadataなどがあります。これらのクラスはAPI層の各種リクエスト/レスポンスメッセージをラップするためのもので、メッセージを実質的に処理するわけではありません。そのため中のコードも単純で、基本的にはシリアライズ/デシリアライズの処理です。通常、Request/Responseには次のメソッドがあります。
readFrom():ByteBufferからデータを読み込み、オブジェクトを構築します.
writeTo():ByteBufferにデータを書き込む
sizeInBytes():メッセージをシリアライズしたときのバイト数を返します.
describe(),toString():メッセージをjsonのstring形式に変換
handleError():Requestにのみ存在し、異常時に特定のResponseを生成するために使用されます.
コードの掲載は省略します.
3、RequestPurgatory
このクラスはProducerRequestPurgatoryとFetchRequestPurgatoryの2つのクラスの親クラスです。Purgatoryの本来の意味は「煉獄」ですが、Kafkaでは実質的に(遅延リクエストを保持する)バッファという意味で使われています。これは2つのサブクラスのヘッダコメントからも分かります: "The purgatory holding delayed producer requests" / "The purgatory holding delayed fetch requests".
まずこのクラスのscalaファイルを見ると、先頭にDelayedRequestクラスが定義されています。このクラスは基底クラスDelayedItemを継承しており(DelayedItemについては詳しく説明しません)、さらに、値の更新がアトミックに行われることを保証するAtomicBooleanのフィールドを持っています。このクラスの役割は、ヘッダコメントで説明されているとおり、遅延させるリクエストをカプセル化することです。
/**
* A request whose processing needs to be delayed for at most the given delayMs
* The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied,
* for example a key could be a (topic, partition) pair.
*/
class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
val satisfied = new AtomicBoolean(false)
}
次に抽象クラスRequestPurgatoryです.
isSatisfiedByMe():このrequestのsatisfiedフィールドを(自分の手で)trueに設定しようと試みる具象メソッド.
checkAndMaybeWatch():すでに満たされているrequestは満たされた状態にし、まだ満たされていないrequestは監視(watch)キューに追加する具象メソッド。監視キューを走査するスレッドが複数存在する可能性があるため、ここでは2段階でチェックするテクニックが使われています。コード内のコメントに説明があります。
// The cost of checkSatisfied() is typically proportional to the number of keys. Calling
// checkSatisfied() for each key is going to be expensive if there are many keys. Instead,
// we do the check in the following way. Call checkSatisfied(). If the request is not satisfied,
// we just add the request to all keys. Then we call checkSatisfied() again. At this time, if
// the request is still not satisfied, we are guaranteed that it won't miss any future triggering
// events since the request is already on the watcher list for all keys. This does mean that
// if the request is satisfied (by another thread) between the two checkSatisfied() calls, the
// request is unnecessarily added for watch. However, this is a less severe issue since the
// expire reaper will clean it up periodically.
update():指定したkeyを監視しているwatcherをチェックし、新たに満たされたrequestのリストを取得する具象メソッド.
checkSatisfied():抽象メソッド、サブクラス独自の実装が必要
expire():抽象メソッド、サブクラス独自の実装が必要
4、ProducerRequestPurgatory/FetchRequestPurgatory
ProducerRequestPurgatoryクラス、2つの抽象的なメソッドの実装は次のとおりです./**
* Check if a specified delayed fetch request is satisfied
*/
def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
/**
* When a delayed produce request expires answer it with possible time out error codes
*/
def expire(delayedProduce: DelayedProduce) {
debug("Expiring produce request %s.".format(delayedProduce.produce))
for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
recordDelayedProducerKeyExpired(topicPartition)
respond(delayedProduce)
}
ここで扱うリクエストの型はDelayedProduceに特化されています。DelayedProduceクラスのヘッダコメントによると、この種のrequestは次のいずれかの条件で「満たされた(satisfied)」とみなされます.
A.このbrokerがもはやleaderでない場合:エラーを返します
B.このbrokerがleaderである場合:(1) ローカルログへの書き込みでlocalErrorが発生していれば、エラーを返します.(2) それ以外の場合は、少なくともrequiredAcks個のレプリカがこのリクエストの位置まで追いついている必要があります.
FetchRequestPurgatoryクラスでは、2つの抽象的なメソッドが次のように実装されています./**
* Check if a specified delayed fetch request is satisfied
*/
def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
/**
* When a delayed fetch request expires just answer it with whatever data is present
*/
def expire(delayedFetch: DelayedFetch) {
debug("Expiring fetch request %s.".format(delayedFetch.fetch))
val fromFollower = delayedFetch.fetch.isFromFollower
recordDelayedFetchExpired(fromFollower)
respond(delayedFetch)
}
ここで扱うリクエストの型はDelayedFetchに特化されています。DelayedFetchクラスのヘッダコメントによると、この種のrequestは次のいずれかの条件で「満たされた(satisfied)」とみなされます.
A.現在のbrokerがfetch操作に必要な一部のpartitionのリーダーではない場合、他のリーダーであるpartitionのデータを返す
B.現在のbrokerはfetch操作に必要ないくつかのpartitionを認識できず、他のpartitionのデータを返す
C.fetch操作要求のoffsetはlogの最後のセグメント(segment、logはsegmentファイルの山の形で格納されている)内になく、そのセグメントのすべてのデータを返す必要がある
D.累計fetchのbyte数が当該FetchRequestが設定した最小需要byte数より大きい場合、利用可能な全てのデータを返す
/**
 * Top-level dispatcher: routes an incoming request to the handler that matches its request id.
 *
 * Any Throwable raised while dispatching or handling is first handed to the request object's
 * own handleError and then logged. Whether handling succeeded or failed, the request's
 * apiLocalCompleteTimeMs is set to the current time before this method returns.
 *
 * @param request the request to dispatch
 */
def handle(request: RequestChannel.Request): Unit = {
  try {
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
    val apiKey = request.requestId
    apiKey match {
      case RequestKeys.ProduceKey =>
        handleProducerOrOffsetCommitRequest(request)
      case RequestKeys.FetchKey =>
        handleFetchRequest(request)
      case RequestKeys.OffsetsKey =>
        handleOffsetRequest(request)
      case RequestKeys.MetadataKey =>
        handleTopicMetadataRequest(request)
      case RequestKeys.LeaderAndIsrKey =>
        handleLeaderAndIsrRequest(request)
      case RequestKeys.StopReplicaKey =>
        handleStopReplicaRequest(request)
      case RequestKeys.UpdateMetadataKey =>
        handleUpdateMetadataRequest(request)
      case RequestKeys.ControlledShutdownKey =>
        handleControlledShutdownRequest(request)
      case RequestKeys.OffsetCommitKey =>
        handleOffsetCommitRequest(request)
      case RequestKeys.OffsetFetchKey =>
        handleOffsetFetchRequest(request)
      case RequestKeys.ConsumerMetadataKey =>
        handleConsumerMetadataRequest(request)
      case unknownKey =>
        // No handler is registered for this id; the catch clause below reports it like any other failure.
        throw new KafkaException("Unknown api code " + unknownKey)
    }
  } catch {
    case failure: Throwable =>
      // Give the request the chance to produce its own error response before logging.
      request.requestObj.handleError(failure, requestChannel, request)
      error("error when handling request %s".format(request.requestObj), failure)
  } finally {
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
}
/**
 * Handles a produce request, or an offset commit request that is first converted into a
 * produce request via producerRequestFromOffsetCommit.
 *
 * The request is appended to the local log, and the reply strategy then depends on
 * requiredAcks:
 *  - 0: no produce response is sent; the connection is closed if any partition failed, and an
 *       offset commit still receives its own response;
 *  - 1, or no partitions, or every partition failed locally: respond immediately;
 *  - otherwise: wrap the request in a DelayedProduce and hand it to the producer purgatory.
 *
 * In every case the produce request's data is released at the end via emptyData().
 *
 * @param request the produce or offset commit request taken off the request channel
 */
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  // An offset commit is rewritten as a produce request; the original commit request is kept
  // so that the matching offset commit response can be built later.
  val (produceRequest, offsetCommitRequestOpt) = request.requestId match {
    case RequestKeys.OffsetCommitKey =>
      val commit = request.requestObj.asInstanceOf[OffsetCommitRequest]
      OffsetCommitRequest.changeInvalidTimeToCurrentTime(commit)
      (producerRequestFromOffsetCommit(commit), Some(commit))
    case _ =>
      (request.requestObj.asInstanceOf[ProducerRequest], None)
  }

  val acks = produceRequest.requiredAcks
  if (!(acks >= -1 && acks <= 1)) {
    warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
      "be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
      "and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, acks))
  }

  val appendStartMs = SystemTime.milliseconds
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
  debug("Produce to local log in %d ms".format(SystemTime.milliseconds - appendStartMs))

  // Error code of the first partition that failed locally, or NoError if none did.
  val firstErrorCode = localProduceResults
    .collectFirst { case result if result.errorCode != ErrorMapping.NoError => result.errorCode }
    .getOrElse(ErrorMapping.NoError)
  val numPartitionsInError = localProduceResults.count(_.error.nonEmpty)

  if (acks == 0) {
    // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
    // no response is expected by the producer the handler will send a close connection response to the socket server
    // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
    if (numPartitionsInError == 0) {
      offsetCommitRequestOpt match {
        case Some(commit) =>
          if (firstErrorCode == ErrorMapping.NoError)
            offsetManager.putOffsets(commit.groupId, commit.requestInfo)
          val commitResponse = commit.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(commitResponse)))
        case None =>
          requestChannel.noOperation(request.processor, request)
      }
    } else {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request)
    }
  } else if (acks == 1 ||
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    // Nothing to wait for: answer straight away.
    if (firstErrorCode == ErrorMapping.NoError)
      offsetCommitRequestOpt.foreach(commit => offsetManager.putOffsets(commit.groupId, commit.requestInfo))

    val statuses = localProduceResults.map { result =>
      (result.key, ProducerResponseStatus(result.errorCode, result.start))
    }.toMap
    val response = offsetCommitRequestOpt
      .map(commit => commit.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
  } else {
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map { result =>
      val status = DelayedProduceResponseStatus(result.end + 1, ProducerResponseStatus(result.errorCode, result.start))
      (result.key, status)
    }.toMap
    val delayedRequest = new DelayedProduce(
      producerRequestKeys,
      request,
      produceRequest.ackTimeoutMs.toLong,
      produceRequest,
      statuses,
      offsetCommitRequestOpt)

    // add the produce request for watch if it's not satisfied, otherwise send the response back
    if (producerRequestPurgatory.checkAndMaybeWatch(delayedRequest))
      producerRequestPurgatory.respond(delayedRequest)
  }

  // we do not need the data anymore
  produceRequest.emptyData()
}
/**
 * Handles an offset commit request.
 *
 * A version 0 request writes each partition's offset to ZooKeeper and is answered immediately
 * with one error code per partition. Any other version is delegated to
 * handleProducerOrOffsetCommitRequest.
 *
 * @param request the request whose requestObj is an OffsetCommitRequest
 */
def handleOffsetCommitRequest(request: RequestChannel.Request) {
  val commitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
  if (commitRequest.versionId != 0) {
    // version 1 and above store the offsets in a special Kafka topic
    handleProducerOrOffsetCommitRequest(request)
  } else {
    // version 0 stores the offsets in ZK
    val commitStatus = commitRequest.requestInfo.map { case (topicAndPartition, offsetAndMetadata) =>
      val topicDirs = new ZKGroupTopicDirs(commitRequest.groupId, topicAndPartition.topic)
      // A failure on one partition is turned into that partition's error code rather than
      // failing the whole request.
      val errorCode =
        try {
          ensureTopicExists(topicAndPartition.topic)
          val metadata = offsetAndMetadata.metadata
          if (metadata == null || metadata.length <= config.offsetMetadataMaxSize) {
            val offsetPath = topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition
            ZkUtils.updatePersistentPath(zkClient, offsetPath, offsetAndMetadata.offset.toString)
            ErrorMapping.NoError
          } else {
            ErrorMapping.OffsetMetadataTooLargeCode
          }
        } catch {
          case e: Throwable => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
        }
      (topicAndPartition, errorCode)
    }
    val commitResponse = new OffsetCommitResponse(commitStatus, commitRequest.correlationId)
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(commitResponse)))
  }
}
/**
 * A request whose processing needs to be delayed for at most the given delayMs.
 *
 * The associated keys are used for bookkeeping: they represent the "trigger" that causes this
 * request to check whether it is satisfied. For example, a key could be a (topic, partition) pair.
 *
 * @param keys    the trigger keys this request is associated with
 * @param request the underlying channel request being delayed
 * @param delayMs the maximum delay, passed through to DelayedItem
 */
class DelayedRequest(val keys: Seq[Any],
                     val request: RequestChannel.Request,
                     delayMs: Long)
  extends DelayedItem[RequestChannel.Request](request, delayMs) {

  /** Satisfaction flag; starts out false. */
  val satisfied = new AtomicBoolean(false)
}
// The cost of checkSatisfied() is typically proportional to the number of keys. Calling
// checkSatisfied() for each key is going to be expensive if there are many keys. Instead,
// we do the check in the following way. Call checkSatisfied(). If the request is not satisfied,
// we just add the request to all keys. Then we call checkSatisfied() again. At this time, if
// the request is still not satisfied, we are guaranteed that it won't miss any future triggering
// events since the request is already on the watcher list for all keys. This does mean that
// if the request is satisfied (by another thread) between the two checkSatisfied() calls, the
// request is unnecessarily added for watch. However, this is a less severe issue since the
// expire reaper will clean it up periodically.
/**
 * Check if a specified delayed produce request is satisfied.
 *
 * Delegates to the request's own isSatisfied, passing the replicaManager in scope.
 *
 * @param delayedProduce the delayed produce request to check
 * @return true if the request is satisfied
 */
def checkSatisfied(delayedProduce: DelayedProduce): Boolean = delayedProduce.isSatisfied(replicaManager)
/**
 * When a delayed produce request expires, answer it with possible time out error codes.
 *
 * Every partition whose status still has acks pending is recorded as expired before the
 * response is sent.
 *
 * @param delayedProduce the delayed produce request that has expired
 */
def expire(delayedProduce: DelayedProduce) {
  debug("Expiring produce request %s.".format(delayedProduce.produce))
  delayedProduce.partitionStatus.foreach { case (topicPartition, responseStatus) =>
    if (responseStatus.acksPending)
      recordDelayedProducerKeyExpired(topicPartition)
  }
  respond(delayedProduce)
}
/**
 * Check if a specified delayed fetch request is satisfied.
 *
 * Delegates to the request's own isSatisfied, passing the replicaManager in scope.
 *
 * @param delayedFetch the delayed fetch request to check
 * @return true if the request is satisfied
 */
def checkSatisfied(delayedFetch: DelayedFetch): Boolean = {
  delayedFetch.isSatisfied(replicaManager)
}
/**
 * When a delayed fetch request expires, just answer it with whatever data is present.
 *
 * The expiration is recorded first, flagged by whether the fetch came from a follower.
 *
 * @param delayedFetch the delayed fetch request that has expired
 */
def expire(delayedFetch: DelayedFetch) {
  debug("Expiring fetch request %s.".format(delayedFetch.fetch))
  recordDelayedFetchExpired(delayedFetch.fetch.isFromFollower)
  respond(delayedFetch)
}