IPアドレスをCIDR表記にまとめる


「IPv4 AddressとCIDRリストを可能な限りCIDRでまとめる君」

やりたいこと

ホスト表記のIPアドレス、またはCIDR表記のIPアドレスを、可能な限りCIDR表記でまとめたい。
※IPv6は対象外

動作環境

python3系であれば動作する想定。
主は3.6.4で実行

入出力

IPv4アドレスが記載されているファイルを読み込んで、可能な限りまとめたCIDR表記の一覧を別ファイルに出力する。
※動作確認は「/23」まで。「/10」とかでも動作はする想定。

入力

ip_list.txt
172.30.43.190
172.30.43.191
172.30.43.192
172.30.43.193
172.30.43.194/31
172.30.43.196/30

出力

out_file.txt
172.30.43.190/31
172.30.43.192/29

スクリプト

大まかな流れ

以下main内の処理

  1. ファイル読み込み
      ついでに空行と改行コードの削除

  2. CIDRをIPAddressのリストに変換する
      ”/”があればCIDR表記認識して処理

  3. IPをBitに変換
      0≦オクテット値≦255 じゃなければエラーで終了

  4. 重複カット
      入力に重複があっても、ホスト表記のIPアドレスとCIDR表記内のIPアドレスに重複があってもOK

  5. bitでソート

  6. IPアドレスが連続しているか判定し、2次元配列を作成

  7. 連続するIPの判定し、連続するIP群をリスト化
      bitの右から0が何個連続しているかで最大サブネットをチェック
      最大サブネットでまとめられるIP数と、IPが連続している数を比較
      小さい方でまとめる。
      連続するIPが残っていれば再帰処理

  8. ファイル書き出し
      7で貯めたリストを出力

コード

to_cidr.py
import ipaddress
import math


HOST_CIDR = False     # True :「/32を記載する」
IN_FILE = "ip_list.txt"
OUT_FILE = "out_file.txt"

# 出力用
OUT_LIST = []


# ファイル読み込み
def read_file(file):
    try:
        with open(file, "r", encoding="utf-8") as f:
            ip_list = f.readlines()
            # 改行コードの処理
            ip_list = [line.rstrip('\r\n') for line in ip_list]
            ip_list = [line.rstrip('\n') for line in ip_list]

            # 空行の削除処理
            ip_list = [line for line in ip_list if line]
        return ip_list

    except IOError:
        print("Error : {}が開けない".format(IN_FILE))

    except UnicodeDecodeError:
        print("Error : {}が読み込めない".format(IN_FILE))


# ファイル書き込み
def write_file(file):
    try:
        with open(file, "w", encoding="utf-8") as f:
            f.write('\n'.join(OUT_LIST))
    except IOError:
        print("Error : {}が開けない".format(OUT_FILE))


# IPAddressをbit文字列へ変換
def ip_to_bit(ip):

    # octetごとに分ける
    octet_1 = int(ip.split(".")[0])
    octet_2 = int(ip.split(".")[1])
    octet_3 = int(ip.split(".")[2])
    octet_4 = int(ip.split(".")[3])

    # それぞれのオクテットの値が0~255の範囲でなければError
    if 0 <= octet_1 <= 255 and \
            0 <= octet_2 <= 255 and \
            0 <= octet_3 <= 255 and \
            0 <= octet_4 <= 255:

        bit_str = format(octet_1, "08b")
        bit_str += format(octet_2, "08b")
        bit_str += format(octet_3, "08b")
        bit_str += format(octet_4, "08b")
    else:
        print("Func : ip_to_bit")
        print("Error : {}".format(ip))
        exit()

    return bit_str


# bit文字列をIPAddressへ変換
def bit_to_ip(bit_str):
    octet_1 = int(bit_str[0:8], 2)
    octet_2 = int(bit_str[8:16], 2)
    octet_3 = int(bit_str[16:24], 2)
    octet_4 = int(bit_str[24:32], 2)
    return "{}.{}.{}.{}".format(octet_1, octet_2, octet_3, octet_4)


