ユニットテストのembedded-kafka

3905 ワード

プロジェクトでは,チームもメッセージミドルウェアとしてKafkaを用いた.組み込みredisの選択の問題を経て、筆者は組み込みkafkaの選択時に更新を継続する傾向があり、メンテナンススタッフは個人や緩やかな組織ではなくチームである.結局,筆者はsalesforceからのオープンソースプロジェクトを選択した.
com.salesforce.kafka.test
    kafka-junit
    3.0.1

以下に、プロジェクトに付属するテストケースコードを少し変更した例を示します.
package com.salesforce.kafka.test.junit4;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.kafka.test.KafkaBroker;
import com.salesforce.kafka.test.KafkaTestServer;

public class KafkaBrokerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTest.class);

    /**
     * Validate that we started 2 brokers.
     * @throws Exception 
     */
    @Test
    public void testTwoBrokersStarted() throws Exception {
        Properties overrideProperties = new Properties();
        overrideProperties.put("broker.id", "1");
        overrideProperties.put("port", "6666");
        KafkaTestServer server1= new KafkaTestServer(overrideProperties);
        server1.start();
       String string= server1.getKafkaBrokers().getBrokerById(1).getConnectString();
        logger.info("
"+string+"
"); overrideProperties.put("broker.id", "2"); overrideProperties.put("port", "8888"); KafkaTestServer server2= new KafkaTestServer(overrideProperties); server2.start(); String string2= server2.getKafkaBrokers().getBrokerById(2).getConnectString(); logger.info("
"+string2+"
"); } }

に質問
上記の例から,実際のkafka使用ではIP+ポート番号はkafka brokerごとに異なることが分かる.ただし、salesforce/kafka-coreで提供されるKafkaTestClusterクラスでは、broker portを外部に指定する機会はありません.
 /**
     * Starts the cluster.
     * @throws Exception on startup errors.
     * @throws TimeoutException When the cluster fails to start up within a timely manner.
     */
    public void start() throws Exception, TimeoutException {
        // Ensure zookeeper instance has been started.
        zkTestServer.start();

        // If we have no brokers defined yet...
        if (brokers.isEmpty()) {
            // Loop over brokers, starting with brokerId 1.
            for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
                // Create properties for brokers
                final Properties brokerProperties = new Properties();

                // Add user defined properties.
                brokerProperties.putAll(overrideBrokerProperties);

                // Set broker.id
                brokerProperties.put("broker.id", String.valueOf(brokerId));

                // Create new KafkaTestServer and add to our broker list
                brokers.add(
                    new KafkaTestServer(brokerProperties, zkTestServer)
                );
            }
        }

        // Loop over each broker and start it
        for (final KafkaTestServer broker : brokers) {
            broker.start();
        }

        // Block until the cluster is 'up' or the timeout is exceeded.
        waitUntilClusterReady(10_000L);
    }

これは、IP+ポート番号を予め指定する必要があるシーンでは面倒です.後続の処理が必要です.例えば,このクラスに構造手法を追加し,以下のリストを用いて初期化が完了したリストbrokersをインパラメータとして渡す.
private final List brokers = new ArrayList<>();

新規構築方法:
public KafkaTestCluster(final List brokers)