Source code for standard_lib.fields_tests.test_generator

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
"""
Module include class to generate the test cases
to test the knowledge objects of an Add-on.
"""
import pytest
import logging
from itertools import chain

from ..addon_parser import AddonParser
from . import FieldBank
from ..utilities import xml_event_parser


LOGGER = logging.getLogger("pytest-splunk-addon")


[docs]class FieldTestGenerator(object): """ Generates test cases to test the knowledge objects of an Add-on. * Provides the pytest parameters to the test templates. * Supports field_bank: List of fields with patterns and expected values which should be tested for the Add-on. Args: app_path (str): Path of the app package field_bank (str): Path of the fields Json file """ def __init__(self, app_path, tokenized_events, field_bank=None): LOGGER.debug("initializing AddonParser to parse the app") self.app_path = app_path self.addon_parser = AddonParser(self.app_path) self.tokenized_events = tokenized_events self.field_bank = field_bank
[docs] def generate_tests(self, fixture): """ Generate the test cases based on the fixture provided supported fixtures: * splunk_searchtime_fields_positive * splunk_searchtime_fields_negative * splunk_searchtime_fields_tags * splunk_searchtime_fields_eventtypes * splunk_searchtime_fields_savedsearches * splunk_searchtime_fields_requirements Args: fixture(str): fixture name sample_generator(SampleGenerator): sample objects generator store_events(bool): variable to define if events should be stored """ if fixture.endswith("positive"): yield from self.generate_field_tests(is_positive=True) elif fixture.endswith("negative"): yield from self.generate_field_tests(is_positive=False) elif fixture.endswith("tags"): yield from self.generate_tag_tests() elif fixture.endswith("eventtypes"): yield from self.generate_eventtype_tests() elif fixture.endswith("savedsearches"): yield from self.generate_savedsearches_tests() elif fixture.endswith("requirements"): yield from self.generate_requirements_tests() elif fixture.endswith("datamodels"): yield from self.generate_requirements_datamodels_tests()
[docs] def generate_field_tests(self, is_positive): """ Generate test case for fields Args: is_positive (bool): Test type to generate Yields: pytest.params for the test templates """ LOGGER.info("generating field tests") field_itr = chain( FieldBank.init_field_bank_tests(self.field_bank), self.addon_parser.get_props_fields(), ) for fields_group in field_itr: # Generate test case for the stanza # Do not generate if it is a negative test case if is_positive: stanza_test_group = fields_group.copy() stanza_test_group["fields"] = [] yield pytest.param( stanza_test_group, id="{stanza}".format(**fields_group) ) # Generate a test case for all the fields in the classname if self._contains_classname(fields_group, ["EXTRACT", "REPORT", "LOOKUP"]): # ACD-4136: Convert the Field objects to dictionary to resolve the shared # memory issue with pytest-xdist parallel execution test_group = fields_group.copy() test_group["fields"] = [each.__dict__ for each in test_group["fields"]] yield pytest.param( test_group, id="{stanza}::{classname}".format(**test_group) ) # For each field mentioned in field_bank, a separate # test should be generated. # Counter to make the test_id unique field_bank_id = 0 # Generate test-cases for each field in classname one by one for each_field in fields_group["fields"]: # Create a dictionary for a single field with classname and stanza # ACD-4136: Convert the Field object to dictionary to resolve the shared # memory issue with pytest-xdist parallel execution one_field_group = fields_group.copy() one_field_group["fields"] = [each_field.__dict__] if fields_group["classname"] != "field_bank": test_type = "field" else: field_bank_id += 1 test_type = f"field_bank_{field_bank_id}" stanza = fields_group["stanza"] yield pytest.param( one_field_group, id=f"{stanza}::{test_type}::{each_field}" )
[docs] def generate_tag_tests(self): """ Generate test case for tags Yields: pytest.params for the test templates """ for each_tag_group in self.addon_parser.get_tags(): yield pytest.param( each_tag_group, id="{stanza}::tag::{tag}".format(**each_tag_group) )
[docs] def generate_requirements_datamodels_tests(self): """ Generate test case for tags Yields: pytest.params for the test templates """ for event in self.tokenized_events: if not event.requirement_test_data: continue if event.metadata.get("input_type", "").startswith("syslog"): stripped_event = xml_event_parser.strip_syslog_header(event.event) if stripped_event is None: LOGGER.error( "Syslog event do not match CEF, RFC_3164, RFC_5424 format" ) continue else: stripped_event = event.event escaped_event = xml_event_parser.escape_char_event(stripped_event) datamodels = event.requirement_test_data.get("datamodels") if datamodels: if type(datamodels) is dict: if type(datamodels["model"]) == list: datamodels = datamodels["model"] else: datamodels = [datamodels] datamodels = [dm["model"] for dm in datamodels] else: datamodels = [] datamodels = [ datamodel.replace(" ", "_").replace(":", "_") for datamodel in datamodels ] yield pytest.param( { "datamodels": datamodels, "stanza": escaped_event, }, id=f"{'-'.join(datamodels)}::sample_name::{event.sample_name}::host::{event.metadata.get('host')}", )
[docs] def generate_eventtype_tests(self): """ Generate test case for eventtypes Yields: pytest.params for the test templates """ for each_eventtype in self.addon_parser.get_eventtypes(): yield pytest.param( each_eventtype, id="eventtype::{stanza}".format(**each_eventtype) )
[docs] def generate_savedsearches_tests(self): """ Generate test case for savedsearches Yields: pytest.params for the test templates """ for each_savedsearch in self.addon_parser.get_savedsearches(): yield pytest.param( each_savedsearch, id="{stanza}".format(**each_savedsearch) )
[docs] def generate_requirements_tests(self): """ Generate test cases for fields defined for datamodel These function generates tests previously covered by requirement tests Yields: pytest.params for the test templates """ for event in self.tokenized_events: if not event.requirement_test_data: continue if event.metadata.get("input_type", "").startswith("syslog"): stripped_event = xml_event_parser.strip_syslog_header(event.event) if stripped_event is None: LOGGER.error( "Syslog event do not match CEF, RFC_3164, RFC_5424 format" ) continue else: stripped_event = event.event escaped_event = xml_event_parser.escape_char_event(stripped_event) exceptions = event.requirement_test_data.get("exceptions", {}) metadata = event.metadata modinput_params = { "sourcetype": metadata.get("sourcetype_to_search"), } cim_fields = event.requirement_test_data.get("cim_fields", {}) if cim_fields: cim_fields = { field: value for field, value in cim_fields.items() if field not in exceptions } yield pytest.param( { "escaped_event": escaped_event, "fields": cim_fields, "modinput_params": modinput_params, }, id=f"sample_name::{event.sample_name}::host::{event.metadata.get('host')}", )
def _contains_classname(self, fields_group, criteria): """ Check if the field_group dictionary contains the classname """ return any([fields_group["classname"].startswith(each) for each in criteria])