Openstack Cinderでvolumeプロセスを確立するソースコード解析(5)----およびtaskflow関連解析


友达がこのブログを支持することに感谢して、共に交流を探求することを歓迎して、能力と时间が限られているため、间违ったところは避けられないで、指摘を歓迎します!転載する場合は、作者情報を保持してください.ブログアドレス:http://blog.csdn.net/gaoxingnengjisuan メールアドレス:[email protected]
ここでは冒頭の言葉を書かずに、そのまま前のブログに続きます!
2.ボリュームを構築して戻すflow(続行)
まず、Flowクラスの初期化をどのように実現するかに注目し、文を見てみましょう.
api_flow = linear_flow.Flow(flow_name)
タスクフローベースクラスでどのような方法が定義されているかを見てみましょう.これは、タスクフローの全体的な概念を理解するのに役立ちます.
class Flow(object):     """    The base abstract class of all flow implementations.     Flow抽象クラス;    """    def __init__(self, name, parents=None, uuid=None):         super(Flow, self).__init__(name, parents, uuid)         # ロールバックメソッドアキュムレータクラスのインスタンス化オブジェクトを取得します.        self._accumulator = utils.RollbackAccumulator()         self.results = {}         self._leftoff_at = None         # 実行するすべてのタスクの集合;        self._runners = []         self._connected = False         # 使用するリカバリポリシーを決定します.        self.resumer = None
    def name(self)flow読み取り可能な非一意の名前;    def uuid(self)flow一意の識別;    def state(self)はflowに読み取り専用の状態情報を提供する.    def_change_state(self,context,new_state)は、現在のflow状態を新しい状態に変更し、通知操作を実行する.    def add(self,task)は、所定のtaskをワークフローに追加する.    def add_many(self,tasks)は、所定のいくつかのtaskをワークフローに追加する.    def interrupt(self)は、現在のflowと現在flowで実行されていないtaskを中断しようと試みる.    def reset(self)はflowの内部の状態を完全にリセットし、flowの再実行を許可する.    def soft_reset(self)はflowの内部状態を部分的にリセットし、flowを中止した状態から再実行させる.    def run(self,context,*args,**kwargs)ワークフロー(workflow)の実行操作;    def rollback(self,context,cause)はworkflowとその親workflowのロールバック操作を実行する.
ここでtaskの追加をどのように実現するかを見てみましょう.例えば、文:
api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
方法addのソースコード実装を見てみましょう.
    @decorators.locked
    def add(self, task):
        """
      Adds a given task to this flow.
               task flow;
        """
        assert isinstance(task, collections.Callable)
        r = utils.Runner(task)
        r.runs_before = list(reversed(self._runners))
        self._runners.append(r)
        self._reset_internals()
        return r.uuid

実装される機能は、所与のtaskをflowに追加することであり、具体的な実装はself._runnersに追加することであり、各taskはRunnerクラスのオブジェクトにパッケージされ、runs_beforeには現在のtaskを除く以前のtaskが格納され、ここでの変数情報の後のflow実行実装で使用される.
ボリュームの作成に使用するflowにどのtaskが追加されているかに注目し続けます.
2.1 api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
このクラスは辞書情報create_whatをflowに注入することを実現し、create_whatはボリュームを構築するために守らなければならない規範情報(辞書)である.
クラスInjectTaskの具体的なコードを見てみましょう.
class InjectTask(CinderTask):
    """
                 flow ;
    """

    def __init__(self, inject_what, addons=None):
        super(InjectTask, self).__init__(addons=addons)
        self.provides.update(inject_what.keys())
        self._inject = inject_what

    def __call__(self, context):
        return dict(self._inject)

