KafkaソースコードのKafkaConsumer分析のpoll方法分析

2231 ワード

私たちが情報を取得するのはpoll方法です.今、消費の流れを全体的に見てみましょう.
public ConsumerRecords poll(long timeout) {
		// 
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            // 
            long start = time.milliseconds();
            long remaining = timeout;
            do {
            	// 
                Map>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // , , 
                    fetcher.sendFetches();
                    client.pollNoWakeup();
					// 
                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
				// 
                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
        	// 
            release();
        }
    }

この方法では、まずacquireが同時動作を防止するために呼び出され、次に、メッセージがプルされた場合、ブロックなしで中断できない要求が実行され、ブロック処理されたメッセージのセットに戻るように、所定の時間内にメッセージをプルアップしようと試みる.
private Map>> pollOnce(long timeout) {
        //  coordinator 
        coordinator.ensureCoordinatorReady();

        //  rebalace 
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

        //  position
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        long now = time.milliseconds();

        //  
        client.executeDelayedTasks(now);

        // completed 
        Map>> records = fetcher.fetchedRecords();

        // 
        if (!records.isEmpty())
            return records;
	// 
        fetcher.sendFetches();
        client.poll(timeout, now);
        return fetcher.fetchedRecords();
    }