Kafka&Zookeeperクラスタ自動化メンテナンス管理スクリプト


文書ディレクトリ
  • zookeeprクラスタ次元管理スクリプト
  • kafka 2.4.1に基づくkafkaクラスタ次元管理スクリプト
  • kafkaとzookeeperベースの自動化メンテナンス管理スクリプト
    zookeeprクラスタのメンテナンス管理スクリプト
    #!/usr/bin/env bash
    # --------------------------------------------------------------
    #  Author        :pony
    #  Create Time   :2020-04-16 12:56
    #  Description   :zookeeper cluster     
    # --------------------------------------------------------------
    
    . ~/.bashrc
    ZK_HOME="/home/hadoop/zk"
    alias zkServer.sh=$ZK_HOME/bin/zkServer.sh
    alias zkCli.sh=$ZK_HOME/bin/zkCli.sh
    ZOOKEEPERS="127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183"
    ZK_NODES=3
    
    function start_zk(){
        for((i=1;i<=$ZK_NODES;i++))
        do 
            zkServer.sh start $ZK_HOME/conf/zoo$i.cfg
        done
    }
    
    function stop_zk(){
        for((i=1;i<=$ZK_NODES;i++))
        do 
            zkServer.sh stop $ZK_HOME/conf/zoo$i.cfg
        done
    }
    function status_zk(){
        for((i=1;i<=$ZK_NODES;i++))
        do 
            zkServer.sh status $ZK_HOME/conf/zoo$i.cfg
        done
    }
    function restart_zk(){
        for((i=1;i<=$ZK_NODES;i++))
        do 
            zkServer.sh restart $ZK_HOME/conf/zoo$i.cfg
        done
    }
    
    function init_clean(){
        stop_zk
        rm -rf $ZK_HOME/data/*
        rm -rf $ZK_HOME/logs/*
        for((i=1;i<=$ZK_NODES;i++))
        do  
            mkdir -p $ZK_HOME/data/$i
            echo $i > $ZK_HOME/data/$i/myid
        done
    }
    
    function init_zk(){
        init_clean
        start_zk
    }
    
    function conn_cluster(){
        zkCli.sh -server $ZOOKEEPERS
    }
    
    
    case "$1" in
        start)
            start_zk
            ;;
        stop)
            stop_zk
            ;;
        status)
            status_zk
            ;;
        restart)
            restart_zk
            ;;
        conn)
            conn_cluster
            ;;
        clean)
            init_clean
            ;;
        init)
            init_zk
            ;;
        *)
            echo "Usage: $0 {start|stop|restart|status|init|clean|conn}"
            RETVAL=1
    esac
    
    exit $RETVAL
    

    kafka 2.4.1ベースのkafkaクラスタのメンテナンス管理スクリプト
    #!/usr/bin/env bash
    # --------------------------------------------------------------
    #  Author        :pony
    #  Create Time   :2020-04-16 12:56
    #  Description   :Kafka cluster     
    # --------------------------------------------------------------
    # This script is based on the new version kafka
    
    . ~/.bashrc
    # Start, stop, status,etc.
    KAFKA_LAUNCH_COMMAND=$@
    
    KAFKA_HOME=/home/hadoop/kafka
    ZK_HOME=/home/hadoop/zk
    PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin
    
    #Kafka PID_DIR
    PID_DIR="$KAFKA_HOME/pid_dir"
    [ ! -d $PID_DIR ] && mkdir -p $PID_DIR
    
    #PID file
    PID_FILE="$PID_DIR/kafka.pid"
    
    #The number of Kafka brokers
    BROKER_NUMS=3
    
    #kill max wait time Unit: Second
    [ -z $MAX_WAIT_TIME ] && MAX_WAIT_TIME=60
    
    BROKER_LIST="127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093"
    #  :znode      kafka broker  zookeeper.connect     
    ZOOKEEPER_CONNECT="127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183/kafka"
    ZOOKEEPERS="127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183"
    #  sasl JAAS, zk client          
    JAAS_CONF=" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"
    export KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS$JAAS_CONF
    export JVMFLAGS=$JVMFLAGS$JAAS_CONF
    # running return 1
    function isRunning {
        PID=0
        
        if [ ! -d $PID_DIR ]; then
            printf "Can't find pid dir.
    "
    exit 1 fi if [ ! -f $1 ]; then return 0 fi PID="$(<$1)" if [ x$PID = x ]; then return 0 fi ps -p $PID > /dev/null if [ $? -eq 1 ]; then return 0 else return 1 fi } #kill success return 1 function kafkaKill { local localPID=$(<$1) local i=0 kill $localPID for ((i=0; i<MAX_WAIT_TIME; i++)); do isRunning $1 if [ $? -eq 0 ]; then return 0; fi sleep 1 done kill -s KILL $localPID for ((i=0; i<MAX_WAIT_TIME; i++)); do isRunning $1 if [ $? -eq 0 ]; then return 0; fi sleep 1 done return 1 } function kafkaStart(){ printf "Starting Kafka Cluster.
    "
    local COUNT=0; for((i=1;i<=$BROKER_NUMS;i++)) do isRunning $PID_FILE$i if [ $? -eq 1 ]; then printf "Kafka Broker$i is already running with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    ((COUNT=$COUNT+1)) continue fi printf "Started Kafka Broker$i "; #change kafka running logs path,defualt: $KAFKA_HOME/logs export LOG_DIR=$KAFKA_HOME/logs/$i #kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties # pid kafka-server-start.sh $KAFKA_HOME/config/server$i.properties > /dev/null 2>&1 & #save pid echo $! > $PID_FILE$i isRunning $PID_FILE$i if [ $? -eq 1 ]; then ((COUNT=$COUNT+1)) printf "Success with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    else printf "Failed with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    break fi done if [ $COUNT -eq $BROKER_NUMS ]; then printf "Kafka Cluster Started Success.
    "
    else printf "Kafka Cluster Started Failed.
    "
    fi } function kafkaStop(){ printf "Stopping Kafka Cluster.
    "
    local COUNT=0; local i=0 for((i=1;i<=$BROKER_NUMS;i++)) do isRunning $PID_FILE$i if [ $? -eq 0 ]; then printf "Kafka broker$i is not running with PID_FILE=$PID_FILE$i.
    "
    rm -f $PID_FILE$i ((COUNT=$COUNT+1)) continue fi printf "Stopped Kafka broker$i "; kafkaKill $PID_FILE$i wait if [ $? -eq 1 ]; then printf "failed with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    else printf "succeeded with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    rm -f $PID_FILE$i ((COUNT=$COUNT+1)) fi done if [ $COUNT -eq $BROKER_NUMS ]; then printf "Kafka Cluster Stopped Success.
    "
    else printf "Kafka Cluster Stopped Failed.
    "
    fi } function kafkaInit(){ printf "Starting Init Kafka Cluster.
    "
    kafkaStop printf "Deleting Kafka znode(/kafka) from Zookeeper.
    "
    zkCli.sh -server $ZOOKEEPERS rmr /kafka 1>/dev/null #2>&1 printf "Deleting Kafka Data And Logs from Kafka Cluster.
    "
    rm -rf $KAFKA_HOME/data $KAFKA_HOME/logs kafkaStart printf "Init Kafka Cluster Success.
    "
    } function kafkaClean(){ printf "Starting Clean Kafka Cluster.
    "
    kafkaStop printf "Deleting Kafka znode(/kafka) from Zookeeper.
    "
    zkCli.sh -server $ZOOKEEPERS rmr /kafka 1>/dev/null #2>&1 printf "Deleting Kafka Data And Logs from Kafka Cluster.
    "
    rm -rf $KAFKA_HOME/data $KAFKA_HOME/logs printf "Clean Kafka Cluster Success.
    "
    } function kafkaStatus { printf "Kafka Cluster Status:
    "
    local COUNT=0; for((i=1;i<=$BROKER_NUMS;i++)) do printf "Kafka broker$i " isRunning $PID_FILE$i if [ $? -eq 1 ]; then ((COUNT=$COUNT+1)) printf "is running with PID=$(<$PID_FILE$i), PID_FILE=$PID_FILE$i.
    "
    else printf "is not running with PID_FILE=$PID_FILE$i.
    "
    fi done if [ $COUNT -eq $BROKER_NUMS ]; then printf "Kafka Cluster Status is Good.
    "
    else printf "Kafka Cluster Status is Bad.
    "
    fi } function topicCreate(){ kafka-topics.sh --create --zookeeper $ZOOKEEPER_CONNECT --replication-factor $1 --partitions $2 --topic $3 } function topicDelete(){ kafka-topics.sh --delete --zookeeper $ZOOKEEPER_CONNECT --topic $1 } function topicDesc(){ kafka-topics.sh --describe --zookeeper $ZOOKEEPER_CONNECT --topic $1 } function topicList(){ kafka-topics.sh --list --zookeeper $ZOOKEEPER_CONNECT } function kafkaProducer(){ kafka-console-producer.sh --broker-list $BROKER_LIST --topic $1 } function kafkaConsumer(){ if [ "x$3" = "xbegin" ] ; then kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $1 --from-beginning --group $2 else kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $1 --group $2 fi } function showConsumerGroup(){ kafka-consumer-groups.sh --bootstrap-server $BROKER_LIST --describe --group $1 } function listConsumerGroups(){ kafka-consumer-groups.sh --bootstrap-server $BROKER_LIST --list } function main { case "$1" in start) kafkaStart ;; stop) kafkaStop ;; status) kafkaStatus ;; init) kafkaInit ;; clean) kafkaClean ;; create) if [ $# -eq 4 ] ; then topicCreate $2 $3 $4 else printf "Usage: $0 create {replics|partitions|topic}
    "
    fi ;; delete) if [ $# -eq 2 ] ; then topicDelete $2 else printf "Usage: $0 delete {topic}
    "
    fi ;; topics) topicList ;; desc) if [ $# -eq 2 ] ; then topicDesc $2 else printf "Usage: $0 desc {topic}
    "
    fi ;; producer) if [ $# -eq 2 ] ; then kafkaProducer $2 else printf "Usage: $0 producer {topic}
    "
    fi ;; groups) listConsumerGroups ;; consumer) if [ $# -ge 3 ] ; then kafkaConsumer $2 $3 $4 else printf "Usage: $0 consumer {topic|consumerGroup|[begin]}
    "
    fi ;; group) if [ $# -eq 2 ] ; then showConsumerGroup $2 else printf "Usage: $0 group {topic}
    "
    fi ;; *) printf "Usage: $0 {start|stop|status|init|clean|create|delete|topics|desc|producer|consumer|groups|group}
    "
    ;; esac } #Starting main main $KAFKA_LAUNCH_COMMAND