StructuredStateManagementDetails

Revision as of 01:06, 25 April 2013 by Harlowja (Design details)

Design details

To implement this new orchestration layer, the following key concepts must be built into the design from the start.

  1. An atomic task abstraction.
  2. Combining atomic tasks into a unit that can be run and, on failure, reconciled via rollbacks.
    1. To start, a linear unit is fine.
  3. Task resumption.
  4. Task rollback.
  5. Task tracking.
  6. Resource locking.
  7. Workflow sharding/ownership.
  8. Simplicity (allowing for extension and verifiability).
  9. Tolerant to upgrades.

Atomic task abstraction

Why it matters

Tasks that are created (either via code or some other operation) must be atomic, so that the task as a unit can be said to have either completed or failed. This allows the task to be rolled back as a unit. It is also useful to be able to track exactly which tasks have been applied to a given workflow, which is needed for correct status tracking (and is directly tied to how resumption is done).

How it will be addressed

The work in the run_instance path of nova, which was previously very unorganized, will need to be refactored into clearly defined tasks (likely with an apply() method and a rollback() method). Where possible, these tasks will be split up so that each one performs a single, clear piece of work atomically (that is, not one big task that does many different things). This will also make each task easier to test, since it will have a clear set of inputs and a clear set of expected outputs and side effects, which the rollback() method should undo.

For example, this could be a task/state base class (note that the rollback hook is named revert() here):

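import abc


class State(base.Base):
    """Base class for a single atomic unit of work.

    NOTE: base.Base is assumed to be nova's common base class; it is
    not defined here.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        super(State, self).__init__()

    def __str__(self):
        return "State: %s" % (self.__class__.__name__)

    @abc.abstractmethod
    def apply(self, context, *args, **kwargs):
        """Perform the work and return its result."""
        raise NotImplementedError()

    def revert(self, context, result, chain, excp, cause):
        """Undo the effects of apply(); by default there is nothing to undo."""
        pass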
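
As an illustration of the base class above, a concrete task might look like the following minimal sketch. The State class is re-declared here in simplified form (without nova's base.Base, and using Python 3's abc.ABC) so that the example runs on its own. AllocateIp, the free_ips pool, and the dict-based context are hypothetical names used only for illustration; they are not part of the proposed design.

```python
import abc


class State(abc.ABC):
    """Simplified stand-in for the State base class shown above."""

    @abc.abstractmethod
    def apply(self, context, *args, **kwargs):
        raise NotImplementedError()

    def revert(self, context, result, chain, excp, cause):
        pass


class AllocateIp(State):
    """Hypothetical task: take an address from a pool."""

    def apply(self, context, *args, **kwargs):
        # One clear piece of work with one clear result.
        return context["free_ips"].pop()

    def revert(self, context, result, chain, excp, cause):
        # Undo exactly the side effect that apply() produced.
        context["free_ips"].append(result)


context = {"free_ips": ["10.0.0.2", "10.0.0.1"]}
task = AllocateIp()
ip = task.apply(context)
task.revert(context, ip, chain=None, excp=None, cause=None)
```

Because apply() has a single side effect and returns everything revert() needs, the pool is back in its original state after the revert, which is also what makes such a task straightforward to unit test.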

Combining atomic tasks into a workflow

Why it matters

How it will be addressed

For example, a linear chain of states could be sketched as follows:

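import copy
from collections import OrderedDict

# NOTE: excutils, excp, LOG, _ and the STARTING, COMPLETED and ERRORED
# constants are assumed to come from the surrounding nova code; they are
# not defined in this sketch.


class StateChain(object):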
    def __init__(self, name, tolerant=False, parents=None):
        self.reversions = []
        self.name = name
        self.tolerant = tolerant
        self.states = OrderedDict()
        self.results = OrderedDict()
        self.parents = parents
        self.result_fetcher = None
        self.change_tracker = None
        self.listeners = []

    def __setitem__(self, name, performer):
        self.states[name] = performer

    def __getitem__(self, name):
        return self.results[name]

    def run(self, context, *args, **kwargs):
        for (name, performer) in self.states.items():
            try:
                self._on_state_start(context, performer, name)
                # See if we have already run this... (resumption!)
                result = None
                if self.result_fetcher:
                    result = self.result_fetcher(context, name, self)
                if result is None:
                    result = performer.apply(context, *args, **kwargs)
                # Keep a pristine copy of the result in the results table
                # so that later states cannot alter it.
                #
                # Note: Python passes object references, so another state
                # could mutate a shared result, which would be bad if we
                # need to roll back using a result that someone else changed.
                self.results[name] = copy.deepcopy(result)
                self._on_state_finish(context, performer, name, result)
            except Exception as ex:
                with excutils.save_and_reraise_exception():
                    try:
                        self._on_state_error(context, name, ex)
                    except Exception:
                        # Notification failures must not prevent rollback.
                        pass
                    cause = (name, performer, (args, kwargs))
                    self.rollback(context, name, self, ex, cause)
        return self

    def _on_state_error(self, context, name, ex):
        if self.change_tracker:
            self.change_tracker(context, ERRORED, name, self)
        for i in self.listeners:
            i.notify(context, ERRORED, name, self, error=ex)

    def _on_state_start(self, context, performer, name):
        if self.change_tracker:
            self.change_tracker(context, STARTING, name, self)
        for i in self.listeners:
            i.notify(context, STARTING, name, self)

    def _on_state_finish(self, context, performer, name, result):
        # If a future state fails we need to ensure that we
        # revert the one we just finished.
        self.reversions.append((name, performer))
        if self.change_tracker:
            self.change_tracker(context, COMPLETED, name, self,
                                result=result.to_dict())
        for i in self.listeners:
            i.notify(context, COMPLETED, name, self, result=result)

    def rollback(self, context, name, chain=None, ex=None, cause=None):
        # `name` is the state that failed; it is passed on to parent chains.
        if chain is None:
            chain = self
        # Revert completed states, most recent first. The loop variable is
        # deliberately not called `name` so that it does not shadow the
        # failed state's name.
        reversions = reversed(self.reversions)
        for (i, (state_name, performer)) in enumerate(reversions):
            try:
                performer.revert(context, self.results[state_name], chain,
                                 ex, cause)
            except excp.NovaException:
                # Ex: WARN: Failed rolling back stage 1 (validate_request) of
                # chain validation due to nova exception
                # WARN: Failed rolling back stage 2 (create_db_entry) of
                # chain init_db_entry due to nova exception
                msg = _("Failed rolling back stage %s (%s)"
                        " of chain %s due to nova exception.")
                # Use the registered state name; State does not define .name.
                LOG.warning(msg, (i + 1), state_name, self.name)
                if not self.tolerant:
                    # The failure has been logged; re-raise the nova
                    # exception if the chain does not tolerate exceptions.
                    raise
            except Exception:
                # Ex: WARN: Failed rolling back stage 1 (validate_request) of
                # chain validation due to unknown exception
                # WARN: Failed rolling back stage 2 (create_db_entry) of
                # chain init_db_entry due to unknown exception
                msg = _("Failed rolling back stage %s (%s)"
                        " of chain %s, due to unknown exception.")
                LOG.warning(msg, (i + 1), state_name, self.name)
                if not self.tolerant:
                    # The failure has been logged; re-raise the generic
                    # exception if the chain does not tolerate exceptions.
                    raise
        if self.parents:
            # Roll back any parent chains.
            for p in self.parents:
                p.rollback(context, name, chain, ex, cause)
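
To make the run/rollback flow above concrete without nova's helpers, here is a stripped-down, self-contained sketch of the same idea. MiniChain, Recorder, and Boom are hypothetical names introduced only for this example, and the sketch leaves out listeners, change tracking, result fetching, parent chains, and the tolerant flag. It only shows that states which completed are reverted in reverse order when a later state fails.

```python
import copy
from collections import OrderedDict


class MiniChain(object):
    """Hypothetical, reduced version of the StateChain shown above."""

    def __init__(self, name):
        self.name = name
        self.states = OrderedDict()
        self.results = OrderedDict()
        self.reversions = []

    def __setitem__(self, name, performer):
        self.states[name] = performer

    def run(self, context):
        for name, performer in self.states.items():
            try:
                result = performer.apply(context)
                # Store a private copy so later states cannot alter it.
                self.results[name] = copy.deepcopy(result)
                self.reversions.append((name, performer))
            except Exception as ex:
                self.rollback(context, ex, cause=name)
                raise
        return self

    def rollback(self, context, ex, cause):
        # Undo completed states, most recent first.
        for name, performer in reversed(self.reversions):
            performer.revert(context, self.results[name], self, ex, cause)


class Recorder(object):
    """Hypothetical state that records what happens to it."""

    def __init__(self, label, log):
        self.label = label
        self.log = log

    def apply(self, context):
        self.log.append("apply " + self.label)
        return self.label

    def revert(self, context, result, chain, excp, cause):
        self.log.append("revert " + result)


class Boom(object):
    """Hypothetical state that always fails."""

    def apply(self, context):
        raise RuntimeError("simulated failure")

    def revert(self, context, result, chain, excp, cause):
        pass


log = []
chain = MiniChain("demo")
chain["validate_request"] = Recorder("validate_request", log)
chain["create_db_entry"] = Recorder("create_db_entry", log)
chain["explode"] = Boom()
try:
    chain.run({})
except RuntimeError:
    pass
print(log)
```

The failing state itself is never added to the reversion list, so only the two states that actually completed are reverted, with create_db_entry undone before validate_request.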

Task resumption

Why it matters
How it will be addressed
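
The StateChain sketch earlier on this page already hints at one possible approach: before applying a state it asks result_fetcher for a previously saved result and skips apply() when one exists. A minimal, self-contained illustration of that idea follows. The run_with_resumption helper and the in-memory saved dict (standing in for whatever persistent store is eventually chosen) are assumptions made for this example, not part of the design.

```python
def run_with_resumption(steps, saved):
    """Run (name, func) steps in order, skipping any whose result is saved.

    `saved` maps step names to results from an earlier, interrupted run
    and is updated in place as new steps complete.
    """
    executed = []
    for name, func in steps:
        if name in saved:
            # Already done before the interruption; reuse the stored result.
            continue
        saved[name] = func()
        executed.append(name)
    return executed


steps = [
    ("validate_request", lambda: "ok"),
    ("create_db_entry", lambda: {"id": 1}),
    ("boot_instance", lambda: "booted"),
]

# Pretend an earlier run finished the first two steps before it stopped.
saved = {"validate_request": "ok", "create_db_entry": {"id": 1}}
executed = run_with_resumption(steps, saved)
```

Checking membership in saved, rather than testing the fetched result against None as the StateChain sketch does, means a step that legitimately returns None is still treated as completed on resumption.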

Task rollback

Why it matters
How it will be addressed

Task tracking

Why it matters
How it will be addressed

Resource locking

Why it matters
How it will be addressed

Workflow sharding/ownership

Why it matters
How it will be addressed

Simplicity

Why it matters
How it will be addressed

Tolerant to upgrades

Why it matters
How it will be addressed