このクラスはtaskの形でflowに追加され、このクラスの__call__メソッドから、このクラスは実際にcreate_whatの辞書化処理を実現していることがわかります.
このtaskクラスではrevertメソッドは具体的に実装されていない.このtaskではロールバック操作が必要ないからである.
2.2 api_flow.add(ExtractVolumeRequestTask(image_service, az_check_functor))
このtaskクラスが実現する機能は、入力された要求情報の関連パラメータを検証し、これらのパラメータを有効な集合に変換し、これらの検証と変換されたパラメータを返し、これらのパラメータは後続のtaskの実現過程に適用される.
クラスExtractVolumeRequestTaskの具体的なコードを見てみましょう.
class ExtractVolumeRequestTask(base.CinderTask):
    """
                      ;

      task               ,                    ;
                        ,               ;
                    ,            task 。
    """

    # image_service:          
    # az_check_functor:  availability_zone      (        zone    );
    def __init__(self, image_service, az_check_functor=None):
        super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
        self.provides.update(['availability_zone', 'size', 'snapshot_id',
                              'source_volid', 'volume_type', 'volume_type_id',
                              'encryption_key_id'])
        self.requires.update(['availability_zone', 'image_id', 'metadata',
                              'size', 'snapshot', 'source_volume',
                              'volume_type', 'key_manager',
                              'backup_source_volume'])
        self.image_service = image_service
        self.az_check_functor = az_check_functor
        if not self.az_check_functor:
            self.az_check_functor = lambda az: True

    def __call__(self, context, size, snapshot, image_id, source_volume,
                 availability_zone, volume_type, metadata,
                 key_manager, backup_source_volume):
        """
          task               ,                    ;
                            ,               ;
                        ,            task 。
        """

        #                       ;
        utils.check_exclusive_options(snapshot=snapshot,
                                      imageRef=image_id,
                                      source_volume=source_volume)
        #             ACTION    ;
        # ACTION = 'volume:create';
        policy.enforce_action(context, ACTION)
        
        #             id  ;
        snapshot_id = self._extract_snapshot(snapshot)
        #           id  ;
        source_volid = self._extract_source_volume(source_volume)
        #          ;
        size = self._extract_size(size, source_volume, snapshot)
        #         ,                   ;
        self._check_image_metadata(context, image_id, size)
        #                zone;
        availability_zone = self._extract_availability_zone(availability_zone,
                                                            snapshot,
                                                            source_volume)
        
        #       volume_type,          ;
        if not volume_type and not source_volume and not snapshot:
            #          ;
            volume_type = volume_types.get_default_volume_type()

        #         id ;
        volume_type_id = self._get_volume_type_id(volume_type,
                                                  source_volume, snapshot,
                                                  backup_source_volume)
        
        #          id ;
        encryption_key_id = self._get_encryption_key_id(key_manager,
                                                        context,
                                                        volume_type_id,
                                                        snapshot,
                                                        source_volume,
                                                        backup_source_volume)

        specs = {}
        #             QOS       ;
        if volume_type_id:
            qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
            specs = qos_specs['qos_specs']
        if not specs:
            specs = None

        #              ;
        self._check_metadata_properties(metadata)

        #            ;
        #       task   ;
        return {
            'size': size,
            'snapshot_id': snapshot_id,
            'source_volid': source_volid,
            'availability_zone': availability_zone,
            'volume_type': volume_type,
            'volume_type_id': volume_type_id,
            'encryption_key_id': encryption_key_id,
            'qos_specs': specs,
        }

