Swiftソース分析----swift-object-auditor(1)
友达がこのブログを支持することに感谢して、共に交流を探求することを歓迎して、能力と时间が限られているため、间违ったところは避けられないで、指摘を歓迎します!
転載する場合は、作者情報を保持してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近ブログに登录していないで、多くの友达の伝言は见ていないで、ここは谢ります!また、私はQQに行くことが少なく、メールで交流することができます.
概要:
オブジェクト監査デーモンプロセス; オブジェクト監査デーモンプロセスは主に監査設備ディレクトリの下のすべてのobjectの操作を実現する.1つのオブジェクトに対する監査操作の流れは以下の通りである:1指定されたオブジェクトデータに対して検査を行い、ファイルが破損しているかどうかを判断する.検査方法は以下の通りである. ファイル長と読み取りファイル長の値が同じかどうかを検出し、etag値が同じかどうかを検出する.2ファイルが破損した場合、破損したオブジェクトファイルのハッシュ値を空に設定します.3破損したオブジェクトファイルを隔離領域に移動する.ここで定義したonce=Trueは、システムがデーモンクラスDaemonのrun_をデフォルトで呼び出すことを示しています.onceメソッド;これにより、ObjectAuditorクラスのrun_を呼び出すことが最終的に実現される.onceメソッド;
ソース解析:
以下はこの部分のコードの主な実行プロセスであり、コードの重要な部分はすでに関連する注釈を行っている.
転載する場合は、作者情報を保持してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
PS:最近ブログに登录していないで、多くの友达の伝言は见ていないで、ここは谢ります!また、私はQQに行くことが少なく、メールで交流することができます.
概要:
オブジェクト監査デーモンプロセス; オブジェクト監査デーモンプロセスは主に監査設備ディレクトリの下のすべてのobjectの操作を実現する.1つのオブジェクトに対する監査操作の流れは以下の通りである:1指定されたオブジェクトデータに対して検査を行い、ファイルが破損しているかどうかを判断する.検査方法は以下の通りである. ファイル長と読み取りファイル長の値が同じかどうかを検出し、etag値が同じかどうかを検出する.2ファイルが破損した場合、破損したオブジェクトファイルのハッシュ値を空に設定します.3破損したオブジェクトファイルを隔離領域に移動する.ここで定義したonce=Trueは、システムがデーモンクラスDaemonのrun_をデフォルトで呼び出すことを示しています.onceメソッド;これにより、ObjectAuditorクラスのrun_を呼び出すことが最終的に実現される.onceメソッド;
ソース解析:
以下はこの部分のコードの主な実行プロセスであり、コードの重要な部分はすでに関連する注釈を行っている.
from swift.obj.auditor import ObjectAuditor
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
from optparse import OptionParser
if __name__ == '__main__':
parser = OptionParser("%prog CONFIG [options]")
parser.add_option('-z', '--zero_byte_fps',
help='Audit only zero byte files at specified files/sec')
parser.add_option('-d', '--devices',
help='Audit only given devices. Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectAuditor, conf_file, **options)
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
"""
, “klass” kwarg;
"""
......
try:
klass(conf).run(once=once, **kwargs)
except KeyboardInterrupt:
logger.info('User quit')
logger.info('Exited')
def run(self, once=False, **kwargs):
"""
;
run_once, run_forever;
"""
# ;
utils.validate_configuration()
utils.drop_privileges(self.conf.get('user', 'swift'))
# ;
utils.capture_stdio(self.logger, **kwargs)
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
if once:
self.run_once(**kwargs)
else:
self.run_forever(**kwargs)
def run_once(self, *args, **kwargs):
"""
Override this to run the script once
;
"""
raise NotImplementedError('run_once not implemented')
def run_once(self, *args, **kwargs):
"""
object ;
audit_all_objects, object;
object ;
:
1 , , :
, etag ;
2 , ;
3 ;
"""
# zero byte only command line option
zbo_fps = kwargs.get('zero_byte_fps', 0)
override_devices = list_from_csv(kwargs.get('devices'))
# Remove bogus entries and duplicates from override_devices
override_devices = list(
set(listdir(self.devices)).intersection(set(override_devices)))
parent = False
if zbo_fps:
# only start parent
parent = True
kwargs = {'mode': 'once'}
try:
self.audit_loop(parent, zbo_fps, override_devices=override_devices,
**kwargs)
except (Exception, Timeout):
self.logger.exception(_('ERROR auditing'))
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
"""Audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
kwargs['device_dirs'] = override_devices
if parent:
kwargs['zero_byte_fps'] = zbo_fps
self.run_audit(**kwargs)
else:
pids = []
if self.conf_zero_byte_fps:
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
pids.append(self.fork_child(**kwargs))
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes
if self.conf_zero_byte_fps and pid == zbf_pid and \
len(pids) > 1:
kwargs['device_dirs'] = override_devices
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
pids.remove(pid)
def run_audit(self, **kwargs):
"""
Run the object audit
object ;
:
1 , , :
, etag ;
2 , ;
3 ;
"""
mode = kwargs.get('mode')
zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
device_dirs = kwargs.get('device_dirs')
# AuditorWorker: ;
worker = AuditorWorker(self.conf, self.logger, self.rcache,
self.devices,
zero_byte_only_at_fps=zero_byte_only_at_fps)
# object ;
# :
# 1 , , :
# , etag ;
# 2 , ;
# 3 ;
worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
def audit_all_objects(self, mode='once', device_dirs=None):
"""
object ;
:
1 , , :
, etag ;
2 , ;
3 ;
"""
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') % (mode, self.auditor_type,
description))
begin = reported = time.time()
self.total_bytes_processed = 0
self.total_files_processed = 0
total_quarantines = 0
total_errors = 0
time_auditing = 0
# , (path, device,
partition);
# audit_location_generator path,device,partition;
# ;
# device_dirs hash
# hsh_path, device, partition
# all_locs self.device device_dirs , AuditLocation(hsh_path, device,
partition) ;
all_locs = self.diskfile_mgr.object_audit_location_generator(device_dirs=device_dirs)
for location in all_locs:
loop_time = time.time()
# 1 location , , :
# , etag ;
# 2 , ;
# 3 ;
self.failsafe_object_audit(location)
self.logger.timing_since('timing', loop_time)
self.files_running_time = ratelimit_sleep(
self.files_running_time, self.max_files_per_second)
self.total_files_processed += 1
now = time.time()
if now - reported >= self.log_time:
self.logger.info(_(
'Object audit (%(type)s). '
'Since %(start_time)s: Locally: %(passes)d passed, '
'%(quars)d quarantined, %(errors)d errors '
'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
'Total time: %(total).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % {
'type': '%s%s' % (self.auditor_type, description),
'start_time': time.ctime(reported),
'passes': self.passes, 'quars': self.quarantines,
'errors': self.errors,
'frate': self.passes / (now - reported),
'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing,
'audit_rate': time_auditing / (now - begin)})
cache_entry = self.create_recon_nested_dict(
'object_auditor_stats_%s' % (self.auditor_type),
device_dirs,
{'errors': self.errors, 'passes': self.passes,
'quarantined': self.quarantines,
'bytes_processed': self.bytes_processed,
'start_time': reported, 'audit_time': time_auditing})
dump_recon_cache(cache_entry, self.rcache, self.logger)
reported = now
total_quarantines += self.quarantines
total_errors += self.errors
self.passes = 0
self.quarantines = 0
self.errors = 0
self.bytes_processed = 0
# ;
time_auditing += (now - loop_time)
# Avoid divide by zero during very short runs
# , 0.000001, 0;
elapsed = (time.time() - begin) or 0.000001
self.logger.info(_(
'Object audit (%(type)s) "%(mode)s" mode '
'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % {
'type': '%s%s' % (self.auditor_type, description),
'mode': mode, 'elapsed': elapsed,
'quars': total_quarantines + self.quarantines,
'errors': total_errors + self.errors,
'frate': self.total_files_processed / elapsed,
'brate': self.total_bytes_processed / elapsed,
'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
# Clear recon cache entry if device_dirs is set
if device_dirs:
cache_entry = self.create_recon_nested_dict(
'object_auditor_stats_%s' % (self.auditor_type),
device_dirs, {})
dump_recon_cache(cache_entry, self.rcache, self.logger)
if self.stats_sizes:
self.logger.info(_('Object audit stats: %s') % json.dumps(self.stats_buckets))
def failsafe_object_audit(self, location):
"""
Entrypoint to object_audit, with a failsafe generic exception handler.
1 location , , :
, etag ;
2 , ;
3 ;
"""
# object_audit: , obj server DiskFile ;
# _handle_close_quarantine , ;
# , ;
try:
self.object_audit(location)
except (Exception, Timeout):
self.logger.increment('errors')
self.errors += 1
self.logger.exception(_('ERROR Trying to audit %s'), location)
次のブログでは、swift-object-auditorの分析を続けます.