StructuredStateManagementDetails
Revision as of 00:33, 27 April 2013

Details

In order to implement this new orchestration layer, the following key concepts must be built into the design from the start.

  1. Atomic task units.
  2. Combining atomic task units into a workflow.
  3. Task tracking and resumption.
  4. Resource locking and single workflow ownership.
  5. Simplicity (allowing for extension and verifiability).
  6. High availability.
  7. Tolerance to upgrades.
  8. Task cancellation (nice to have).

Atomic task units

Why it matters

Tasks that are created (either via code or another operation) must be atomic, so that the task as a unit can be said to have been applied/completed, or to have failed (or been cancelled). This allows the task to be rolled back as a single unit. Since a workflow (for example one composed of stages [1, 2, 3]) may fail at any stage, for example stage 3, there needs to be a clearly defined path to roll back stages 2 and 1 (in that order). This task rollback concept is required to cleanly and correctly undo actions after a failure (or after a cancellation, which is not necessarily caused by a failure in the traditional sense).

How it will be addressed

States and state transitions which previously were in the run_instance path of nova (and other paths eventually) will need to be refactored into clearly defined tasks (likely with an apply() method and a revert() method as shown below). These tasks will be split up so that, wherever possible, each task performs a clearly defined and easily understandable single piece of work in an atomic manner (that is, not one big task that does many different things). Note that this will also make testing of each task easier, since it will have a clear set of inputs and a clear set of expected outputs/side-effects (which the revert() method should undo); this is not possible without this kind of task refactoring work. Also note that if downstream services (quantum for example) work in an asynchronous manner that cannot be cancelled, then rollbacks will be much harder to accomplish correctly. It appears most of the downstream services (libvirt, nova-network and similar) do provide acquire/release semantics (or an analogous mechanism), which is needed to correctly perform rollback (and cancellation).

For example, this is the task base class created in the prototype:

class Task(base.Base):
    __metaclass__ = abc.ABCMeta

    def __str__(self):
        return "Task: %s" % (self.__class__.__name__)

    @abc.abstractmethod
    def apply(self, context, *args, **kwargs):
        # Do whatever is needed to fulfill this task...
        #
        # Return a useful object for others to use; this object
        # must be serializable so that it can be given back in case of reversion.
        raise NotImplementedError()

    # Receives any context information (the nova context) and the result
    # that this task's apply() returned when it succeeded (if it succeeded),
    # so that the task can examine what it produced and undo whatever it
    # produced. The chain which was running when the exception occurred is
    # also provided, as well as the exception itself and the cause of this reversion.
    def revert(self, context, result, chain, excp, cause):
        pass
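As a concrete illustration of this interface (the AllocateIPTask name and its pool argument are invented here for the sketch, not part of the prototype), a task whose revert() undoes exactly what its apply() produced might look like:

```python
class AllocateIPTask(object):
    """Hypothetical task following the Task interface above:
    apply() reserves an IP, revert() releases exactly what apply() produced."""

    def __init__(self, pool):
        self.pool = pool  # shared list of free addresses (illustration only)

    def apply(self, context, *args, **kwargs):
        ip = self.pool.pop(0)    # reserve the first free address
        return {"ip": ip}        # serializable, handed back on reversion

    def revert(self, context, result, chain, excp, cause):
        if result:               # undo only what apply() actually did
            self.pool.insert(0, result["ip"])


pool = ["10.0.0.2", "10.0.0.3"]
task = AllocateIPTask(pool)
result = task.apply(None)
task.revert(None, result, None, None, None)
# After revert, the pool is back in its original state.
```

Because the result is a plain serializable dict, it can be stored in a task log and handed back to revert() later, even by a different orchestration unit.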

Combining atomic tasks into a workflow

Why it matters

A workflow in nova can typically be organized into a series of smaller tasks/states which can be applied as a single unit (and rolled back as a single unit). Since we need to be able to run individual tasks in a trackable manner (and we shouldn't duplicate this code in multiple places), a concept of a chain of tasks (for the mostly linear nova case) is needed, so that a set of tasks can be run and rolled back as a group in an easy-to-use manner (which makes the code easier to read as well!).

How it will be addressed

The following was created to address the issue of running (and rolling back) multiple tasks as a single unit.

Note: to start, a linear chain of tasks is sufficient.

This is the 'chain' class created in the prototype:

class WorkflowChain(object):
    def __init__(self, name, tolerant=False, parents=None):
        # What tasks I have done that may need to be undone on failure.
        self.reversions = []
        self.name = name
        self.tolerant = tolerant
        # The order in which tasks will run is controlled by this - note it's only linear.
        self.tasks = OrderedDict()
        self.results = OrderedDict()  # Useful for accessing the result of a given task after running...
        # A parent set of tasks that will need to be rolled back if this one fails.
        self.parents = parents
        # This functor is needed to be able to fetch results for tasks which have already occurred.
        #
        # It is required since we need to be able to roll back a task if it has already completed
        # elsewhere, and one of the requirements we have put in place is that on rollback a task
        # is given the result it returned when its apply() method was called.
        self.result_fetcher = None
        # This is a default tracker + other listeners which will always be notified when tasks start/complete/error.
        #
        # They are useful for alerting some external system that should be notified when a state changes.
        self.change_tracker = None
        self.listeners = []

    def __setitem__(self, name, performer):
        self.tasks[name] = performer

    def __getitem__(self, name):
        return self.results[name]

    def run(self, context, *args, **kwargs):
        for (name, performer) in self.tasks.items():
            try:
                self._on_state_start(context, performer, name)
                # See if we have already run this... (resumption!)
                result = None
                if self.result_fetcher:
                    result = self.result_fetcher(context, name, self)
                if result is None:
                    result = performer.apply(context, *args, **kwargs)
                # Keep a pristine copy of the result in the results table
                # so that if said result is altered by further states
                # the one here will not be.
                #
                # Note: Python passes objects by reference, so someone else could mutate the result,
                # which would be bad if we need to roll back and the result we stored was modified...
                self.results[name] = copy.deepcopy(result)
                self._on_state_finish(context, performer, name, result)
            except Exception as ex:
                with excutils.save_and_reraise_exception():
                    try:
                        self._on_state_error(context, name, ex)
                    except Exception:
                        pass
                    cause = (name, performer, (args, kwargs))
                    self.rollback(context, name, self, ex, cause)
        return self

    def _on_state_error(self, context, name, ex):
        if self.change_tracker:
            self.change_tracker(context, ERRORED, name, self)
        for i in self.listeners:
            i.notify(context, ERRORED, name, self, error=ex)

    def _on_state_start(self, context, performer, name):
        if self.change_tracker:
            self.change_tracker(context, STARTING, name, self)
        for i in self.listeners:
            i.notify(context, STARTING, name, self)

    def _on_state_finish(self, context, performer, name, result):
        # If a future state fails we need to ensure that we
        # revert the one we just finished.
        self.reversions.append((name, performer))
        if self.change_tracker:
            self.change_tracker(context, COMPLETED, name, self,
                                result=result.to_dict())
        for i in self.listeners:
            i.notify(context, COMPLETED, name, self, result=result)

    def rollback(self, context, name, chain=None, ex=None, cause=None):
        if chain is None:
            chain = self
        for (i, (task_name, performer)) in enumerate(reversed(self.reversions)):
            try:
                performer.revert(context, self.results[task_name], chain, ex, cause)
            except excp.NovaException:
                # Ex: WARN: Failed rolling back stage 1 (validate_request) of
                # chain validation due to nova exception
                # WARN: Failed rolling back stage 2 (create_db_entry) of
                # chain init_db_entry due to nova exception
                msg = _("Failed rolling back stage %s (%s)"
                        " of chain %s due to nova exception.")
                LOG.warn(msg, (i + 1), task_name, self.name)
                if not self.tolerant:
                    # This will log a msg AND re-raise the nova exception if
                    # the chain does not tolerate exceptions.
                    raise
            except Exception:
                # Ex: WARN: Failed rolling back stage 1 (validate_request) of
                # chain validation due to unknown exception
                # Ex: WARN: Failed rolling back stage 2 (create_db_entry) of
                # chain init_db_entry due to unknown exception
                msg = _("Failed rolling back stage %s (%s)"
                        " of chain %s due to unknown exception.")
                LOG.warn(msg, (i + 1), task_name, self.name)
                if not self.tolerant:
                    # Log a msg AND re-raise the generic exception if the
                    # chain does not tolerate exceptions in the first place.
                    raise
        if self.parents:
            # Roll back any parent chains as well.
            for p in self.parents:
                p.rollback(context, name, chain, ex, cause)
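The run-then-rollback-in-reverse-order behavior of the chain can be seen in a simplified, self-contained sketch (the SimpleChain and RecordingTask names are invented for illustration; the nova imports, tracking and listener machinery are stripped out):

```python
from collections import OrderedDict


class SimpleChain(object):
    """Stripped-down version of the WorkflowChain above (no tracking/listeners)."""

    def __init__(self, name):
        self.name = name
        self.tasks = OrderedDict()    # name -> task, run in insertion order
        self.results = OrderedDict()  # name -> result of apply()
        self.reversions = []          # completed (name, task) pairs, for rollback

    def run(self, context):
        for (name, task) in self.tasks.items():
            try:
                result = task.apply(context)
                self.results[name] = result
                self.reversions.append((name, task))
            except Exception as ex:
                self.rollback(context, ex)
                raise
        return self

    def rollback(self, context, ex):
        # Undo completed tasks in the reverse of the order they ran.
        for (name, task) in reversed(self.reversions):
            task.revert(context, self.results[name], self, ex, None)


class RecordingTask(object):
    """Toy task that records apply/revert calls (illustration only)."""

    def __init__(self, name, log, fail=False):
        self.name, self.log, self.fail = name, log, fail

    def apply(self, context):
        if self.fail:
            raise RuntimeError("%s failed" % self.name)
        self.log.append("apply:" + self.name)
        return self.name

    def revert(self, context, result, chain, excp, cause):
        self.log.append("revert:" + self.name)


log = []
chain = SimpleChain("example")
chain.tasks["one"] = RecordingTask("one", log)
chain.tasks["two"] = RecordingTask("two", log)
chain.tasks["three"] = RecordingTask("three", log, fail=True)
try:
    chain.run(None)
except RuntimeError:
    pass
# Tasks one and two ran; when three failed, they were reverted in reverse order.
```

The failure of stage three leaves log as ["apply:one", "apply:two", "revert:two", "revert:one"], which is exactly the [1, 2, 3] fail-at-3, roll-back-2-then-1 path described in the atomic task section.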

Task tracking & resumption

Why it matters

Key to horizontally scaling orchestration units is the ability to resume the work of a failed orchestration unit on another orchestration unit. This makes it possible for these orchestration units to survive individual program failure without putting the tasks they were working on into an ERROR-like state, which should not be needed in a transactional, workflow-oriented system. This resumption and its associated tracking also provides a useful bonus feature wherein it becomes very easy to audit the tasks and workflows that a given resource/request has gone through in the lifetime of that request. This has many uses and enables potentially interesting new features later on.

How it will be addressed

To start, a task log will need to be created, and when each task is executed that log will need to be updated accordingly. Note that it is not enough to just store the name of the task that completed; enough metadata must also be stored with each log entry to be able to correctly revert that task. Typically in nova there is a concept of a resource that is being modified by each task; at the end of each task we can store the resource (which will pick up the modifications to that resource automatically). This task log and associated metadata can be stored in different ways. For example a database could be used, or something less durable, since once a workflow completes there may not be a need to keep the task log around. A database (or something similarly durable, for example zookeeper persistent nodes) would allow for workflow auditing, but those who do not care about auditing could use something like redis, which offers complex-object, non-durable, expiry-based storage.

Issues: the admin_password entry is a part of this resource object, it needs to be stored securely (if we store it at all in the task log metadata).
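A minimal sketch of such a task log, which doubles as the result_fetcher used by the chain for resumption (the TaskLog name and the in-memory dict standing in for a database/redis/zookeeper backend are invented for illustration):

```python
class TaskLog(object):
    """In-memory task log; a real deployment would back this with a
    database, redis, or zookeeper persistent nodes (illustration only)."""

    def __init__(self):
        self._entries = {}  # (workflow name, task name) -> saved result/metadata

    def record(self, workflow, name, result):
        # Store enough metadata to revert the task later, not just its name.
        self._entries[(workflow, name)] = result

    def fetch(self, context, name, chain):
        # Matches the result_fetcher(context, name, chain) signature used by
        # WorkflowChain.run(): a non-None return means "already done, skip".
        return self._entries.get((getattr(chain, "name", None), name))


log = TaskLog()
log.record("run_instance", "create_db_entry", {"instance_id": 42})


class FakeChain(object):
    name = "run_instance"


# A resuming orchestration unit finds the prior result and skips re-running
# the task, but still has what it needs to revert it if a later stage fails.
prior = log.fetch(None, "create_db_entry", FakeChain())
```

Because run() consults result_fetcher before calling apply(), a second orchestration unit that takes over a workflow replays only the tasks that have no recorded result.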

To accomplish resumption there needs to be a concept of workflow ownership, and a way to notice when an orchestration unit has failed and to release its ownership of a workflow so that other orchestration units can take over. Both of these problems can be solved by zookeeper, a battle-hardened program designed for exactly this kind of ownership and triggering. The way to understand this is to notice that ownership of a workflow is a lock on an object (that object being the workflow), and zookeeper has the ability to notify others when such a lock is lost (either due to failure or to cancellation) via its concept of watches; these two general concepts allow for resumption in a distributed and highly scalable manner. Zookeeper even provides a few recipes which help make this possible; see the kazoo recipe source code for examples (the locking queue concept is very useful).

Example code to try this concept: https://github.com/harlowja/zkplayground

See: http://research.yahoo.com/node/1849 and http://research.yahoo.com/pub/3280

Other users of zookeeper: https://cwiki.apache.org/confluence/display/ZOOKEEPER/PoweredBy

Homepage: https://zookeeper.apache.org/

Note: it is likely extremely hard (or would require a lot of hoop jumping) to correctly get these semantics with a plain old database (e.g. mysql), since databases do not provide watches/triggering or distributed locking (especially when replicated, sharded or clustered). Without these semantics it is likely not easily (or cleanly) possible to have automatic resumption without some kind of periodic table-scanning task (which by itself does not address the distributed lock acquisition/release semantics).

Resource locking and single workflow ownership

Why it matters

In order to ensure consistent resource state and consistent workflow state, we first need to be able to lock the resource (think: an instance) that we are working on, and also lock the workflow being worked on. This ensures that only a single entity is allowed to process a given workflow and resource at the same time, which avoids state and resource inconsistencies and race conditions.

Note: in the future it might be possible to work on the same resource in parallel, as long as the workflows do not have tasks with conflicting goals.

How it will be addressed

Zookeeper provides distributed locking in a tested and easy-to-use package; we should use its distributed locking mechanism to correctly and consistently lock the resources and workflows being worked on.

Simplicity

Why it matters

How it will be addressed

High availability

Why it matters

How it will be addressed

Tolerant to upgrades

Why it matters

How it will be addressed

Cancellation

Why it matters

How it will be addressed