# 連続したIPAddressをリストにする
def make_serial_list(bit_list):
    serial_list = []
    tmp_list = []

    for i in range(0, len(bit_list)-1):
        if len(tmp_list) == 0:
            tmp_list.append(bit_list[i])

        # 1足したら次の値と一致するか確認
        if int(bit_list[i], 2) + 1 == int(bit_list[i + 1], 2):
            tmp_list.append(bit_list[i + 1])
        else:
            serial_list.append(tmp_list[:])
            tmp_list.clear()
            tmp_list.append(bit_list[i+1])

    serial_list.append(tmp_list[:])
    return serial_list


# 最大でIPを何個含むマスクでまとめられるかを返す
def check_ip_num_in_subnet(bit_str):
    count = 0
    for s in bit_str[::-1]:
        if s == "0":
            count += 1
        else:
            return 2 ** count
    return 2 ** count


# num以下で最大の2のべき乗を返す
def the_largest_power_of_2_less_than_or_equal_to_num(num):
    count = 0
    while True:
        if num < 2 ** count:
            return 2 ** (count - 1)
        elif num == 2 ** count:
            return num
        else:
            count += 1


# 連続するIPAddressからCIDR表記を作成して出力準備する
def iplist_to_cidr(bit_list):

    # IPAddressが連続している数 serial_num >= 1
    serial_num = len(bit_list)

    # リスト中の一つ目のIPAddressを渡し、
    # 最大何個のIPAddressをまとめることができるかを返す max_ip_num >= 1
    max_ip_num = check_ip_num_in_subnet(bit_list[0])

    bit_str = ""

    # サブネットを切る範囲の決定
    if serial_num >= max_ip_num:
        # 連続するIPアドレスの数より、まとめることが可能なIPAddress数が小さい時
        summary_num = max_ip_num
    else:
        # 上記以外の場合、連続するIPアドレスの数の中で最大何個にまとめられるかを返す
        summary_num = the_largest_power_of_2_less_than_or_equal_to_num(serial_num)

    # 最初のbitを取り出して、残りは捨てる
    for i in range(summary_num):
        if i == 0:
            bit_str = bit_list.pop(0)
        else:
            bit_list.pop(0)

    # 出力処理 
    # /32の時、/32と表記するかどうかの判定を行う
    if int(math.log2(summary_num)) == 0 and not HOST_CIDR:
        print("{}".format(bit_to_ip(bit_str)))
        OUT_LIST.append("{}".format(bit_to_ip(bit_str)))
    else:
        print("{}/{}".format(
            bit_to_ip(bit_str), 32 - int(math.log2(summary_num))
        ))
        OUT_LIST.append("{}/{}".format(
            bit_to_ip(bit_str), 32 - int(math.log2(summary_num))
        ))

    # bitのListがなくなれば終了
    if len(bit_list) == 0:
        return
    else:
        # 再起させる
        iplist_to_cidr(bit_list)


# CIDR表記をIPAddressのリストに変換し、IPAddressの一覧を完成させる。
def cidr_to_ip(read_ip_list):
    cidr_to_ip_list = []
    try:
        for ip_or_cidr in read_ip_list:
            if "/" in ip_or_cidr:
                for ip in ipaddress.IPv4Network(ip_or_cidr):
                    cidr_to_ip_list.append(str(ip))
            # CIDR表記でないIPAddressはそのままリストに追加
            else:
                cidr_to_ip_list.append(ip_or_cidr)
    except ipaddress.AddressValueError:
        print("Func : cidr_to_ip")
        print("Error : {}".format(ip_or_cidr))

    return cidr_to_ip_list


def main():
    # ファイル読み込み
    read_ip_list = read_file(IN_FILE)

    # CIDRをIPAddressのリストに変換する
    cidr_to_ip_list = cidr_to_ip(read_ip_list)

    # IPをBitに変換
    bit_list = []
    for ip in cidr_to_ip_list:
        bit_list.append(ip_to_bit(ip))

    # 重複カット
    bit_list = list(set(bit_list[:]))

    # Bitでソート
    bit_list.sort()

    # 連続するIPの判定し、連続するIP群をリスト化
    serial_list = make_serial_list(bit_list)

    # IPのサブネットの決定と出力リスト作成
    for bit_list in serial_list:
        iplist_to_cidr(bit_list)

    # ファイル書き出し
    write_file(OUT_FILE)


if __name__ == '__main__':
    main()