具体的な機能の実現過程は私のコードの中の注釈情報を見ることができて、ここではもう詳しく述べないで、このtaskクラスの中でもロールバック操作を必要としません;
2.3 api_flow.add(QuotaReserveTask())
このtaskクラスでは主にいくつかのステップの内容が実現されています.つまり、
与えられた確立された新しいボリュームの大きさとタイプに基づいて、資源割当情報を検出し、確立されたボリュームの実行可能性を検出する.
指定された新しいボリュームのサイズとタイプに基づいて、データベース内のリソース割当情報の更新を実現します.
新しいボリュームを確立する前の関連リソース割当情報を保持し、ボリューム確立の実行に異常が発生した場合、逆転ロールバックメソッドを呼び出してリソース割当情報の回復を実現する.
クラスQuotaReserveTaskの具体的なコードを見てみましょう.
class QuotaReserveTask(base.CinderTask):
    """
                             ;
    """

    def __init__(self):
        super(QuotaReserveTask, self).__init__(addons=[ACTION])
        self.requires.update(['size', 'volume_type_id'])
        self.provides.update(['reservations'])

    def __call__(self, context, size, volume_type_id):
        """
                                 ;
                       ,           ,         ;
                       ,                ;
                         ,               ,                   ;
        """
        try:
            reserve_opts = {'volumes': 1, 'gigabytes': size}
            #          reserve_opts,reserve_opts        ;
            QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
            #                      ;
            reservations = QUOTAS.reserve(context, **reserve_opts)
            return {
                'reservations': reservations,
            }
        except exception.OverQuota as e:
            overs = e.kwargs['overs']
            quotas = e.kwargs['quotas']
            usages = e.kwargs['usages']

            def _consumed(name):
                return (usages[name]['reserved'] + usages[name]['in_use'])

            def _is_over(name):
                for over in overs:
                    if name in over:
                        return True
                return False

            if _is_over('gigabytes'):
                msg = _("Quota exceeded for %(s_pid)s, tried to create "
                        "%(s_size)sG volume (%(d_consu:med)dG "
                        "of %(d_quota)dG already consumed)")
                LOG.warn(msg % {'s_pid': context.project_id,
                                's_size': size,
                                'd_consumed': _consumed('gigabytes'),
                                'd_quota': quotas['gigabytes']})
                raise exception.VolumeSizeExceedsAvailableQuota()
            elif _is_over('volumes'):
                msg = _("Quota exceeded for %(s_pid)s, tried to create "
                        "volume (%(d_consumed)d volumes "
                        "already consumed)")
                LOG.warn(msg % {'s_pid': context.project_id,
                                'd_consumed': _consumed('volumes')})
                allowed = quotas['volumes']
                raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
            else:
                # If nothing was reraised, ensure we reraise the initial error
                raise

    def revert(self, context, result, cause):
        """
          result  reservations     ,                      ;
        """
        if not result:
            return
        if context.quota_committed:
            return

        reservations = result['reservations']
        
        #   reservations     ,                      ;
        try:
            QUOTAS.rollback(context, reservations)
        except exception.CinderException:
            LOG.exception(_("Failed rolling back quota for"
                            " %s reservations"), reservations)

具体的な機能の実現過程は私のコードの中の注釈情報を見ることができて、ここではもう述べない;このクラスの中でrevert方法を実現して、ボリュームを創立する過程の中で異常が発生する時、resultの中のreservationsの保留する情報に基づいて、データベースの中のボリュームの割当情報を新しいボリュームを創立する前の状態に回復することを実現する;方法revertの中で、具体的に方法QUOTASを呼び出した.rollback、コードの実装手順を簡単に見てみましょう.
    def rollback(self, context, reservations, project_id=None):
        """
                ;
          reservations     ,                      ;
        """

        #         ;
        #   reservations     ,                      ;
        try:
            self._driver.rollback(context, reservations, project_id=project_id)
        except Exception:
            LOG.exception(_("Failed to roll back reservations "
                            "%s") % reservations)

    def rollback(self, context, reservations, project_id=None):
        """
                ;
          reservations     ,                      ;
        """
        if project_id is None:
            project_id = context.project_id

        db.reservation_rollback(context, reservations, project_id=project_id)

    def reservation_rollback(context, reservations, project_id=None):
        """
                ;
          reservations     ,                      ;
        """
        return IMPL.reservation_rollback(context, reservations,
                                         project_id=project_id)

    @require_context
    def reservation_rollback(context, reservations, project_id=None):
        """
                ;
          reservations     ,                      ;
        """
        session = get_session()
        with session.begin():
            usages = _get_quota_usages(context, session, project_id)

            for reservation in _quota_reservations(session, context, reservations):
                usage = usages[reservation.resource]
                if reservation.delta >= 0:
                    usage.reserved -= reservation.delta

                reservation.delete(session=session)

            for usage in usages.values():
                usage.save(session=session)

    def _quota_reservations(session, context, reservations):
        """Return the relevant reservations."""

        # Get the listed reservations
        return model_query(context, models.Reservation,
                           read_deleted="no",
                           session=session).\
            filter(models.Reservation.uuid.in_(reservations)).\
            with_lockmode('update').\
            all()

