Swiftソース分析----swift-object-auditor(2)
このブログをサポートしてくれた友達に感謝します.交流を歓迎します.能力と時間が限られていますので、間違いは避けられません.ご指摘ください.
転載する場合は、著者の情報を残してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近はブログにログインしていません.たくさんの友達のメッセージが見えませんでした.ここで謝ります.また、私はQQにあまり行かないので、メールで交流できます.
転載する場合は、著者の情報を残してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近はブログにログインしていません.たくさんの友達のメッセージが見えませんでした.ここで謝ります.また、私はQQにあまり行かないので、メールで交流できます.
def object_audit(self, location):
"""
Audits the given object location.
partition ;
AuditException , partition , , ;
, obj server DiskFile ;
_handle_close_quarantine , ;
, ;
1 location , , :
, etag ;
2 , ;
3 ;
"""
def raise_dfq(msg):
raise DiskFileQuarantined(msg)
try:
# object ;
# object, object ;
# get_diskfile_from_audit_location: ;
# audit_location =(path, device, partition) ;
# ;
df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
with df.open():
# ;
metadata = df.get_metadata()
# ;
obj_size = int(metadata['Content-Length'])
if self.stats_sizes:
self.record_stats(obj_size)
# ;
if self.zero_byte_only_at_fps and obj_size:
self.passes += 1
return
# df.reader for class DiskFileReader __iter__,
# close, close :
# , etag ,
# , ;
# , ;
# , ;
# fp;
reader = df.reader(_quarantine_hook=raise_dfq)
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
except DiskFileNotExist:
return
except DiskFileQuarantined as err:
self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
' quarantined: %(err)s'),
{'obj': location, 'err': err})
self.passes += 1
1.呼び出し方法get_disfile_fromaudit_locationは指定されたオブジェクトの具体的なパスを取得します.2.指定されたオブジェクトファイルを開いて、そのメタデータ情報を取得し、メタデータから指定されたオブジェクトのサイズを取得する.3.変数ゼロ_byte_onlyuat唵fpsとobjsizeは指定された対象データが破損されているかどうかを判断し、破損されていない場合はそのまま戻る.4.開いているオブジェクトに対してreader方法を呼び出し、reader方法は後のforサイクルでクラスクラスクラスクラスクラスのDisc FileReaderの下の方法を呼び出す.iter_,この方法では、方法closeをさらに呼び出し、close方法では、以下のステップが実現される. 4.1ファイル長と読み取りファイル長の値が同じかどうかを検出し、同じ値であるかを検出することにより、ファイルが壊れているかどうかを判断し、隔離が必要かを判断する. 4.2ファイルが破損した場合、破損対象ファイルのハッシュ値が空に設定される. 4.3破損したオブジェクトファイルを分離エリアに移動し、その後コピー操作により破損ファイルの回復を実現する. 4.4開いている指定対象ファイルfpを閉じます.def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
dr = DiskFileReader(
self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
self._mgr.keep_cache_size, self._device_path, self._logger,
quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
self._fp = None
return dr
def __iter__(self):
"""Returns an iterator over the data file."""
try:
......
finally:
# close :
# , etag ,
# , ;
# , ;
# , ;
# fp;
if not self._suppress_file_closing:
self.close()
def close(self):
"""
Close the open file handle if present.
For this specific implementation, this method will handle quarantining
the file if necessary.
, etag ,
, ;
, ;
, ;
fp;
"""
if self._fp:
# , etag ,
# , ;
# , ;
# , ;
try:
if self._started_at_0 and self._read_to_eof:
self._handle_close_quarantine()
except DiskFileQuarantined:
raise
except (Exception, Timeout) as e:
self._logger.error(_(
'ERROR DiskFile %(data_file)s'
' close failure: %(exc)s : %(stack)s'),
{'exc': e, 'stack': ''.join(traceback.format_stack()),
'data_file': self._data_file})
finally:
fp, self._fp = self._fp, None
fp.close()
def _handle_close_quarantine(self):
"""
Check if file needs to be quarantined
, etag ,
, ;
, ;
, ;
"""
# , _quarantine ;
# etag , _quarantine md5 ;
if self._bytes_read != self._obj_size:
self._quarantine(
"Bytes read: %s, does not match metadata: %s" % (
self._bytes_read, self._obj_size))
elif self._iter_etag and self._etag != self._iter_etag.hexdigest():
self._quarantine(
"ETag %s and file's md5 %s do not match" % (
self._etag, self._iter_etag.hexdigest()))
def _quarantine(self, msg):
"""
, ;
, ;
"""
# , ;
# , ;
self._quarantined_dir = self._threadpool.run_in_thread(
quarantine_renamer, self._device_path, self._data_file)
self._logger.warn("Quarantined object %s: %s" % (self._data_file, msg))
self._logger.increment('quarantines')
self._quarantine_hook(msg)
def quarantine_renamer(device_path, corrupted_file_path):
"""
, ;
, ;
"""
# ;
from_dir = dirname(corrupted_file_path)
# ;
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
# ;
invalidate_hash(dirname(from_dir))
# ;
try:
renamer(from_dir, to_dir)
except OSError as e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
renamer(from_dir, to_dir)
return to_dir
def invalidate_hash(suffix_dir):
"""
Invalidates the hash for a suffix_dir in the partition's hashes file.
suffix_dir ;
"""
suffix = basename(suffix_dir)
partition_dir = dirname(suffix_dir)
hashes_file = join(partition_dir, HASH_FILE)
with lock_path(partition_dir):
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
if suffix in hashes and not hashes[suffix]:
return
except Exception:
return
hashes[suffix] = None
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def renamer(old, new):
"""
;
"""
try:
mkdirs(os.path.dirname(new))
os.rename(old, new)
except OSError:
mkdirs(os.path.dirname(new))
os.rename(old, new)