Swiftソース分析----swift-account-reapper(2)


このブログをサポートしてくれた友達に感謝します.交流を歓迎します.能力と時間が限られていますので、間違いは避けられません.ご指摘ください.
転載する場合は、著者の情報を残してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近はブログにログインしていません.たくさんの友達のメッセージが見えませんでした.ここで謝ります.また、私はQQにあまり行かないので、メールで交流できます.
前のブログに続きます.
def reap_container(self, account, account_partition, account_nodes, container):
     """        
         container  ;
           container      container  ;
               object       ,          container   object   ,
          object                        ;
            object         ,      (                           );
         object     ,                container  ,     container  ;
       container            account  ,  account    container;
     """
    account_nodes = list(account_nodes)
        
    # get_container_ring:  swift.common.ring.Ring  ,   'container';
    #  account/container/object         ;
    #     (  ,      );
    #             id、weight、zone、ip、port、device、meta;
    part, nodes = self.get_container_ring().get_nodes(account, container)
    node = nodes[-1]
    #          ;
    pool = GreenPool(size=self.object_concurrency)
    marker = ''
        
    #    container      object;
    while True:
        objects = None
        try:
            # direct_get_container:    'GET'     ,       ( )            ;
            # objects     object;
            objects = direct_get_container(
                        node, part, account, container,
                        marker=marker,
                        conn_timeout=self.conn_timeout,
                        response_timeout=self.node_timeout)[1]
            self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
        except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
        if not objects:
            break
            
        #    container      object;
        try:
            for obj in objects:
                if isinstance(obj['name'], unicode):
                    obj['name'] = obj['name'].encode('utf8')
                    
                #           reap_object;
                # reap_object:              object  ,       obj['name'];
                #       ,  object                ;
                #   container      object;
                pool.spawn(self.reap_object, account, container, part, nodes, obj['name'])
            pool.waitall()
        except (Exception, Timeout):
            self.logger.exception(_('Exception with objects for container '
                                     '%(container)s for account %(account)s'),
                                   {'container': container, 'account': account})
        marker = objects[-1]['name']
        if marker == '':
            break
    successes = 0
    failures = 0
        
    #         ,  container     ;
    for node in nodes:
        anode = account_nodes.pop()
        try:
            # direct_delete_container:    'DELETE'     ,   account  ( )    container;
            direct_delete_container(
                    node, part, account, container,
                    conn_timeout=self.conn_timeout,
                    response_timeout=self.node_timeout,
                    headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
                             'X-Account-Partition': str(account_partition),
                             'X-Account-Device': anode['device'],
                             'X-Account-Override-Deleted': 'yes'})

            successes += 1
            self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
        except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            failures += 1
            self.logger.increment('containers_failures')
            self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
    if successes > failures:
        self.stats_containers_deleted += 1
        self.logger.increment('containers_deleted')
    elif not successes:
        self.stats_containers_remaining += 1
        self.logger.increment('containers_remaining')
    else:
        self.stats_containers_possibly_remaining += 1
        self.logger.increment('containers_possibly_remaining')
1.指定されたaccount指定containerのセグメント番号とすべてのコピーがあるノード情報を取得し、ノードリストからノードを取得する(すべてのノードのコピー情報が一致するので).2.呼び出し方法direct_ゲットするcontainerは、指定されたコンテナの下のすべてのオブジェクトリストをコンテナサービスから直接取得するための'GET'メソッドを呼び出す要求を送信することを実現する.3.すべてのオブジェクトを巡回して、各オブジェクトに対して方法を呼び出します.オブジェクトサーバから指定対象のデータ情報を削除することができます.4.containerのすべてのコピーがあるノードを巡回して、各ノードに対して方法direct_を呼び出します.delete_containerは、サービスから直接にcontainer関連データ情報(例えば、メタデータ情報データベース情報など)を削除することを実現する.
3に回して、方法を見にきます.objectの実現:
def reap_object(self, account, container, container_partition, container_nodes, obj):
     """        
         object  ;
                   object  ,       object;
           ,  object                ;
       container      object;
     """
    # container      ;
    container_nodes = list(container_nodes)
        
    #  account/container/object         ;
    #     (  ,      );
    #             id、weight、zone、ip、port、device、meta;
    # get_object_ring:  swift.common.ring.Ring  ,   'object';        
    part, nodes = self.get_object_ring().get_nodes(account, container, obj)
    successes = 0
    failures = 0
    for node in nodes:
        cnode = container_nodes.pop()
        try:
            #     HTTPConnection    ;
            #    HTTP     'DELETE'    ;
            #        ( )    ;
            #           ;
            direct_delete_object(
                    node, part, account, container, obj,
                    conn_timeout=self.conn_timeout,
                    response_timeout=self.node_timeout,
                    headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
                             'X-Container-Partition': str(container_partition),
                             'X-Container-Device': cnode['device']})

            successes += 1
            self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
        except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            failures += 1
            self.logger.increment('objects_failures')
            self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
        if successes > failures:
            self.stats_objects_deleted += 1
            self.logger.increment('objects_deleted')
        elif not successes:
            self.stats_objects_remaining += 1
            self.logger.increment('objects_remaining')
        else:
            self.stats_objects_possibly_remaining += 1
            self.logger.increment('objects_possibly_remaining')
3.1.指定されたaccount指定container指定objectの識別番号とすべてのコピーの所在ノード情報を取得する.3.2すべてのコピーがあるノードを巡回して、各ノードに対して方法direct(u)を呼び出します.delete_objectは、「DELETE」メソッドを呼び出す要求を送信し、対象サービスから指定対象データを直接削除することを実現する.
3.2に移動して、方法direct_を見にきます.delete_objectの実現:
def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers=None):
     """
            ( )    ;
         HTTPConnection    ;
        HTTP     'DELETE'    ;
            ( )    ;
               ;
     """
    if headers is None:
        headers = {}

    path = '/%s/%s/%s' % (account, container, obj)
    with Timeout(conn_timeout):
        #     HTTPConnection    ;
        #    HTTP     'DELETE';
        #   HTTPConnection    ;
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'DELETE', path, headers=gen_headers(headers, True))
    with Timeout(response_timeout):
        # getresponse:          ;
        resp = conn.getresponse()
        resp.read()
    if not is_success(resp.status):
        raise ClientException(
            'Object server %s:%s direct DELETE %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)
4に変えて、見にきた方法はdirect_を見にきます.delete_containerの実現:
def direct_delete_container(node, part, account, container, conn_timeout=5, response_timeout=15, headers=None):
     """
         'DELETE'     ,       ( )    container    ;
     """
    if headers is None:
        headers = {}

    path = '/%s/%s' % (account, container)
    with Timeout(conn_timeout):
        conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'DELETE', path, headers=gen_headers(headers, True))
    with Timeout(response_timeout):
        resp = conn.getresponse()
        resp.read()
    if not is_success(resp.status):
        raise ClientException(
            'Container server %s:%s direct DELETE %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)