JSON Schemaを学び、独自の検証コードを実装しました

16457 ワード

JsonSchema
fastjsonschemaを使用してデータを検証

#      
import json
import fastjsonschema

# Read the JSON Schema definition from disk.
with open('../schema/oneof-schema.json', encoding='utf8') as schema_file:
    my_schema = json.load(schema_file)

# Read the JSON document that is to be checked.
with open('../data/test.json', encoding='utf8') as data_file:
    json_data = json.load(data_file)

# Validate the document; the schema definition is passed first, the data second.
fastjsonschema.validate(my_schema, json_data)

jsonschemaを使用してデータを検証

import json
#      
from jsonschema import validate, draft7_format_checker, SchemaError, ValidationError

if __name__ == '__main__':
    # Load the JSON Schema used for validation.
    with open('../schema/MySchema.json', encoding='utf8') as f:
        my_schema = json.load(f)

    # Load the JSON document to validate.
    with open('../data/cece.json', encoding='utf8') as f:
        json_data = json.load(f)

    # Validate the document against the schema, passing the draft-07 format
    # checker alongside it.
    # NOTE(review): newer jsonschema releases reportedly expose this checker as
    # Draft7Validator.FORMAT_CHECKER - confirm against the installed version.
    try:
        validate(instance=json_data, schema=my_schema, format_checker=draft7_format_checker)
    except SchemaError as serr:
        # The schema itself is invalid.
        print("schema    【%s】 \nschema " % str(serr))
    except ValidationError as verr:
        # The document does not conform to the schema.
        print(" 【%s】\n" % str(verr))