最後の方法では、reservationsに従ってデータテーブルReservationで対応する保存されたリソース割当情報をクエリーし、データベース内のボリュームのリソース割当情報を新しいボリュームを確立する前の状態に復元することができます.
2.4 v_uuid = api_flow.add(EntryCreateTask(db))
このtaskクラスが実現する機能は、データベース内で所定の確立すべきボリュームのために関連エントリを確立することである.
クラスEntryCreateTaskの具体的なコードを見てみましょう.
class EntryCreateTask(base.CinderTask):
    """
                  ;
        :       volume_id     ;
    """

    def __init__(self, db):
        super(EntryCreateTask, self).__init__(addons=[ACTION])
        self.db = db
        self.requires.update(['availability_zone', 'description', 'metadata',
                              'name', 'reservations', 'size', 'snapshot_id',
                              'source_volid', 'volume_type_id',
                              'encryption_key_id'])
        self.provides.update(['volume_properties', 'volume_id'])

    def __call__(self, context, **kwargs):
        """
                         ,       ;
         kwargs          ,                      ;
        """

        volume_properties = {
            'size': kwargs.pop('size'),
            'user_id': context.user_id,
            'project_id': context.project_id,
            'status': 'creating',
            'attach_status': 'detached',
            'encryption_key_id': kwargs.pop('encryption_key_id'),
            # Rename these to the internal name.
            'display_description': kwargs.pop('description'),
            'display_name': kwargs.pop('name'),
        }

        volume_properties.update(kwargs)
        #         volume_properties       ;
        volume = self.db.volume_create(context, volume_properties)

        return {
            'volume_id': volume['id'],
            'volume_properties': volume_properties,
            'volume': volume,
        }

    def revert(self, context, result, cause):
        """
                          ,        ;
        """
        
        #   result none,            ,            ;
        if not result:
            return
        
        # quota_committed          ,         ;
        if context.quota_committed:
            return
        vol_id = result['volume_id']
        
        #                   ;
        try:
            self.db.volume_destroy(context.elevated(), vol_id)
        except exception.CinderException:
            LOG.exception(_("Failed destroying volume entry %s"), vol_id)

このtaskクラスの実装機能は簡単で、データベースに新しく確立されたボリュームに関連するエントリ情報を追加することであり、ボリュームを確立する操作に異常が発生した場合、メソッドrevertを呼び出して、データベースで新しく確立されたボリュームのデータエントリ情報を削除し、逆ロールバック操作を実現することができる.
2.5 api_flow.add(QuotaCommitTask())
このtaskクラスが実現する機能は,ボリュームの確立が成功したと一時的に仮定し,その際にリソース割当情報を変更する必要があり,ここでは新しいリソース割当情報をデータベースに提出することである.ボリュームの確立に異常が発生した場合,このtaskにおいてもrevertメソッドが実現され,この方法は新しく確立したボリュームの大きさなどの情報に基づいて,変更後のリソース割当からメッセージを残すことである.新しいボリュームのサイズなどの情報を減算することで、ボリュームのリソース割当予約情報を新しいボリュームの作成前の状態に復元することができます.
クラスQuotaCommitTaskの具体的なコードを見てみましょう.
class QuotaCommitTask(base.CinderTask):
    """
                      ;
    """

    def __init__(self):
        super(QuotaCommitTask, self).__init__(addons=[ACTION])
        self.requires.update(['reservations', 'volume_properties'])

    def __call__(self, context, reservations, volume_properties):
        #                   ;
        QUOTAS.commit(context, reservations)
        context.quota_committed = True
        return {'volume_properties': volume_properties}

    def revert(self, context, result, cause):
        """
                 ,       ,                   ;
        """
        if not result:
            return
        volume = result['volume_properties']
        try:
            reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
            #          reserve_opts;
            QUOTAS.add_volume_type_opts(context,
                                        reserve_opts,
                                        volume['volume_type_id'])
            #                      ;
            reservations = QUOTAS.reserve(context,
                                          project_id=context.project_id,
                                          **reserve_opts)
            #                 ;
            if reservations:
                QUOTAS.commit(context, reservations,
                              project_id=context.project_id)
        except Exception:
            LOG.exception(_("Failed to update quota for deleting volume: %s"),
                          volume['id'])

