OpenStock Nova-schedulerコンポーネントのソースコード解析(2)
このブログをサポートしてくれた友達に感謝します.交流を歓迎します.能力と時間が限られていますので、間違いは避けられません.ご指摘ください.転載する場合は、著者の情報を残してください.ブログのアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
このブログでは、仮想マシンのインスタンスを作成する要求に対して、Novaスケジューラが最適なホストノードを選択する過程を解析します.
まず方法/nova/scheduler/manager.py----def run_を見にきます.instance:
この方法のクラスScheduler Managerの初期化方法を見てください.
また方法schedule_を見にきます.run_instanceのコード実現:
1.文:hosts=self.host_manager.get_all_ホスト.states(elevated)
利用できないホストノードをフィルタリングし、利用可能なホストノードリストを取得することができます.
2文:hosts=self.host_manager.get_ファイトドドドhosts(hosts,filter properties)
仮想マシンのインスタンスを作成するための要求に対して、システムで指定されたフィルタで上記で取得された利用可能なホストノードのリストをフィルタリングし、さらにフィルタ要求に合致するホストリストを得ることができる.
3文:weighed_hosts=self.host_manager.get_weighed_hosts(hosts,filter properties)
フィルタリングされたホストリストの各ホストノードを秤量操作し、仮想マシンのインスタンスを作成するターゲットホストとして、ある標準の下で最適なホストノードを選択する.
はい、具体的にこれらの語句を解析します.
1.hosts=self.host_manager.get_all_ホスト.states(elevated)
具体的には方法get_all_ホスト.statesのコード実現:
このステートメントは、データベースからすべてのcomputteNodesを取得し、具体的には方法をcomputte_node_ゲットするallのコード実現:
この文はホストを初期化するいくつかのパラメータを実現しています.
このステートメントは、コンピュータ情報からホスト情報を更新することを実現しています.
具体的には方法get_ファイトドドドhostsのコード実現:
この文は検証されたフィルタを返すことを実現しています.具体的には方法を説明します.チョイスホスト.filtersのコード実現:
このブログでは、仮想マシンのインスタンスを作成する要求に対して、Novaスケジューラが最適なホストノードを選択する過程を解析します.
まず方法/nova/scheduler/manager.py----def run_を見にきます.instance:
def run_instance(self, context, request_spec, admin_password,
injected_files, requested_networks, is_first_time,
filter_properties):
"""
schedule_run_instance;
vm_state ERROR;
context: ;
request_spec: ;
admin_password:admin ;
injected_files: ;
requested_networks: ;
is_first_time: ;
filter_properties: ;
"""
# UUID ;
instance_uuids = request_spec['instance_uuids']
# EventReporter: ;
# EventReporter ;
with compute_utils.EventReporter(context, conductor_api.LocalAPI(),
'schedule', *instance_uuids):
# schedule_run_instance /nova/scheduler/chance.py /nova/scheduler/filter_scheduler.py ;
# , driver ;
# self.driver = importutils.import_object(scheduler_driver);
# , , scheduler_driver ;
# , scheduler_driver , ;
# scheduler_driver = CONF.scheduler_driver;
# CONF.scheduler_driver nova.scheduler.filter_scheduler.FilterScheduler;
# , scheduler_driver , /nova/scheduler/filter_scheduler.py schedule_run_instance ;
# ;
# /nova/scheduler/filter_scheduler.py schedule_run_instance ;
# ;
# ;
# ;
# ;
# ;
# ;
# /nova/scheduler/chance.py schedule_run_instance ;
# ;
# ;
# ;
# ;
# : ;
# ;
try:
# driver = nova.scheduler.filter_scheduler.FilterScheduler
return self.driver.schedule_run_instance(context,
request_spec, admin_password, injected_files,
requested_networks, is_first_time, filter_properties)
except exception.NoValidHost as ex:
# don't re-raise
self._set_vm_state_and_notify('run_instance',
{'vm_state': vm_states.ERROR,
'task_state': None},
context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
self._set_vm_state_and_notify('run_instance',
{'vm_state': vm_states.ERROR,
'task_state': None},
context, ex, request_spec)
ステートメントreturn self.driver.schedule_を見に来ました.run_instance(context、request uspec、admin uplassityword、inject edufiles、requested dunetworks、is_firstties)この方法のクラスScheduler Managerの初期化方法を見てください.
class SchedulerManager(manager.Manager):
"""
;
"""
RPC_API_VERSION = '2.6'
def __init__(self, scheduler_driver=None, *args, **kwargs):
# scheduler_driver , CONF.scheduler_driver scheduler_driver;
# scheduler_driver: nova.scheduler.filter_scheduler.FilterScheduler;
if not scheduler_driver:
scheduler_driver = CONF.scheduler_driver
# import_object: , ;
# scheduler_driver ;
# scheduler_driver nova.scheduler.filter_scheduler.FilterScheduler;
# nova.scheduler.filter_scheduler.FilterScheduler ;
# ;
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
またscheduler_に来てくださいdriverの定義:scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='nova.scheduler.filter_scheduler.FilterScheduler',
help='Default driver to use for the scheduler')
スケジューラが適用しているホストの選択方法は動的に導入されていることが分かります.デフォルトではnova.scheduler.filter_を適用します.scheduler.FilterSchdulerクラスによって定義されるホストフィルタに基づくスケジューラ方法.また方法schedule_を見にきます.run_instanceのコード実現:
def schedule_run_instance(self, context, request_spec,
admin_password, injected_files,
requested_networks, is_first_time,
filter_properties):
"""
nova.compute.api , ;
;
;
, ;
;
;
;
"""
# request_spec: ;
# ;
payload = dict(request_spec=request_spec)
# notifier.publisher_id("scheduler"): ID;
# "scheduler.host";
# notify: ;( )
notifier.notify(context, notifier.publisher_id("scheduler"),'scheduler.run_instance.start', notifier.INFO, payload)
# request_spec instance_uuids ;
instance_uuids = request_spec.pop('instance_uuids')
# ;
num_instances = len(instance_uuids)
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") % locals())
# ;
# , ;
weighed_hosts = self._schedule(context, request_spec, filter_properties, instance_uuids)
# @@@ , ;
filter_properties.pop('context', None)
# instance_uuids;
for num, instance_uuid in enumerate(instance_uuids):
request_spec['instance_properties']['launch_index'] = num
try:
# ;
try:
weighed_host = weighed_hosts.pop(0)
except IndexError:
raise exception.NoValidHost(reason="")
# context: ;
# weighed_host: ;
# request_spec: ;
# filter_properties: ;
# requested_networks: ;
# injected_files: ;
# admin_password:admin ;
# is_first_time: ;
# instance_uuid: UUID;
# _provision_resource: ;
# ;
self._provision_resource(context, weighed_host,
request_spec,
filter_properties,
requested_networks,
injected_files, admin_password,
is_first_time,
instance_uuid=instance_uuid)
except Exception as ex:
driver.handle_schedule_error(context, ex, instance_uuid, request_spec)
retry = filter_properties.get('retry', {})
retry['hosts'] = []
# @@@notify: ;
# ;
notifier.notify(context, notifier.publisher_id("scheduler"),
'scheduler.run_instance.end', notifier.INFO, payload)
私たちは注目の言葉weighed_を比較します.hosts=self.uschedule(context、request uspec、filterCauproperties、instance uuids)は、この文は呼び出し方法を通じて(u)scheduleは、各インスタンスにループして適切なホストを取得した後、利用可能なホストリストを返すことを実現する.具体的に方法を説明しますscheduleのコード実現:def _schedule(self, context, request_spec, filter_properties, instance_uuids=None):
"""
;
, ;
;
FilterManager Scheduler ;
Scheduler , filter ;
scheduler_default_filters filter;
filter host_passes() , True, ;
;
"""
# elevated: admin context ;
elevated = context.elevated()
# (instance_properties);
instance_properties = request_spec['instance_properties']
# (instance_type);
instance_type = request_spec.get("instance_type", None)
update_group_hosts = False
# scheduler_hints;
scheduler_hints = filter_properties.get('scheduler_hints') or {}
# group ;
group = scheduler_hints.get('group', None)
if group:
# group_hosts: group ;
group_hosts = self.group_hosts(elevated, group)
# group_hosts;
update_group_hosts = True
# filter_properties 'group_hosts', 'group_hosts' filter_properties ;
if 'group_hosts' not in filter_properties:
filter_properties.update({'group_hosts': []})
# filter_properties 'group_hosts';
# 'group_hosts' ;(@@@ , ;)
configured_hosts = filter_properties['group_hosts']
filter_properties['group_hosts'] = configured_hosts + group_hosts
# ;
config_options = self._get_configuration_options()
# ;
properties = instance_properties.copy()
# instance_uuids properties['uuid'];
if instance_uuids:
properties['uuid'] = instance_uuids[0]
# ;
self._populate_retry(filter_properties, properties)
# ;
filter_properties.update({'context': context,
'request_spec': request_spec,
'config_options': config_options,
'instance_type': instance_type})
# request_spec , ;
# filter_properties['os_type'] filter_properties['project_id'];
self.populate_filter_properties(request_spec, filter_properties)
# ;
# , ;
# HostStates ;
# ;
hosts = self.host_manager.get_all_host_states(elevated)
selected_hosts = []
# ;
if instance_uuids:
num_instances = len(instance_uuids)
else:
num_instances = request_spec.get('num_instances', 1)
# num_instances , ;
for num in xrange(num_instances):
# Filter local hosts based on requirements ...
# ;
# get_filtered_hosts: , ;
hosts = self.host_manager.get_filtered_hosts(hosts, filter_properties)
if not hosts:
break
LOG.debug(_("Filtered %(hosts)s") % locals())
# ;
# WeighedObjects ( );
weighed_hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
# scheduler_host_subset_size: , ( )N ;
# , ;
# 1, ;
# 1, 1 , 1 ;
# 1;
scheduler_host_subset_size = CONF.scheduler_host_subset_size
if scheduler_host_subset_size > len(weighed_hosts):
scheduler_host_subset_size = len(weighed_hosts)
if scheduler_host_subset_size < 1:
scheduler_host_subset_size = 1
# , , ;
# ;
chosen_host = random.choice(
weighed_hosts[0:scheduler_host_subset_size])
LOG.debug(_("Choosing host %(chosen_host)s") % locals())
# selected_hosts ;
selected_hosts.append(chosen_host)
# , , ;
chosen_host.obj.consume_from_instance(instance_properties)
if update_group_hosts is True:
filter_properties['group_hosts'].append(chosen_host.obj.host)
# , ;
return selected_hosts
この方法の実現過程では、主に三段階である.1.文:hosts=self.host_manager.get_all_ホスト.states(elevated)
利用できないホストノードをフィルタリングし、利用可能なホストノードリストを取得することができます.
2文:hosts=self.host_manager.get_ファイトドドドhosts(hosts,filter properties)
仮想マシンのインスタンスを作成するための要求に対して、システムで指定されたフィルタで上記で取得された利用可能なホストノードのリストをフィルタリングし、さらにフィルタ要求に合致するホストリストを得ることができる.
3文:weighed_hosts=self.host_manager.get_weighed_hosts(hosts,filter properties)
フィルタリングされたホストリストの各ホストノードを秤量操作し、仮想マシンのインスタンスを作成するターゲットホストとして、ある標準の下で最適なホストノードを選択する.
はい、具体的にこれらの語句を解析します.
1.hosts=self.host_manager.get_all_ホスト.states(elevated)
具体的には方法get_all_ホスト.statesのコード実現:
def get_all_host_states(self, context):
"""
HostStates ;
HostStates HostManager ;
,HostState consumable resources ;
;
;
"""
# ;
# computeNodes( );
compute_nodes = db.compute_node_get_all(context)
# ;
seen_nodes = set()
for compute in compute_nodes:
service = compute['service']
if not service:
LOG.warn(_("No service for compute ID %s") % compute['id'])
continue
# host;
host = service['host']
# hypervisor_hostname ;
node = compute.get('hypervisor_hostname')
# state_key;
state_key = (host, node)
# capabilities;
capabilities = self.service_states.get(state_key, None)
# host_state;
host_state = self.host_state_map.get(state_key)
# capabilities ;
if host_state:
host_state.update_capabilities(capabilities, dict(service.iteritems()))
# @@@ ;
# , ;
else:
host_state = self.host_state_cls(host, node,capabilities=capabilities,service=dict(service.iteritems()))
self.host_state_map[state_key] = host_state
# update_from_compute_node: compute ;
host_state.update_from_compute_node(compute)
# seen_nodes;
seen_nodes.add(state_key)
# host_state_map ;
# ;
dead_nodes = set(self.host_state_map.keys()) - seen_nodes
for state_key in dead_nodes:
host, node = state_key
LOG.info(_("Removing dead compute node %(host)s:%(node)s "
"from scheduler") % locals())
del self.host_state_map[state_key]
return self.host_state_map.itervalues()
1.1 computte_nodes=db.com mpute_node_ゲットするall(context)このステートメントは、データベースからすべてのcomputteNodesを取得し、具体的には方法をcomputte_node_ゲットするallのコード実現:
def compute_node_get_all(context):
"""
ComputeNode( );
"""
return model_query(context, models.ComputeNode).\
options(joinedload('service')).\
options(joinedload('stats')).\
all()
1.2 host_state=self.host_state_cls(host,node,capabilitys=capabilityes,service=dict(service.iteritems))この文はホストを初期化するいくつかのパラメータを実現しています.
host_state_cls = HostState
class HostState(object):
def __init__(self, host, node, capabilities=None, service=None):
self.host = host
self.nodename = node
self.update_capabilities(capabilities, service)
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_usable_disk_gb = 0
self.disk_mb_used = 0
self.free_ram_mb = 0
self.free_disk_mb = 0
self.vcpus_total = 0
self.vcpus_used = 0
# Valid vm types on this host: 'pv', 'hvm' or 'all'
if 'allowed_vm_type' in self.capabilities:
self.allowed_vm_type = self.capabilities['allowed_vm_type']
else:
self.allowed_vm_type = 'all'
# Additional host information from the compute node stats:
self.vm_states = {}
self.task_states = {}
self.num_instances = 0
self.num_instances_by_project = {}
self.num_instances_by_os_type = {}
self.num_io_ops = 0
# Resource oversubscription values for the compute host:
self.limits = {}
self.updated = None
1.3 host_state.udate_fromcomputenode(compute)このステートメントは、コンピュータ情報からホスト情報を更新することを実現しています.
def update_from_compute_node(self, compute):
"""
compute_node ;
"""
# , ;
if (self.updated and compute['updated_at'] and self.updated > compute['updated_at']):
return
# all_ram_mb;
all_ram_mb = compute['memory_mb']
# qcow2 , ;
least = compute.get('disk_available_least')
# free_disk_mb;
free_disk_mb = least if least is not None else compute['free_disk_gb']
free_disk_mb *= 1024
# ;
self.disk_mb_used = compute['local_gb_used'] * 1024
# free_ram_mb ;
self.free_ram_mb = compute['free_ram_mb']
# total_usable_ram_mb;
self.total_usable_ram_mb = all_ram_mb
# total_usable_disk_gb;
self.total_usable_disk_gb = compute['local_gb']
self.free_disk_mb = free_disk_mb
self.vcpus_total = compute['vcpus']
self.vcpus_used = compute['vcpus_used']
self.updated = compute['updated_at']
stats = compute.get('stats', [])
statmap = self._statmap(stats)
# ;
self.num_instances = int(statmap.get('num_instances', 0))
# project_id ;
project_id_keys = [k for k in statmap.keys() if
k.startswith("num_proj_")]
for key in project_id_keys:
project_id = key[9:]
self.num_instances_by_project[project_id] = int(statmap[key])
# vm_states ;
vm_state_keys = [k for k in statmap.keys() if k.startswith("num_vm_")]
for key in vm_state_keys:
vm_state = key[7:]
self.vm_states[vm_state] = int(statmap[key])
# task_states ;
task_state_keys = [k for k in statmap.keys() if
k.startswith("num_task_")]
for key in task_state_keys:
task_state = key[9:]
self.task_states[task_state] = int(statmap[key])
# host_type ;
os_keys = [k for k in statmap.keys() if k.startswith("num_os_type_")]
for key in os_keys:
os = key[12:]
self.num_instances_by_os_type[os] = int(statmap[key])
# num_io_ops;
self.num_io_ops = int(statmap.get('io_workload', 0))
2.hosts=self.host_manager.get_ファイトドドドhosts(hosts,filter properties)具体的には方法get_ファイトドドドhostsのコード実現:
def get_filtered_hosts(self, hosts, filter_properties, filter_class_names=None):
"""
, ;
"""
def _strip_ignore_hosts(host_map, hosts_to_ignore):
ignored_hosts = []
for host in hosts_to_ignore:
if host in host_map:
del host_map[host]
ignored_hosts.append(host)
ignored_hosts_str = ', '.join(ignored_hosts)
msg = _('Host filter ignoring hosts: %(ignored_hosts_str)s')
LOG.debug(msg, locals())
def _match_forced_hosts(host_map, hosts_to_force):
for host in host_map.keys():
if host not in hosts_to_force:
del host_map[host]
if not host_map:
forced_hosts_str = ', '.join(hosts_to_force)
msg = _("No hosts matched due to not matching 'force_hosts'"
"value of '%(forced_hosts_str)s'")
LOG.debug(msg, locals())
return
forced_hosts_str = ', '.join(host_map.iterkeys())
msg = _('Host filter forcing available hosts to '
'%(forced_hosts_str)s')
LOG.debug(msg, locals())
# ;
filter_classes = self._choose_host_filters(filter_class_names)
ignore_hosts = filter_properties.get('ignore_hosts', [])
force_hosts = filter_properties.get('force_hosts', [])
if ignore_hosts or force_hosts:
name_to_cls_map = dict([(x.host, x) for x in hosts])
if ignore_hosts:
_strip_ignore_hosts(name_to_cls_map, ignore_hosts)
if not name_to_cls_map:
return []
if force_hosts:
_match_forced_hosts(name_to_cls_map, force_hosts)
# NOTE(vish): Skip filters on forced hosts.
if name_to_cls_map:
return name_to_cls_map.values()
hosts = name_to_cls_map.itervalues()
return self.filter_handler.get_filtered_objects(filter_classes, hosts, filter_properties)
2.1 filter_clases=self.uチョイスホスト.filters(filterCauclass unames)この文は検証されたフィルタを返すことを実現しています.具体的には方法を説明します.チョイスホスト.filtersのコード実現:
def _choose_host_filters(self, filter_cls_names):
"""
;
"""
# ;
if filter_cls_names is None:
# CONF.scheduler_default_filters: , ;
# :
# ['RetryFilter','AvailabilityZoneFilter','RamFilter','ComputeFilter','ComputeCapabilitiesFilter','ImagePropertiesFilter']
filter_cls_names = CONF.scheduler_default_filters
if not isinstance(filter_cls_names, (list, tuple)):
filter_cls_names = [filter_cls_names]
good_filters = []
bad_filters = []
# ( );
for filter_name in filter_cls_names:
found_class = False
# ;
for cls in self.filter_classes:
# , ;
if cls.__name__ == filter_name:
good_filters.append(cls)
found_class = True
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
msg = ", ".join(bad_filters)
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
return good_filters
2.2 return self.filter_handler.get_ファイトドドドobject(filterCauclasss,hosts,filterCauproperties)def get_filtered_objects(self, filter_classes, objs, filter_properties):
for filter_cls in filter_classes:
objs = filter_cls().filter_all(objs, filter_properties)
return list(objs)
def filter_all(self, filter_obj_list, filter_properties):
for obj in filter_obj_list:
if self._filter_one(obj, filter_properties):
yield obj
def _filter_one(self, obj, filter_properties):
"""
TRUE, FALSE;
"""
return self.host_passes(obj, filter_properties)
3.weighed_hosts=self.host_manager.get_weighed_hosts(hosts,filter properties)def get_weighed_hosts(self, hosts, weight_properties):
"""
;
WeighedObjects ( );
"""
# get_weighed_objects: WeighedObjects ( );
return self.weight_handler.get_weighed_objects(self.weight_classes, hosts, weight_properties)
# scheduler_weight_classes: ;
# nova.scheduler.weights.all_weighers;
# CONF.scheduler_available_filters ;
# ;
# nova.scheduler.weights.all_weighers ;
self.weight_classes = self.weight_handler.get_matching_classes(CONF.scheduler_weight_classes)
cfg.ListOpt('scheduler_weight_classes',
default=['nova.scheduler.weights.all_weighers'],
help='Which weight class names to use for weighing hosts'),
# ;
# nova.scheduler.weights.all_weighers;
def all_weighers():
"""
Return a list of weight plugin classes found in this directory.
"""
# least_cost_functions: LeastCostScheduler;
# None;
# compute_fill_first_cost_fn_weight: None;
if (CONF.least_cost_functions is not None or
CONF.compute_fill_first_cost_fn_weight is not None):
LOG.deprecated(('least_cost has been deprecated in favor of the RAM Weigher.'))
return least_cost.get_least_cost_weighers()
return HostWeightHandler().get_all_classes()
def get_weighed_objects(self, weigher_classes, obj_list, weighing_properties):
"""
WeighedObjects ( );
"""
if not obj_list:
return []
# object_class = WeighedObject
weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
for weigher_cls in weigher_classes:
weigher = weigher_cls()
weigher.weigh_objects(weighed_objs, weighing_properties)
return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)
object_class = WeighedObject
class WeighedObject(object):
"""
;
"""
def __init__(self, obj, weight):
self.obj = obj
self.weight = weight
def get_least_cost_weighers():
cost_functions = _get_cost_functions()
# Unfortunately we need to import this late so we don't have an
# import loop.
from nova.scheduler import weights
class _LeastCostWeigher(weights.BaseHostWeigher):
def weigh_objects(self, weighted_hosts, weight_properties):
for host in weighted_hosts:
host.weight = sum(weight * fn(host.obj, weight_properties)
for weight, fn in cost_functions)
return [_LeastCostWeigher]
def _get_cost_functions():
"""
Returns a list of tuples containing weights and cost functions to
use for weighing hosts
"""
cost_fns_conf = CONF.least_cost_functions
if cost_fns_conf is None:
# The old default. This will get fixed up below.
fn_str = 'nova.scheduler.least_cost.compute_fill_first_cost_fn'
cost_fns_conf = [fn_str]
cost_fns = []
for cost_fn_str in cost_fns_conf:
short_name = cost_fn_str.split('.')[-1]
if not (short_name.startswith('compute_') or
short_name.startswith('noop')):
continue
# Fix up any old paths to the new paths
if cost_fn_str.startswith('nova.scheduler.least_cost.'):
cost_fn_str = ('nova.scheduler.weights.least_cost' +
cost_fn_str[25:])
try:
# NOTE: import_class is somewhat misnamed since
# the weighing function can be any non-class callable
# (i.e., no 'self')
cost_fn = importutils.import_class(cost_fn_str)
except ImportError:
raise exception.SchedulerCostFunctionNotFound(
cost_fn_str=cost_fn_str)
try:
flag_name = "%s_weight" % cost_fn.__name__
weight = getattr(CONF, flag_name)
except AttributeError:
raise exception.SchedulerWeightFlagNotFound(
flag_name=flag_name)
# Set the original default.
if (flag_name == 'compute_fill_first_cost_fn_weight' and
weight is None):
weight = -1.0
cost_fns.append((weight, cost_fn))
return cost_fns
実は、流れは比較的に簡単で、濾過と重さを量る過程のです.