Swiftソース分析----swift-object-auditor(2)


このブログをサポートしてくれた友達に感謝します.交流を歓迎します.能力と時間が限られていますので、間違いは避けられません.ご指摘ください.
転載する場合は、著者の情報を残してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近はブログにログインしていません.たくさんの友達のメッセージが見えませんでした.ここで謝ります.また、私はQQにあまり行かないので、メールで交流できます.
def object_audit(self, location):
    """
    Audits the given object location.
     partition                   ;
       AuditException     ,  partition    ,    ,    ;
        
            ,      obj server DiskFile ;
         _handle_close_quarantine  ,             ;
          ,              ;
        
    1   location           ,         ,      :
                        ,      etag     ;
    2          ,              ;
    3              ;
    """
   def raise_dfq(msg):
       raise DiskFileQuarantined(msg)

   try:
     #       object   ;
     #      object,           object   ;
            
     # get_diskfile_from_audit_location:           ;
     #    audit_location =(path, device, partition)        ;
     #              ;
     df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
            
     with df.open():
         #          ;
         metadata = df.get_metadata()
         #        ;
         obj_size = int(metadata['Content-Length'])
                
         if self.stats_sizes:
             self.record_stats(obj_size)
                
         #          ;
         if self.zero_byte_only_at_fps and obj_size:
             self.passes += 1
             return
                
         # df.reader     for       class DiskFileReader    __iter__,
         #               close,  close          :
         #                         ,      etag     ,
         #              ,       ;
         #              ,              ;
         #                   ,                   ;
         #            fp;
         reader = df.reader(_quarantine_hook=raise_dfq)
                
     with closing(reader):
         for chunk in reader:
             chunk_len = len(chunk)
             self.bytes_running_time = ratelimit_sleep(
                  self.bytes_running_time,
                  self.max_bytes_per_second,
                  incr_by=chunk_len)
                    
             self.bytes_processed += chunk_len
             self.total_bytes_processed += chunk_len
         except DiskFileNotExist:
             return
         except DiskFileQuarantined as err:
             self.quarantines += 1
             self.logger.error(_('ERROR Object %(obj)s failed audit and was'
                                 ' quarantined: %(err)s'),
                                {'obj': location, 'err': err})
         self.passes += 1
1.呼び出し方法get_disfile_fromaudit_locationは指定されたオブジェクトの具体的なパスを取得します.2.指定されたオブジェクトファイルを開いて、そのメタデータ情報を取得し、メタデータから指定されたオブジェクトのサイズを取得する.3.変数ゼロ_byte_onlyuat唵fpsとobjsizeは指定された対象データが破損されているかどうかを判断し、破損されていない場合はそのまま戻る.4.開いているオブジェクトに対してreader方法を呼び出し、reader方法は後のforサイクルでクラスクラスクラスクラスクラスのDisc FileReaderの下の方法を呼び出す.iter_,この方法では、方法closeをさらに呼び出し、close方法では、以下のステップが実現される.  4.1ファイル長と読み取りファイル長の値が同じかどうかを検出し、同じ値であるかを検出することにより、ファイルが壊れているかどうかを判断し、隔離が必要かを判断する.  4.2ファイルが破損した場合、破損対象ファイルのハッシュ値が空に設定される.  4.3破損したオブジェクトファイルを分離エリアに移動し、その後コピー操作により破損ファイルの回復を実現する.  4.4開いている指定対象ファイルfpを閉じます.
def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
    dr = DiskFileReader(
                self._fp, self._data_file, int(self._metadata['Content-Length']),
                self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
                self._mgr.keep_cache_size, self._device_path, self._logger,
                quarantine_hook=_quarantine_hook, keep_cache=keep_cache)

    self._fp = None
    return dr
def __iter__(self):
    """Returns an iterator over the data file."""
    try:
            ......

    finally:
        #    close  :
        #                     ,      etag     ,
        #          ,       ;
        #          ,              ;
        #               ,                   ;
        #        fp;
        if not self._suppress_file_closing:
            self.close()
def close(self):
     """
    Close the open file handle if present.

    For this specific implementation, this method will handle quarantining
    the file if necessary.
        
                         ,      etag     ,
              ,       ;
              ,              ;
                   ,                   ;
            fp;
     """
    if self._fp:
            
        #                     ,      etag     ,
        #          ,       ;
        #          ,              ;
        #               ,                   ;
        try:
            if self._started_at_0 and self._read_to_eof:
                self._handle_close_quarantine()
            except DiskFileQuarantined:
                raise
            except (Exception, Timeout) as e:
                self._logger.error(_(
                    'ERROR DiskFile %(data_file)s'
                    ' close failure: %(exc)s : %(stack)s'),
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
                     'data_file': self._data_file})
            finally:
                fp, self._fp = self._fp, None
                fp.close()
def _handle_close_quarantine(self):
     """
    Check if file needs to be quarantined
                         ,      etag     ,
              ,       ;
              ,              ;
                   ,                   ;
     """
        
    #                  ,  _quarantine              ;
    #   etag   ,  _quarantine      md5    ;
    if self._bytes_read != self._obj_size:
        self._quarantine(
                "Bytes read: %s, does not match metadata: %s" % (
                 self._bytes_read, self._obj_size))
    elif self._iter_etag and self._etag != self._iter_etag.hexdigest():
        self._quarantine(
                "ETag %s and file's md5 %s do not match" % (
                 self._etag, self._iter_etag.hexdigest()))
def _quarantine(self, msg):
     """
              ,              ;
                   ,                   ;
     """
        
    #          ,              ;
    #               ,                   ;
    self._quarantined_dir = self._threadpool.run_in_thread(
         quarantine_renamer, self._device_path, self._data_file)
    self._logger.warn("Quarantined object %s: %s" % (self._data_file, msg))
    self._logger.increment('quarantines')
    self._quarantine_hook(msg)
def quarantine_renamer(device_path, corrupted_file_path):
     """
             ,              ;
                  ,                   ;
     """
    #        ;
    from_dir = dirname(corrupted_file_path)
    #          ;
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
    #               ;
    invalidate_hash(dirname(from_dir))
    
    #                ;
    try:
        renamer(from_dir, to_dir)
    except OSError as e:
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
        renamer(from_dir, to_dir)
    return to_dir
def invalidate_hash(suffix_dir):
     """
    Invalidates the hash for a suffix_dir in the partition's hashes file.
      suffix_dir      ;
     """

    suffix = basename(suffix_dir)
    partition_dir = dirname(suffix_dir)
    hashes_file = join(partition_dir, HASH_FILE)
    with lock_path(partition_dir):
        try:
            with open(hashes_file, 'rb') as fp:
                hashes = pickle.load(fp)
            if suffix in hashes and not hashes[suffix]:
                return
        except Exception:
            return
        hashes[suffix] = None
        write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def renamer(old, new):
     """
                    ;
     """
    try:
        mkdirs(os.path.dirname(new))
        os.rename(old, new)
    except OSError:
        mkdirs(os.path.dirname(new))
        os.rename(old, new)