上記の機能の実現はよく理解されている.すなわち、データベースに関連する操作である.
2.6 api_flow.add(OnFailureChangeStatusTask(db))
このtaskクラスが実現する機能は、エラー異常が発生した場合に、指定idのボリュームの状態をERRORに設定することである.
クラスOnFailureChangeStatusTaskの具体的なコードを見てみましょう.
class OnFailureChangeStatusTask(base.CinderTask):
    """
      task         ,    id      ERROR;
    """

    def __init__(self, db):
        super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
        self.db = db
        self.requires.update(['volume_id'])
        self.optional.update(['volume_spec'])

    def __call__(self, context, volume_id, volume_spec=None):
        return {
            'volume_id': volume_id,
            'volume_spec': volume_spec,
        }

    def revert(self, context, result, cause):
        volume_spec = result.get('volume_spec')
        if not volume_spec:
            volume_spec = _find_result_spec(cause.flow)

        volume_id = result['volume_id']
        _restore_source_status(context, self.db, volume_spec)
        _error_out_volume(context, self.db, volume_id, reason=cause.exc)
        LOG.error(_("Volume %s: create failed"), volume_id)
        exc_info = False
        if all(cause.exc_info):
            exc_info = cause.exc_info
        LOG.error(_('Unexpected build error:'), exc_info=exc_info)

2.7 api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
このtaskクラスが実現する機能は、具体的なニーズに応じて関連するメソッドを呼び出してボリュームの構築を実現することです.ここでは具体的な分析はさておき、後でここの実装を詳細に解析するために新しいブログを書きます.もちろん、このtaskクラスにはrevertメソッドはありません.
OK!flowに追加された関連taskを簡単に解析しましたが、これらのtaskクラスは親CinderTaskと祖父クラス(ハハハ)Taskから継承されていることがわかりました.関連実装を見てみましょう.
class CinderTask(task.Task):
    """
      cinder     ;
    """
    def __init__(self, addons=None):
        # _make_task_name:        ;
        super(CinderTask, self).__init__(_make_task_name(self.__class__, addons))

ここでは、最初のtaskを追加してテスト出力を行う出力例を見ます.
_make_task_name(self.__class__, addons) = cinder.volume.flows.base.InjectTask;volume:create
実装される機能は、現在のタスククラスのクラス名とタスクを取得することである.
class Task(object):
    """
         task      ,                  ;
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, name):
        self.name = name
        self.requires = set()
        self.optional = set()
        self.provides = set()
        self.version = (1, 0)

    def __str__(self):
        return "%s==%s" % (self.name, utils.join(self.version, with_what="."))

    @abc.abstractmethod
    def __call__(self, context, *args, **kwargs):
        raise NotImplementedError()

    def revert(self, context, result, cause):
        pass

では、新しいボリュームを作成するためのflowの構築を具体的にどのように実現するかは簡単に解析されています.次のブログでは、構築されたflowの解析をどのように実行するか、つまり前のブログの第3ステップを行います.
遅いですね.明日続けましょう.頑張ってください.