
Monasca/Transform

NOTICE: Monasca Transform has been deprecated

While Monasca Transform has been a useful utility, its user base has declined and the engineers who maintained it have been pulled in other directions. To make this clear, the monasca-transform repository has been marked as deprecated and its content removed from the Wallaby release. If you still need a release of Monasca Transform, use the Ussuri or an older branch.

See the commit that cleared the content for the Wallaby release: https://opendev.org/openstack/monasca-transform/commit/811acd76c9029e175c0a43302386bc7732fcc48e

If you find Monasca Transform to be useful and have a desire to take on maintenance, please post a message to the #openstack-monasca IRC channel, as described on http://eavesdrop.openstack.org/#Monasca_Team_Meeting .

The following content is for historical purposes and describes some of the original motivation and development of Monasca Transform.


monasca-transform: A transformation and aggregation engine for Monasca

Original blueprint - https://blueprints.launchpad.net/monasca/+spec/monasca-transform Original Proposal - Monasca/Transform-proposal

  • monasca-transform is a data driven aggregation engine which collects, groups and aggregates existing individual Monasca metrics according to business requirements and publishes new transformed (derived) metrics to the Monasca Kafka queue.
  • Since the new transformed metrics are published as any other metric in Monasca, alarms can be set and triggered on the transformed metric.

Problem description

There are several use cases which can be addressed by transforming, aggregating and publishing new metrics in Monasca:

  • Aggregate individual metrics

Metrics that are currently available on a per-host or per-VM basis, for example mem.total_mb or vm.mem.used_mb, might have to be added up for all hosts or VMs so that the cloud operator can know the total memory available across all hosts and the total memory allocated to all VMs for the entire cloud.

  • Combine multiple metrics to derive a new metric

There might be a need to derive metrics from one or more existing metrics. For example, cpu.total_logical_cores and cpu.idle_perc are two metrics for a host which can be used to derive a new metric giving the total utilized CPUs. These per-host utilized CPUs can then be further summed up to find the number of CPUs being utilized across the entire cloud.

  • Find the rate at which a metric changed over a time range

It might be necessary to calculate the rate at which a metric value changes over time. For example, to find the rate at which Swift disk capacity is increasing over a time range, one would find the value of the disk usage at the start of the time interval (say the start of the hour) and the disk usage at the end of the time interval (say the end of the hour) in order to determine the rate at which the disk usage increased or decreased during that hour.

  • Deriving metrics using complex business logic

There might be a need to apply complex business logic to a set of metrics and derive a new metric. For example, consider the periodic heartbeat metrics which come from a VM, indicating what state the VM was in. One might have to look at how the state of the VM changed over the target time interval and then use some business logic to come up with a new metric indicating the amount of time that the VM was in a usable state.

How Monasca Transform Operates

The monasca-transform component framework fits into the microservices architecture of Monasca. At its core, it collects a set of metrics from Apache Kafka[1] over a configurable time period and then, using grouping operations, runs a series of transformation (or aggregation) operations which can consist of simple mathematical operations like sum/max/min/avg or complex operations such as iterating through a group using business rules. The resulting transformed metric or set of transformed metrics is published back to Apache Kafka.

The following design considerations were essential, based on past experience building a similar aggregation framework to process vast amounts of raw log records from public cloud services:

  • Scalable, Fault Tolerant and Highly Available

Processing of metrics has to be scalable since vast amounts of metrics must be collected, aggregated and published. The processing of data should be able to withstand hardware failures such as CPU, memory or disk errors. The monasca-transform process is designed to be highly available: in case the monasca-transform daemon process goes down on a server because of a software or hardware problem, the framework will start processing metrics on another server.

  • Data driven transformations

The transformation framework has to be data driven and it should be possible to add new transformations for any new metrics by configuration changes.

  • Reusable components and reusable transformation routines

The transformation requirements of various metrics can be similar, so once a transformation routine has been implemented it should be possible to re-use the same transformation components and routines for a different set of metrics.

  • Easy to add components

Adding new transformation components should be easy and straightforward.

  • Flexible to change and adapt

Over a period of time, business requirements and needs can change. To meet those requirements, new transformation components will have to be developed and added, or existing ones updated. Monasca Transform makes it possible to add new components and update existing ones with minimal effort.

  • Unit Testing

Though the transformation routines can be complex, there are unit tests for each individual transformation component so that the accuracy of each component can be verified with confidence.

Considered Alternatives

To avoid re-inventing the wheel it is desirable to use a framework or tool that provides the ability to process data across a cluster of commodity machines and that features work distribution, fault tolerance and the capability to seamlessly withstand node failures.

Hadoop with MapReduce/YARN satisfies these requirements, but because it requires stable storage (HDFS) and relies on temporary files to save intermediate processing state, it can be slow. It also requires a number of components to be configured, installed, monitored and maintained across a cluster, which is often a challenging and daunting task. It is also highly likely that other associated tools such as Hive (for SQL-like processing of data) and Pig (which facilitates expressing MapReduce jobs as high-level constructs) would be needed. Using additional tools adds to the resource utilization burden as well as increasing development time.

Spark allows jobs to be expressed in Python, consequently sitting well with the OpenStack toolset. It has demonstrated much higher processing rates than Hadoop-based tools, yet allows existing domain knowledge of MapReduce-style tools to be re-purposed. Spark is gaining momentum and adoption generally, and this upward trajectory can be capitalised upon. It also provides support for streaming data, again meaning that we can concentrate on the task at hand rather than plumbing. For these reasons it has been selected to provide the processing framework.

There is a non-OpenStack project for those who want a light weight aggregation engine written in Go - monasca-aggregator. monasca-aggregator does not have feature parity with monasca-transform and does not have unit tests in place, but is open under the Apache License 2.0 for anyone who wishes to contribute.

Architecture

monasca-transform is a data driven data processing framework implemented in python and based on Apache Spark[2]. Apache Spark[2] is a highly scalable, fast, in-memory, fault tolerant and parallel data processing framework. All monasca-transform components are implemented in Python and use Spark's Python API to interact with Spark.

To process data, monasca-transform components make use of Spark transformation operations like groupByKey(), filter(), map() and reduceByKey() on Spark's Resilient Distributed Datasets (commonly referred to as RDDs). An RDD is Spark's basic abstraction for distributed, in-memory data.
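
As a minimal illustration (not actual monasca-transform code), the PySpark sketch below shows the kind of RDD operations named above, using made-up metric names and values:

# Illustrative only: a small PySpark sketch of filter/map/reduceByKey on an RDD.
# Metric names and values are made up; this is not monasca-transform code.
from pyspark import SparkContext

sc = SparkContext(appName="rdd-aggregation-sketch")

# records as they might look after parsing metrics pulled from Kafka
metrics = sc.parallelize([
    {"name": "mem.total_mb", "hostname": "mini-mon", "value": 5969.0},
    {"name": "mem.total_mb", "hostname": "devstack", "value": 8192.0},
    {"name": "cpu.idle_perc", "hostname": "mini-mon", "value": 72.5},
])

totals = (metrics
          .filter(lambda m: m["name"] == "mem.total_mb")   # keep only the metric of interest
          .map(lambda m: (m["name"], m["value"]))          # shape into (key, value) pairs
          .reduceByKey(lambda a, b: a + b))                # sum values per metric name; groupByKey()
                                                           # could be used when the whole group is needed

print(totals.collect())  # [('mem.total_mb', 14161.0)]
sc.stop()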

monasca-transform uses the Spark Streaming direct API to retrieve Monasca metrics from the Kafka queue. monasca-transform uses Spark Data Frames for processing whenever possible. Spark Data Frames use a query planner and optimizer called Catalyst and are more efficient than comparable RDD operations.

Data Frames provide a more SQL-like data manipulation capability which also makes it easy to use from a development point of view. It is possible to convert from a Spark RDD to a Data Frame and vice versa. Also, Spark provides a common equivalent API for manipulating both an RDD as well as a Data Frame.
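
A small sketch of that interplay, assuming the Spark 1.x SQLContext entry point and illustrative column names, might look like this:

# Sketch only (not monasca-transform code): moving between RDDs and Data Frames.
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext(appName="dataframe-sketch")
sql_context = SQLContext(sc)

rdd = sc.parallelize([("mem.total_mb", "mini-mon", 5969.0),
                      ("mem.total_mb", "devstack", 8192.0)])

# RDD -> Data Frame: the aggregation below is planned by Catalyst
df = sql_context.createDataFrame(rdd, ["name", "hostname", "value"])

# SQL-like aggregation with the Data Frame API
totals_df = df.groupBy("name").agg(F.sum("value").alias("total"))
totals_df.show()

# Data Frame -> RDD, when row-level manipulation is easier to express that way
back_to_rdd = totals_df.rdd.map(lambda row: (row["name"], row["total"]))
print(back_to_rdd.collect())

sc.stop()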

In addition to Apache Spark, monasca-transform also relies on MySQL database tables to store processing and configuration information:

  • processing information: Kafka offsets from Spark batch, used to resume processing after a failure or a restart
  • configuration information: Runtime parameters and transformation specifications, used to drive transformation and aggregation of metrics

It is possible to run multiple monasca-transform processes on multiple nodes. The processes use Zookeeper's leader election capability to elect a leader. In case the node on which the leading monasca-transform process is running goes down, another monasca-transform process running on another node will be elected leader and start processing data.
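
The sketch below shows one way such leader election can be done with the kazoo ZooKeeper client; the election path and node identifier are hypothetical and not necessarily what monasca-transform uses:

# Illustrative sketch of ZooKeeper leader election using the kazoo library.
from kazoo.client import KazooClient

def start_processing():
    # Only the elected leader reaches this point; it would start the Spark
    # streaming job here and keep running for as long as it holds leadership.
    print("elected leader, starting metric processing")

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# "/monasca-transform/leader" and "node-1" are made-up values for this sketch
election = zk.Election("/monasca-transform/leader", identifier="node-1")
election.run(start_processing)  # blocks while this process is the leader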

Here are the components of monasca-transform:

Figure: monasca-transform architecture components (ArchitectureComponents.jpg)

For an enterprise deployment, it is recommended to deploy the Spark cluster in standalone cluster mode alongside other Monasca components. In a minimal configuration, for example a devstack deployment, both the master and a worker can be deployed on a single node. Spark can also be deployed on multiple nodes, with multiple masters and workers running on each of the nodes.

Logical processing data flow

The logical data flow within monasca-transform can be broadly divided into two distinct flows: conversion of input metrics into a record store format, and transformation of the record store data using a series of generic transformation components to derive transformed metric data.

Conversion to record store format

Figure: Conversion of input metrics to record store format (LogicalEventBasedFlow1.jpg)

Conversion of input metrics into the record store format consists of the following steps:

Identification: Identify and filter out unnecessary metrics. This is done by comparing input metric names with a list of metrics to be transformed in the configuration datastore. If an input metric is not recognized, the metric is ignored in further processing.

Validation and Field Extraction: Input data is validated by checking for presence of expected fields in the metric JSON (expected fields are retrieved from the configuration datastore). Assuming the format is valid, the relevant fields are extracted.

Record Generator: Extracted data is then used to generate one or more new internal processing metrics and the associated metadata; this is referred to as Record Store Data. Multiple internal metrics can be generated in the case where the input raw metric has to be aggregated in more than one way.

An advantage of converting the input metrics into internal record store data is that the rest of the processing does not depend on the input metric format. If, in the future, there is a need to transform and aggregate data arriving in a new format, new code will have to be written to convert the new data format into the record store, but the rest of the processing pipeline remains unchanged.
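
As a self-contained illustration of the three steps above, the following sketch uses hypothetical spec and record fields; it is not the actual pre_transform_spec format or monasca-transform code:

# Simplified sketch of identification, validation and record generation.
# Field names in the spec and output records are illustrative only.
import json

pre_transform_specs = {
    "mem.total_mb": {
        "required_raw_fields_list": ["metric.timestamp", "metric.value"],
        "intermediate_id_list": ["mem_total_all_hosts", "mem_total_per_host"],
    }
}

def to_record_store(raw_metric_json):
    metric = json.loads(raw_metric_json)["metric"]
    spec = pre_transform_specs.get(metric["name"])
    if spec is None:
        return []                                 # identification: unknown metric, ignore
    for path in spec["required_raw_fields_list"]:
        leaf = path.split(".")[-1]                # simplified: only check the leaf field
        if metric.get(leaf) in (None, ""):
            return []                             # validation: required field missing or empty
    # record generation: one internal record per intermediate metric id
    return [{"metric_id": metric_id,
             "timestamp": metric["timestamp"],
             "value": metric["value"],
             "dimensions": metric.get("dimensions", {})}
            for metric_id in spec["intermediate_id_list"]]

raw = ('{"metric": {"name": "mem.total_mb", "dimensions": {"hostname": "mini-mon"},'
       ' "timestamp": 1453308000000, "value": 5969.0}}')
print(to_record_store(raw))   # two internal records, one per intermediate metric id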

Generic transformation (aggregation) to derive and publish new metrics

Figure: Generic transformation (aggregation) and publishing of new metrics (LogicalEventBasedFlow2.jpg)

This is the second half of the processing data flow. Record store data is routed to the appropriate processing component based on internal metric type and specifications in the configuration datastore. Additional processing is then performed. Processing includes grouping the data in the record store with various user-defined parameters, setting of the final transformed metric name, and setting appropriate dimensions in the new transformed metric. All these operations are driven from configuration parameters. The resulting new transformed metrics are then published back to Apache Kafka.

Data model design

Figure: Datastore schema (DatastoreSchema.jpg)

The pre_transform_specifications and transform_specifications tables are used in the conversion of input metrics into the record store format and in the conversion of record store data into the final transformed metrics published to Apache Kafka.

The kafka_offsets table stores the offset information retrieved via the Spark Streaming direct Kafka API when it starts processing a batch. Only if processing of the data in a batch succeeds is the latest offset information updated in the kafka_offsets table. The last saved offset information is used by the Spark driver on initialization to resume processing from where it left off.


  • pre_transform_specifications table
    • event_type: name of the metric to process
    • pre_transform_spec: Specification in JSON format to aid in processing of data to record store format. The JSON specification consists of following fields:
      • event_type: name of the metric to process
      • event_processing_params: key-value pairs dictionary of parameters to aid in conversion of input metric to record store format. Examples include set_default_region
    • intermediate_id_list: list of internal metrics, represented by their identifiers, that the input metric should be converted into. During record store generation, multiple internal metrics can be generated from the same input metric. Each internal metric can be transformed with a different processing pipeline.
    • required_raw_fields_list: list of JSON path strings that are checked during validation to ensure they exist and are not empty.
    • service_id: service identifier, identifies which service this input metric belongs to.
  • transform_specs table
    • metric_id: Identifier representing internal processing metric
    • transform_spec: Specification in JSON format which defines the processing pipeline and runtime parameters to be used by processing pipeline components. The JSON specification consists of following fields:
      • aggregation_params_map: key-value pairs which represent the processing pipeline and runtime parameters.
      • aggregation_pipeline: JSON representation of the processing pipeline specification and any runtime parameters used by components in the processing pipeline, e.g. aggregation_group_by_list
  • kafka_offsets table
    • topic: Kafka topic for which these offsets are being collected and stored
    • until_offset: end of the offset range
    • from_offset: start of the offset range
    • app_name: name of the application, on behalf of which these offsets are being stored.
    • partition: partition identifier
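
As an illustration of how a batch's offset range could be persisted into the kafka_offsets table, the sketch below uses PyMySQL with hypothetical connection settings; the real persistence layer in monasca-transform may differ:

# Sketch only: saving a processed batch's offset range into the kafka_offsets
# table described above. Connection settings and exact SQL are illustrative.
import pymysql

def save_offsets(offset_ranges, app_name="monasca-transform"):
    conn = pymysql.connect(host="localhost", user="monasca", password="secret",
                           db="monasca_transform")
    try:
        with conn.cursor() as cur:
            # offset_ranges could be, for example, the OffsetRange objects
            # returned by the Spark Streaming direct Kafka API for a batch
            for o in offset_ranges:
                cur.execute(
                    "REPLACE INTO kafka_offsets "
                    "(topic, `partition`, from_offset, until_offset, app_name) "
                    "VALUES (%s, %s, %s, %s, %s)",
                    (o.topic, o.partition, o.fromOffset, o.untilOffset, app_name))
        conn.commit()
    finally:
        conn.close()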

Design of generic transformation components

Generic transformation components will be reusable components that can be assembled into a complex transformation pipeline. Each generic component will be implemented as a Python class and can invoke multiple Spark transformations. Generic components of the same type will implement functions which take a standard set of arguments and return a standard Spark RDD. Source, Usage, Setter and Insert are a few of the generic component types.

All usage components will implement a Python function called 'usage' which takes 'transform_context' and 'record_store_dataframe' as arguments and returns an 'instance_usage_dataframe'. Similarly, all setter components will implement a function called 'setter' which takes 'transform_context' and 'instance_usage_dataframe' as arguments and returns a modified 'instance_usage_dataframe'. Component interfaces are standardized by input and output type to allow plug-ability.

Generic transformation components are extensible and new components can be added when necessary. For example, if there is a need for a transformation routine to look up a value by making a REST API call, a new setter component can be developed which implements the standard "setter" function interface. Since the new setter component adheres to the Setter component interface, it can be added before or after any other setter component. Complex transformation pipelines can be built by chaining a series of these pluggable components via the transformation specification.
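
A skeleton of what such a setter component could look like is shown below; the class name and the way the aggregated metric name is obtained from the transform context are illustrative, not taken from the actual code base:

# Hypothetical setter component skeleton following the interface described above.
from pyspark.sql import functions as F


class SetAggregatedMetricName(object):
    """Setter component: takes an instance usage dataframe and returns one
    with the final aggregated metric name filled in."""

    @staticmethod
    def setter(transform_context, instance_usage_df):
        # In this sketch transform_context is treated as a plain dict; in the
        # real code it would carry the transform spec and runtime parameters.
        aggregated_name = transform_context["aggregated_metric_name"]
        return instance_usage_df.withColumn(
            "aggregated_metric_name", F.lit(aggregated_name))

Because the function accepts and returns an instance usage dataframe, a component like this can be chained before or after any other setter in the pipeline specification.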

  • Usage components (arguments: transform_context, record_store_data_frame; returns: instance_usage_data_frame)
    • fetch_quantity: Groups record_store_data_frame records, identified by an intermediate metric id, by usage_group_by_list and then performs the operation indicated by the usage_fetch_operation parameter.
    • fetch_quantity_util: Groups record_store_data_frame records for quantity and utilization percentage by usage_group_by_list and the usage_fetch operation to find the utilized quantity.
  • Setter components (arguments: transform_context, instance_usage_data_frame; returns: instance_usage_data_frame)
    • rollup_quantity: Groups instance_usage_data_frame data by the setter_group_by_list parameter and then performs the operation indicated by the setter_fetch_operation parameter.
    • set_aggregated_metric_name: Iterates through instance_usage_data_frame data and sets the metric name, as indicated by the aggregation_uom parameter.
  • Insert components (arguments: transform_context, instance_usage_data_frame; returns: instance_usage_data_frame)
    • insert_data: Prepares the final transformed metric JSON by setting dimensions as indicated by the setter_dimension_list parameter and publishes the data to the Kafka "metrics" topic.

Generic transform components are easy to write and maintain. Besides being re-usable, these generic transform components can be combined to create complex transformation routines or pipelines. Unit tests in Python can be written to test each component individually and also to test complex transformation routines.

Example transformation of input metric

The following steps walk through the transformation of an example input metric.

Step 1: Retrieve Input Metric. The incoming Monasca metric tuple as pulled from Kafka:

('583d49ab6aca482da653dd838deaf538load.avg_5_minhostnamedevstack',
'{"metric":{"name":"mem.total_mb",
            "dimensions":{"service":"monitoring",
                          "hostname":"mini-mon"
                         },
            "timestamp":1453308000000,
            "value":5969.0},
  "meta":{"tenantId":"583d49ab6aca482da653dd838deaf538",
          "region":"useast"},
  "creation_time":1453308005}')
Step 2: Call Input Metric to Record Store Converter. Converts the incoming metric into a record store dataframe by pulling Kafka offset information from the Kafka DStream, then filtering, validating and generating records in record store dataframe format:

//pseudo code
KafkaUtils.createDirectStream //Spark Streaming API: get a discretized stream from Kafka
DStream.transform() //to pull offsets
raw_metrics_df = inputDStreamRDD.map() //extract the JSON metric from the tuple and convert to a dataframe
pre_transform_df = pre_transform_specs.jsonRDD() //convert pre_transform_specs into a dataframe
filtered_metrics_df = raw_metrics_df.join(pre_transform_df).where(event_type=metric_name) //join raw data with the pre-transform specs dataframe to filter out unwanted raw metrics
validated_df = filtered_metrics_df.validate() //validate metrics: check for required fields from the pre_transform_spec for this event type
record_store_df = validated_df.explode() //generate multiple records, one for each intermediate metric in the pre_transform_spec's intermediate_metric_id_list
Step 3: Process Metrics in record store data. Finds the intermediate metrics in the record store dataframe and, for each metric, looks up the transformation pipeline in the transform specs and invokes each component listed in the aggregation_pipeline parameter of the transform spec. Also creates a transform_context object, which contains the Kafka offset information and any configuration properties.
Step 4: Call fetch quantity usage component. fetch_quantity (latest) finds the latest quantity for an hour:

fetch_latest(transform_context, record_store_df) {
    //pseudo code
    grouped_sorted_df = record_store_df.groupBy("host", "event_date", "event_hour") //and sort each group by timestamp
    latest_grouped_sorted_df = grouped_sorted_df //take the latest record in each group
    instance_usage_df = latest_grouped_sorted_df //convert to instance_usage_df
    return instance_usage_df
}
Step 5: Call Set Aggregated Metric Name component. Sets the final aggregated metric name:

set_aggregated_metric_name(transform_context, instance_usage_df) {
    //pseudo code
    instance_usage_df = instance_usage_df.map(transform_context.transform_spec.aggregated_uom) //set the metric name from the transform spec
    return instance_usage_df
}
Step 6: Call Insert Kafka component. Converts instance_usage_df to metrics JSON and publishes to Kafka:

insert_data(transform_context, instance_usage_df) {
    //pseudo code
    //convert to json and set proper dimensions
    instance_usage_json_list = convert_to_metrics_json(instance_usage_df,
                                                       transform_context.transform_specs.aggregation_dimensions)
    kafka_publisher.publish(instance_usage_json_list) //publish metrics to kafka
}

Impact of deploying Monasca Transform

Security impact

monasca-transform uses Spark Streaming to connect to the "metrics" topic in Kafka to retrieve metrics for processing. monasca-transform writes transformed metrics back to Kafka on behalf of a tenant which can be set in a configuration file. The tenant should have the appropriate monasca-admin role so that the data gets persisted in the Monasca datastore. The Spark web user interface and connections between master and worker nodes can be secured using ACLs and encryption[3]. Encrypted connections to Kafka are not yet supported, but work is currently underway to provide such support in Kafka[4].

Performance/Scalability Impacts

  • Since new transformed metrics will be written back to the "metrics" topic in Kafka, it will have some impact in terms of increasing the metrics to be persisted. This should not have any significant impact as transformed/aggregated metrics would necessarily be significantly fewer in number compared to the overall number of incoming metrics being persisted.
  • Given that transformed metrics will be significantly fewer in number they should not have any appreciable impact on data retention policy.
  • A Spark master and worker have to be configured and installed on nodes where monasca-transform is deployed (most likely alongside other Monasca components, though that is not necessary), so resources such as CPU and memory have to be allocated for the Spark master, worker and executor processes. The monasca-transform Python components also have to be configured and deployed. Deploying Spark and monasca-transform will have some impact in terms of increasing load on the nodes where they run. This impact will have to be evaluated.
  • In terms of scalability, Spark worker nodes can be scaled horizontally across additional nodes in case the amount of metrics that have to be processed increases.

Other deployer impact

monasca-transform will have to be configured via configuration file(s). Configuration options include pointers to Zookeeper, the batching interval used by Spark Streaming, the tenant id on behalf of which metrics will be submitted to Kafka, and Spark master information so that monasca-transform can submit jobs to the Spark cluster. There will also be configuration options for accessing the MySQL database to pull in information from the driver tables.

In addition, the deployer which installs monasca-transform will have to install start/stop scripts for the Spark worker, Spark master and monasca-transform processes. The deployer will also have to insert pre-transformation specifications and transformation specifications in the MySQL driver tables.

monasca-transform will be an optional component in Monasca and users can elect whether or not to install it. A devstack plugin is being authored which will provide the means to install monasca-transform and Spark, along with a tempest test to verify that monasca-transform is working. This can be used in CI/CD processing.

Developer impact

Developers who want to write their own transformation routines for aggregating new metrics will have to write new pre-transformation specifications and transformation specifications in JSON and add them to the driver tables. To add new transformation components, developers have to write Python modules which implement the appropriate interface (usage, setter, insert, etc.) and use Spark transformations to manipulate data and/or write data back to Kafka. New components will also have to be added to setup.cfg so that they can be loaded and utilized by monasca-transform.

Dependencies

  • Apache Spark 1.6.3 (originally developed against 1.6.0)
  • Zookeeper
  • Relational database - tested with MariaDB 10.2
  • Other Monasca services, including Kafka as used in monasca-common

Testing

Python unit tests are written to test each component of the overall transformation pipeline individually, as well as to test a part of or whole transformation pipeline. A devstack plugin was developed to install monasca-transform and Spark, allowing a simple way to try out monasca-transform.
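
The shape of such a unit test is sketched below, against a toy stand-in function rather than a real monasca-transform component:

# Sketch of a component-level unit test; rollup_quantity here is a stand-in
# pure function, not an actual monasca-transform component.
import unittest


def rollup_quantity(instance_usage_rows, group_by):
    """Toy stand-in for a rollup setter: sum 'quantity' per group key."""
    totals = {}
    for row in instance_usage_rows:
        key = tuple(row[field] for field in group_by)
        totals[key] = totals.get(key, 0.0) + row["quantity"]
    return totals


class RollupQuantityTest(unittest.TestCase):

    def test_sums_quantity_per_host(self):
        rows = [{"host": "h1", "quantity": 1.0},
                {"host": "h1", "quantity": 2.0},
                {"host": "h2", "quantity": 4.0}]
        result = rollup_quantity(rows, group_by=["host"])
        self.assertEqual(result[("h1",)], 3.0)
        self.assertEqual(result[("h2",)], 4.0)


if __name__ == "__main__":
    unittest.main()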

References

[1] Apache Kafka - https://kafka.apache.org/

[2] Apache Spark - https://spark.apache.org/

[3] Spark Security - https://spark.apache.org/docs/latest/security.html

[4] Kafka Security - https://kafka.apache.org/documentation/#security