KafkaController Analysis 2: NetworkClient Analysis
The KafkaController communicates with the other brokers through NetworkClient.
# InFlightRequests
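The field
`private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();`
stores all requests per connection, each connection in its own `Deque<ClientRequest>`. A `Deque` is a double-ended queue. New requests are added at the head of the queue with `addFirst`:
```
public void add(ClientRequest request) {
    Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
    if (reqs == null) {
        reqs = new ArrayDeque<>();
        this.requests.put(request.request().destination(), reqs);
    }
    reqs.addFirst(request);
}
```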
The oldest request is taken from the tail of the queue with `pollLast()`:
```
public ClientRequest completeNext(String node) {
    return requestQueue(node).pollLast();
}
```
`canSendMore(String node)` decides whether another request may be sent to the given node through NetworkClient. If the most recently queued request has not yet been fully written to the underlying socket, or the connection already holds `maxInFlightRequestsPerConnection` requests, no new request is allowed:
```
public boolean canSendMore(String node) {
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
```
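The queue discipline described above can be tried out in isolation. The following is a minimal sketch, not the Kafka class itself: `SketchRequest` and `InFlightSketch` are hypothetical stand-ins, and the `sendCompleted` flag is an assumed substitute for `request().completed()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClientRequest: only the fields this sketch needs.
class SketchRequest {
    final String destination;
    final int correlationId;
    boolean sendCompleted; // assumed flag: true once the bytes were fully written to the socket

    SketchRequest(String destination, int correlationId) {
        this.destination = destination;
        this.correlationId = correlationId;
    }
}

// Simplified per-node in-flight bookkeeping (illustrative only).
class InFlightSketch {
    private final Map<String, Deque<SketchRequest>> requests = new HashMap<>();
    private final int maxInFlightPerConnection;

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // The newest request goes to the head of the node's deque.
    void add(SketchRequest request) {
        requests.computeIfAbsent(request.destination, k -> new ArrayDeque<>()).addFirst(request);
    }

    // The oldest request comes off the tail, so completion order is FIFO.
    SketchRequest completeNext(String node) {
        Deque<SketchRequest> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight requests for node " + node);
        return queue.pollLast();
    }

    // Another send is allowed only if the newest request is fully written
    // and the per-connection cap has not been reached.
    boolean canSendMore(String node) {
        Deque<SketchRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }
}

class InFlightSketchDemo {
    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(2);
        SketchRequest first = new SketchRequest("node-1", 1);
        inFlight.add(first);
        System.out.println(inFlight.canSendMore("node-1")); // false: first is not written yet
        first.sendCompleted = true;
        System.out.println(inFlight.canSendMore("node-1")); // true: written, and 1 < 2
        inFlight.add(new SketchRequest("node-1", 2));
        System.out.println(inFlight.completeNext("node-1").correlationId); // 1: oldest first
    }
}
```

Because requests enter at the head and leave from the tail, the head is always the most recent send (the one `canSendMore` inspects) and the tail is always the next request expected to complete.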
# ClusterConnectionStates
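* Source file: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
* Tracks the connection state of each broker node:
`private final Map<String, NodeConnectionState> nodeState`
* How long to wait before trying to reconnect to a node:
`private final long reconnectBackoffMs`
* Connection states: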
```
ConnectionState.DISCONNECTED -- not connected
ConnectionState.CONNECTING -- connection in progress
ConnectionState.CONNECTED -- connected
```
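* `canConnect`: whether a connection to the node may be attempted. It may if the node has no recorded state yet, or if the node is disconnected and at least `reconnectBackoffMs` has passed since the last connection attempt:
```
public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    else
        return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
```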
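To see the backoff rule in action, here is a small self-contained sketch. It is not the Kafka class: `BackoffSketch`, its nested `State` enum, and the `connecting` / `disconnected` helpers are illustrative assumptions, and only the `canConnect` condition is taken from the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative reconnect-backoff bookkeeping; all names here are assumptions.
class BackoffSketch {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();
    private final long reconnectBackoffMs;

    BackoffSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // Same condition as canConnect above: unknown node, or disconnected and backoff elapsed.
    boolean canConnect(String id, long now) {
        NodeState s = nodeState.get(id);
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED && now - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    // Record that a connection attempt started at time `now`.
    void connecting(String id, long now) {
        nodeState.put(id, new NodeState(State.CONNECTING, now));
    }

    // Record that the connection to the node was lost or failed.
    void disconnected(String id) {
        NodeState s = nodeState.get(id);
        if (s != null)
            s.state = State.DISCONNECTED;
    }
}

class BackoffSketchDemo {
    public static void main(String[] args) {
        BackoffSketch states = new BackoffSketch(50);
        System.out.println(states.canConnect("node-1", 1000)); // true: never seen before
        states.connecting("node-1", 1000);
        System.out.println(states.canConnect("node-1", 1010)); // false: attempt in progress
        states.disconnected("node-1");
        System.out.println(states.canConnect("node-1", 1040)); // false: only 40 ms since the attempt
        System.out.println(states.canConnect("node-1", 1050)); // true: 50 ms backoff elapsed
    }
}
```

Note that the backoff is measured from the last connection attempt, not from the moment of disconnection.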
# NetworkClient
* Source file: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
* Implements the `KafkaClient` interface
* Uses `org.apache.kafka.common.network.Selector` for network I/O; see [this related Kafka post](http://www.jianshu.com/p/8cbc7618abcb)
* Handles communication with the broker nodes; the class comment reads:
>A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
* The `poll` method
Calls `selector.poll` to perform the actual socket reads and writes:
```
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
    this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
    log.error("Unexpected error during I/O", e);
}
```
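After `selector.poll` returns, the **completed sends (requests)**, **received responses**, **disconnections**, and **new connections** can all be read back from the `selector`.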
* Handling requests whose send has completed
```
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
```
A request that expects no response is removed from `inFlightRequests` as soon as its send completes.
* Handling received responses
```
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        // Always expect the response version id to be the same as the request version id
        short apiKey = req.request().header().apiKey();
        short apiVer = req.request().header().apiVersion();
        Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
        correlate(req.request().header(), header);
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}
```
If the response is a `metadata` response, `metadataUpdater.maybeHandleCompletedReceive` handles it and updates the metadata; otherwise the response is added to `responses`.
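The receive path pairs each incoming response with the oldest in-flight request for that connection and then checks that the two agree. The body of `correlate` is not shown above, so the following is only a sketch of the general idea under the assumption that it compares correlation ids; `CorrelationSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative request/response matching for a single connection.
class CorrelationSketch {
    // Pending correlation ids: newest at the head, oldest at the tail.
    private final Deque<Integer> pending = new ArrayDeque<>();

    // Record a request that has been sent and awaits a response.
    void sent(int correlationId) {
        pending.addFirst(correlationId);
    }

    // Match a response against the oldest pending request and verify the ids agree.
    int receive(int responseCorrelationId) {
        Integer expected = pending.pollLast();
        if (expected == null)
            throw new IllegalStateException("Received a response with no request in flight");
        if (expected != responseCorrelationId)
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                + ") does not match request (" + expected + ")");
        return expected;
    }
}

class CorrelationSketchDemo {
    public static void main(String[] args) {
        CorrelationSketch connection = new CorrelationSketch();
        connection.sent(7);
        connection.sent(8);
        System.out.println(connection.receive(7)); // 7: oldest request matches the first response
        try {
            connection.receive(9); // oldest pending id is 8, so this is rejected
        } catch (IllegalStateException e) {
            System.out.println("mismatch detected");
        }
    }
}
```

The check relies on responses arriving in the same order as the requests were sent on a connection, which is what makes a plain FIFO queue sufficient.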
* Handling newly established connections
```
private void handleConnections() {
    for (String node : this.selector.connected()) {
        log.debug("Completed connection to node {}", node);
        this.connectionStates.connected(node);
    }
}
```
* The `handle***` methods collect their results into `responses`, after which the callback of each response is invoked:
```
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
    if (response.request().hasCallback()) {
        try {
            response.request().callback().onComplete(response);
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
```
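The collect-then-dispatch pattern above isolates each callback, so one failing callback does not prevent the others from running. A minimal sketch of that behaviour follows; `CallbackDispatchSketch` and its nested `Response` type are hypothetical, and `java.util.function.Consumer` stands in for the request completion callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative callback dispatch with per-callback error isolation.
class CallbackDispatchSketch {
    static final class Response {
        final String body;
        final Consumer<Response> callback; // may be null when no callback was registered

        Response(String body, Consumer<Response> callback) {
            this.body = body;
            this.callback = callback;
        }
    }

    // Invokes every registered callback; returns how many of them threw.
    static int dispatch(List<Response> responses) {
        int failures = 0;
        for (Response response : responses) {
            if (response.callback != null) {
                try {
                    response.callback.accept(response);
                } catch (Exception e) {
                    // Log and continue so later callbacks still run.
                    failures++;
                    System.err.println("Uncaught error in request completion: " + e);
                }
            }
        }
        return failures;
    }
}

class CallbackDispatchDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<CallbackDispatchSketch.Response> responses = new ArrayList<>();
        responses.add(new CallbackDispatchSketch.Response("a", r -> seen.add(r.body)));
        responses.add(new CallbackDispatchSketch.Response("b", r -> { throw new IllegalStateException("boom"); }));
        responses.add(new CallbackDispatchSketch.Response("c", null));
        responses.add(new CallbackDispatchSketch.Response("d", r -> seen.add(r.body)));
        int failures = CallbackDispatchSketch.dispatch(responses);
        System.out.println(seen + " failures=" + failures); // [a, d] failures=1
    }
}
```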
# NetworkClientBlockingOps
* Source file: core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
* Wraps `NetworkClient` and adds blocking variants of its operations
* Blocking until `Client.ready` succeeds
```
def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
  client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
    if (client.isReady(node, now))
      true
    else if (client.connectionFailed(node))
      throw new IOException(s"Connection to $node failed")
    else false
  }
}
```
* Sending a request and blocking until its response arrives
```
def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
  client.send(request, time.milliseconds())
  pollUntilFound(timeout) { case (responses, _) =>
    val response = responses.find { response =>
      response.request.request.header.correlationId == request.request.header.correlationId
    }
    response.foreach { r =>
      if (r.wasDisconnected) {
        val destination = request.request.destination
        throw new IOException(s"Connection to $destination was disconnected before the response was read")
      }
    }
    response
  }
}
```
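Both blocking helpers rely on a poll-until loop whose body is not shown above. The sketch below is a guess at the general pattern, written in Java rather than Scala: keep polling until a condition holds or a deadline passes. `PollUntilSketch`, its parameters, and the injected clock are all assumptions made for illustration.

```java
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

// Illustrative poll-until-condition-or-timeout loop.
class PollUntilSketch {
    // Runs pollOnce repeatedly until done(now) holds (returns true)
    // or timeoutMs has elapsed on the supplied clock (returns false).
    static boolean pollUntil(long timeoutMs, LongSupplier clock, Runnable pollOnce, LongPredicate done) {
        long deadline = clock.getAsLong() + timeoutMs;
        while (true) {
            pollOnce.run();
            long now = clock.getAsLong();
            if (done.test(now))
                return true;
            if (now >= deadline)
                return false;
        }
    }
}

class PollUntilSketchDemo {
    public static void main(String[] args) {
        // Fake clock: each poll step advances time by 10 ms.
        long[] clock = {0};
        boolean ready = PollUntilSketch.pollUntil(100, () -> clock[0], () -> clock[0] += 10, now -> now >= 30);
        System.out.println(ready + " at " + clock[0]); // true at 30

        long[] clock2 = {0};
        boolean never = PollUntilSketch.pollUntil(100, () -> clock2[0], () -> clock2[0] += 10, now -> false);
        System.out.println(never + " at " + clock2[0]); // false at 100
    }
}
```

Injecting the clock keeps the loop testable without real waiting; in the Scala code the implicit `time: JTime` parameter plays a similar role.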
##### [Kafka - ](http://www.jianshu.com/p/aa274f8fe00f)