# Source code for standard_lib.cim_tests.data_model_handler

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
"""
Provides Data Model handling functionalities. Such as

* Parse all the data model JSON files
* Get Mapped data model for an eventtype 
"""
import os
import logging

import json
from . import DataModel
from . import JSONSchema

LOGGER = logging.getLogger("pytest-splunk-addon")


class DataModelHandler(object):
    """
    Provides Data Model handling functionalities. Such as

    * Parse all the data model JSON files
    * Get Mapped data model for an eventtype

    Args:
        data_model_path (str): path to the data model JSON files
    """

    def __init__(self, data_model_path):
        self.data_model_path = data_model_path
        # Filled lazily by the ``data_models`` property.
        # ``None`` means "not loaded yet"; an empty list is a valid loaded state.
        self._data_models = None

    @property
    def data_models(self):
        """
        Data models parsed from ``data_model_path``.

        The directory is parsed on first access and the resulting list is
        reused on later accesses, including when the list is empty.

        Returns:
            list: objects yielded by :meth:`load_data_models`
        """
        # Compare against None rather than truthiness so that an empty
        # result is cached too instead of re-reading the directory each time.
        if self._data_models is None:
            self._data_models = list(self.load_data_models(self.data_model_path))
        return self._data_models

    def _get_all_tags_per_stanza(self, addon_parser):
        """
        Get list of all tags mapped with single stanza in tags.conf

        Args:
            addon_parser (addon_parser.AddonParser): Object of Addon_parser

        Returns:
            tags mapped with stanzas in tags.conf::

                {
                    stanza_name: [List of tags mapped to the stanza]
                }
        """
        tag_stanzas = {}
        for each_tag in addon_parser.get_tags():
            # Group every "tag" value under its "stanza", keeping the
            # order in which get_tags() produced them.
            tag_stanzas.setdefault(each_tag["stanza"], []).append(each_tag["tag"])
        return tag_stanzas

    def load_data_models(self, data_model_path):
        """
        Parse all the data model JSON files one by one

        Only directory entries whose name ends with ``.json`` are parsed.
        They are visited in sorted name order so the sequence of yielded
        objects does not depend on the order returned by ``os.listdir``.

        Args:
            data_model_path (str): path to the data model JSON files

        Yields:
            (cim_tests.data_model.DataModel): parsed data model object
        """
        # os.listdir() returns entries in arbitrary order; sort for determinism.
        json_list = sorted(
            each for each in os.listdir(data_model_path) if each.endswith(".json")
        )
        for each_json in json_list:
            yield DataModel(
                JSONSchema.parse_data_model(os.path.join(data_model_path, each_json))
            )

    def get_mapped_data_models(self, addon_parser):
        """
        Get list of eventtypes mapped with Data-Sets.
        The reason addon_parser is an argument & not attribute of the class
        is that, the loaded handler should be used with multiple addons.

        Args:
            addon_parser (addon_parser.AddonParser): Object of Addon_parser

        Yields:
            tag stanza mapped with list of data sets::

                "eventtype=sample", DataSet(performance)
        """
        tags_in_each_stanza = self._get_all_tags_per_stanza(addon_parser)
        for eventtype, tags in tags_in_each_stanza.items():
            is_mapped_datasets = False
            for each_data_model in self.data_models:
                # Materialise the matches so emptiness can be checked
                # before anything is logged or yielded.
                mapped_datasets = list(each_data_model.get_mapped_datasets(tags))
                if mapped_datasets:
                    is_mapped_datasets = True
                    LOGGER.info(
                        "Data Model=%s mapped for %s", each_data_model, eventtype
                    )
                for each_mapped_dataset in mapped_datasets:
                    yield eventtype, each_mapped_dataset
            if not is_mapped_datasets:
                LOGGER.info("No Data Model mapped for %s", eventtype)