MySchema
JSONSchemaの欠点
  • エラーメッセージが英語である
  • データは順番に検査され、最初のエラーに遭遇した時点で検査が停止する
    カスタムJSONSchema
    schema準拠http://json-schema.org/,
    "$schema": "http://json-schema.org/draft-07/schema#",
    使い方>>>>こちらをクリック
    コード
    個人で作成した検証コードであり、独自仕様の部分が多い。拡張しているのはstring型データのformat判定のみである。
    CheckDataUti.py
    
    import re
    import time
    
    # All patterns are raw strings so that regex escapes such as \w, \d and \s
    # are not treated as (invalid) Python string escapes.

    # Email address: word characters optionally joined by - + . around an "@".
    EMAIL_REGEX = r"^\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*$"
    # URL: an alphabetic scheme, "://", then any run of non-whitespace.
    # The scheme class is [a-zA-Z]; the range A-z would also admit [ \ ] ^ _ `.
    URL_REGEX = r"^[a-zA-Z]+://[^\s]*$"
    # Phone number: 11 digits, starting with 1 followed by a digit 3-9.
    # A range is used because commas inside a character class match literally.
    PHONE_REGEX = r"^([1][3-9])\d{9}$"
    # ID card number: 18 characters made of digits and x / X.
    ID_CARD_REGEX = r"^((\d{18})|([0-9x]{18})|([0-9X]{18}))$"
    # Zip code: six digits, the first of which is non-zero.
    ZIP_CODE_REGEX = r"^[1-9]\d{5}(?!\d)$"
    # IP address: four dot-separated digit groups (the value range is not checked).
    IP_REGEX = r"^\d+\.\d+\.\d+\.\d+$"
    # Positive integer without leading zeros.
    INTEGER_REGEX = r"^[1-9]\d*$"

    # Errors reported directly to the user.
    ERR_LIST = []
    # Errors collected while probing subschemas (is_common=True).
    COMMON_ERR_LIST = []
    
    
    def log_error(msg, data, schema, is_common=False):
        """Format a validation failure and record it.

        The entry combines the message, the offending data (with its Python
        type name) and the schema fragment that rejected it.

        Args:
            msg: Description of the failure.
            data: The value that failed validation.
            schema: The schema fragment the value was checked against.
            is_common: When true the entry is appended to COMMON_ERR_LIST
                without printing; otherwise it is appended to ERR_LIST and
                printed between two separator lines.
        """
        message_text = str(msg)
        data_text = str(data) + " type of " + str(type(data).__name__)
        schema_text = str(schema)
        entry = "%s,  :【%s】,    : %s" % (message_text, data_text, schema_text)

        if is_common:
            COMMON_ERR_LIST.append(entry)
            return

        ERR_LIST.append(entry)
        for line in ("=================================================",
                     entry,
                     "================================================="):
            print(line)
    
    
    def check_object(data, schema, is_common):
        """Validate *data* against the object keywords of *schema*.

        Handles ``required``, ``minProperties``, ``maxProperties``,
        ``properties`` and ``patternProperties``.  ``additionalProperties`` is
        not enforced here.

        Args:
            data: The decoded JSON value to check.
            schema: The schema fragment (a dict) describing the object.
            is_common: Forwarded to ``log_error`` and to nested ``check_data``
                calls so that failures found while probing a subschema stay in
                the common error list.
        """
        if not isinstance(data, dict):
            log_error("     json        ", data, schema, is_common)
            return

        # Every key listed under "required" must be present.
        for required_key in schema.get('required', ()):
            if required_key not in data:
                log_error("  【%s】  " % required_key, data, schema, is_common)

        # Bounds on the number of keys.
        if "minProperties" in schema:
            min_properties = schema['minProperties']
            if len(data) < min_properties:
                log_error("     key    【%s】" % str(min_properties), data, schema, is_common)

        if "maxProperties" in schema:
            max_properties = schema['maxProperties']
            if len(data) > max_properties:
                log_error("     key    【%s】" % str(max_properties), data, schema, is_common)

        # Validate each declared property that is actually present.
        for prop_name, prop_schema in schema.get('properties', {}).items():
            if prop_name in data:
                check_data(prop_schema, data[prop_name], is_common)

        # Validate every key that matches a patternProperties expression.
        # JSON Schema patterns are not implicitly anchored, hence search().
        for pattern, pattern_schema in schema.get('patternProperties', {}).items():
            compiled = re.compile(pattern)
            for data_key in data:
                if compiled.search(data_key):
                    check_data(pattern_schema, data[data_key], is_common)
    
    
    def check_array(data, schema, is_common):
        """Validate *data* against the array keywords of *schema*.

        Handles ``minItems``, ``maxItems``, ``uniqueItems``, ``items`` (both the
        single-schema and the positional list form) and ``additionalItems``.

        Args:
            data: The decoded JSON value to check.
            schema: The schema fragment (a dict) describing the array.
            is_common: Forwarded to ``log_error`` and to nested ``check_data``
                calls so that failures found while probing a subschema stay in
                the common error list.
        """
        if not isinstance(data, list):
            log_error("     json      ", data, schema, is_common)
            return

        # Length bounds.
        if "minItems" in schema:
            min_items = schema['minItems']
            if len(data) < min_items:
                log_error("             【%s】" % str(min_items), data, schema, is_common)

        if "maxItems" in schema:
            max_items = schema['maxItems']
            if len(data) > max_items:
                log_error("             【%s】" % str(max_items), data, schema, is_common)

        # uniqueItems: when true, no two elements may be equal.
        if schema.get('uniqueItems'):
            try:
                has_duplicates = len(set(data)) != len(data)
            except TypeError:
                # Unhashable elements (dict / list): fall back to pairwise
                # equality instead of silently skipping the check.
                seen = []
                has_duplicates = False
                for element in data:
                    if element in seen:
                        has_duplicates = True
                        break
                    seen.append(element)
            if has_duplicates:
                log_error("               ", data, schema, is_common)

        if "items" not in schema:
            return
        items_schema = schema['items']

        if isinstance(items_schema, list):
            # Positional form: the i-th schema applies to the i-th element.
            # zip() stops at the shorter side, so an array with fewer elements
            # than schemas no longer raises IndexError.
            for item_schema, element in zip(items_schema, data):
                check_data(item_schema, element, is_common)

            # additionalItems applies to the elements beyond the positional schemas.
            if "additionalItems" in schema:
                additional_items_schema = schema['additionalItems']
                for element in data[len(items_schema):]:
                    check_data(additional_items_schema, element, is_common)

        elif isinstance(items_schema, dict):
            # Single-schema form: every element must satisfy it.
            for element in data:
                check_data(items_schema, element, is_common)
    
    
    def check_number(data, schema, is_common):
        """Validate *data* against the numeric keywords of *schema*.

        Handles ``maximum`` / ``minimum``, ``exclusiveMaximum`` /
        ``exclusiveMinimum`` and ``multipleOf``.  The exclusive keywords are
        accepted both as a boolean modifier of ``maximum`` / ``minimum`` (the
        form the original code handled) and as a standalone numeric limit
        (draft-07).

        Args:
            data: The decoded JSON value to check.
            schema: The schema fragment (a dict) describing the number.
            is_common: Forwarded to ``log_error``.
        """
        type_msg = "     json        "

        # bool is an int subclass in Python but is not a JSON number.
        if isinstance(data, bool) or not isinstance(data, (int, float)):
            log_error(type_msg, data, schema, is_common)
            return

        # "integer" only admits values with a zero fractional part.
        if schema.get('type') == 'integer' and isinstance(data, float) and not data.is_integer():
            log_error(type_msg, data, schema, is_common)
            return

        exclusive_msg = "           【%s】"
        inclusive_msg = "         【%s】"

        def violates(limit, exclusive, upper):
            """Return True when *data* is outside *limit* on the given side."""
            if upper:
                return data >= limit if exclusive else data > limit
            return data <= limit if exclusive else data < limit

        bound_keywords = (
            ('maximum', 'exclusiveMaximum', True),
            ('minimum', 'exclusiveMinimum', False),
        )
        for limit_key, exclusive_key, upper in bound_keywords:
            limit = schema.get(limit_key)
            exclusive = schema.get(exclusive_key)

            if isinstance(exclusive, bool):
                # Boolean form: the flag decides whether the limit itself is allowed.
                if limit is not None and violates(limit, exclusive, upper):
                    msg = exclusive_msg if exclusive else inclusive_msg
                    log_error(msg % limit, data, schema, is_common)
            else:
                # Numeric form: both keywords are independent limits.
                if limit is not None and violates(limit, False, upper):
                    log_error(inclusive_msg % limit, data, schema, is_common)
                if exclusive is not None and violates(exclusive, True, upper):
                    log_error(exclusive_msg % exclusive, data, schema, is_common)

        # multipleOf: the value must divide evenly by the given number.
        if "multipleOf" in schema:
            multiple_of_schema = schema['multipleOf']
            if not data % multiple_of_schema == 0:
                log_error("          %s  " % multiple_of_schema, data, schema, is_common)
    
    
    def check_str(data, schema, is_common):
        """Validate *data* against the string keywords of *schema*.

        Handles ``maxLength``, ``minLength``, ``pattern`` and ``format``.
        Supported formats are ``email``, ``phone``, ``hostname`` (checked
        against the IP pattern), ``idCard`` and ``date``.  For ``date`` the
        ``time.strptime`` layout defaults to ``%Y-%m-%d`` and can be overridden
        with the custom schema key ``format_patten``.

        Args:
            data: The decoded JSON value to check.
            schema: The schema fragment (a dict) describing the string.
            is_common: Forwarded to ``log_error``.
        """
        if not isinstance(data, str):
            log_error("                ", data, schema, is_common)
            return

        # maxLength
        if "maxLength" in schema:
            max_length_schema = schema['maxLength']
            if len(data) > max_length_schema:
                log_error("           %d" % max_length_schema, data, schema, is_common)

        # minLength
        if "minLength" in schema:
            min_length_schema = schema['minLength']
            if len(data) < min_length_schema:
                log_error("           %d" % min_length_schema, data, schema, is_common)

        # pattern: JSON Schema patterns are not implicitly anchored, so the
        # expression may match anywhere in the string (search, not match).
        if "pattern" in schema:
            pattern_schema = schema['pattern']
            if not re.search(pattern_schema, data):
                log_error("                 【%s】" % pattern_schema, data, schema, is_common)

        # format
        if 'format' in schema:
            format_schema = schema['format']

            if format_schema == 'email' and not re.match(EMAIL_REGEX, data):
                log_error("                ", data, schema, is_common)

            elif format_schema == 'phone' and not re.match(PHONE_REGEX, data):
                log_error("                  ", data, schema, is_common)

            elif format_schema == 'hostname' and not re.match(IP_REGEX, data):
                log_error("            IP    ", data, schema, is_common)

            elif format_schema == 'idCard' and not re.match(ID_CARD_REGEX, data):
                log_error("                 ", data, schema, is_common)

            elif format_schema == 'date':
                # The strptime layout can be customised per schema.
                format_patten = schema.get('format_patten', '%Y-%m-%d')
                try:
                    time.strptime(data, format_patten)
                except ValueError:
                    log_error("                  【%s】" % format_patten, data, schema, is_common)
    
    
    def check_common(schema, data, is_common=False):
        """Validate *data* against the type-independent keywords of *schema*.

        Handles ``enum``, ``const``, ``allOf``, ``anyOf``, ``oneOf``, ``not``
        and ``if`` / ``then`` / ``else``.

        Args:
            schema: The schema fragment (a dict) to apply.
            data: The decoded JSON value to check.
            is_common: Forwarded to ``log_error`` and nested ``check_data``
                calls, so that failures raised while this schema is itself
                being probed as a subschema stay in the common error list.
                Defaults to False, which matches the previous behaviour for
                callers that pass only two arguments.
        """
        if "enum" in schema:
            enum_schema = schema['enum']
            if data not in enum_schema:
                log_error("           【%s】 " % str(enum_schema), data, schema, is_common)

        if "const" in schema:
            const_schema = schema['const']
            if data != const_schema:
                log_error("          【%s】" % str(const_schema), data, schema, is_common)

        # allOf: every subschema must hold, so failures are reported as-is.
        for item_schema in schema.get('allOf', ()):
            check_data(item_schema, data, is_common)

        # anyOf: an error only when every subschema rejects the data.
        if "anyOf" in schema:
            any_of_schema = schema['anyOf']
            if _count_failed_subschemas(any_of_schema, data) == len(any_of_schema):
                log_error("            anyof      ", data, schema, is_common)

        # oneOf: exactly one subschema must accept the data.
        if "oneOf" in schema:
            one_of_schema = schema['oneOf']
            passed = len(one_of_schema) - _count_failed_subschemas(one_of_schema, data)
            if passed != 1:
                log_error("   JSON      oneOf   ", data, schema, is_common)

        # not: an error when the subschema accepts the data.
        if "not" in schema:
            if _count_failed_subschemas([schema['not']], data) == 0:
                log_error("   JSON      not     ", data, schema, is_common)

        # if / then / else: apply "then" when "if" accepts, otherwise "else".
        if 'if' in schema:
            condition_holds = _count_failed_subschemas([schema['if']], data) == 0
            branch_key = 'then' if condition_holds else 'else'
            if branch_key in schema:
                check_data(schema[branch_key], data, is_common)


    def _count_failed_subschemas(subschemas, data):
        """Return how many of *subschemas* reject *data*.

        Each subschema is probed with ``is_common=True`` so its errors go to
        COMMON_ERR_LIST.  A subschema counts as failed when it adds at least
        one entry there, regardless of how many entries it adds.
        """
        failed = 0
        for subschema in subschemas:
            before = len(COMMON_ERR_LIST)
            check_data(subschema, data, True)
            if len(COMMON_ERR_LIST) > before:
                failed += 1
        return failed


    def get_data_type(data):
        """Return the JSON Schema type name for a decoded JSON value.

        Returns one of 'object', 'array', 'number', 'string', 'boolean', or
        None when the value is of no recognised type (including ``None``).
        """
        # bool must be tested before int/float because bool subclasses int.
        if isinstance(data, bool):
            return 'boolean'
        if isinstance(data, dict):
            return 'object'
        if isinstance(data, list):
            return 'array'
        if isinstance(data, (int, float)):
            return 'number'
        if isinstance(data, str):
            return 'string'
        return None


    def check_data(schema, data, is_common=False):
        """Validate *data* against *schema*, recording failures via ``log_error``.

        Args:
            schema: The schema fragment (a dict) to apply.
            data: The decoded JSON value to check.
            is_common: When true, failures are collected in the common error
                list instead of being reported; used while probing subschemas.
        """
        # Type-independent keywords first.
        check_common(schema, data, is_common)

        # Use the declared type; when none is declared, infer it from the data
        # so that the type-specific keywords are still applied.
        type_name = schema['type'] if "type" in schema else get_data_type(data)

        if type_name == 'object':
            check_object(data, schema, is_common)
        elif type_name == 'array':
            check_array(data, schema, is_common)
        elif type_name in ['integer', 'number']:
            check_number(data, schema, is_common)
        elif type_name == 'string':
            check_str(data, schema, is_common)
        elif type_name == 'boolean':
            # boolean has no further keywords; only the type itself is checked.
            if not isinstance(data, bool):
                log_error("           boolean  ", data, schema, is_common)
    
    

    JsonSchemaのデータ例
    
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "minProperties": 1,
      "maxProperties": 200,
      "properties": {
        "name": {
          "type": "string",
          "enum": [
            "shaofei",
            "upuptop",
            "pyfysf"
          ]
        },
        "email": {
          "type": "string",
          "format": "email",
          "const": "[email protected]"
        },
        "idCard": {
          "type": "string",
          "format": "idCard",
          "pattern": "\\d+"
        },
        "phone": {
          "type": "string",
          "format": "phone"
        },
        "hostname": {
          "type": "string",
          "format": "hostname"
        },
        "createTime": {
          "format": "date",
          "format_patten": "%Y%m%d"
        },
        "is": {
          "type": "boolean"
        },
        "age": {
          "type": "integer",
          "maximum": 20,
          "minimum": 1,
          "multipleOf": 2
        },
        "like": {
          "type": "array"
        }
      },
      "allOf": [
        {
          "type": "string"
        }
      ],
      "patternProperties": {
        "^\\S+123$": {
          "type": "integer"
        }
      },
      "required": [
        "email"
      ]
    }
    

    使用方法
    
    import json
    
    from CheckDataUti import check_data
    
    if __name__ == '__main__':
        # Read the JSON Schema definition.
        with open('../schema/MySchema.json', encoding='utf8') as schema_file:
            my_schema = json.load(schema_file)

        # Read the JSON document to be checked.
        with open('../data/cece.json', encoding='utf8') as data_file:
            json_data = json.load(data_file)

        # Run the custom validator on the loaded document.
        check_data(my_schema, json_data)
    

    参照先:
    schema準拠http://json-schema.org/,
    "$schema": "http://json-schema.org/draft-07/schema#",
    使い方>>>>こちらをクリック