Source code for standard_lib.index_tests.test_generator

import logging
import pytest

from ..sample_generation import SampleXdistGenerator
from ..sample_generation.rule import raise_warning
from ..sample_generation.sample_event import SampleEvent

LOGGER = logging.getLogger("pytest-splunk-addon")


class IndexTimeTestGenerator(object):
    """
    Generates test cases to test the index time extraction of an Add-on.

    * Provides the pytest parameters to the test templates.
    * Supports key_fields: List of fields which should be tested
      for the Add-on.
    """

    def generate_tests(self, store_events, app_path, config_path, test_type):
        """
        Generates the test cases based on test_type

        Args:
            store_events: Forwarded unchanged to
                ``SampleXdistGenerator.get_samples``
            app_path (str): Path of the app package
            config_path (str): Path of package which contains
                pytest-splunk-addon-data.conf
            test_type (str): Type of test case. ``"line_breaker"``,
                ``"key_fields"`` and ``"_time"`` produce parameters; any
                other value yields nothing.

        Yields:
            pytest.params for the test templates

        Returns:
            When the samples were not generated from
            pytest-splunk-addon-data.conf (``conf_name`` is not
            ``"psa-data-gen"``) a warning is logged and the generator ends
            without yielding; the warning text is the generator's return
            value.
        """
        sample_generator = SampleXdistGenerator(app_path, config_path)
        store_sample = sample_generator.get_samples(store_events)
        tokenized_events = store_sample.get("tokenized_events")

        if store_sample.get("conf_name") != "psa-data-gen":
            message = (
                "Index Time tests cannot be executed using eventgen.conf, "
                "pytest-splunk-addon-data.conf is required."
            )
            LOGGER.warning(message)
            return message

        if test_type == "line_breaker":
            LOGGER.info("Generating line breaker test")
            yield from self.generate_line_breaker_tests(tokenized_events)
            return

        for tokenized_event in tokenized_events:
            identifier_key = tokenized_event.metadata.get("identifier")
            hosts = self.get_hosts(tokenized_event)

            # Generate test params only if key_fields
            if test_type == "key_fields" and tokenized_event.key_fields:
                # Work on a copy so the prefixed host values do not leak
                # into the shared tokenized event.
                event = SampleEvent.copy(tokenized_event)
                event_hosts = tokenized_event.key_fields.get("host")
                host_prefix = tokenized_event.metadata.get("host_prefix")
                if event_hosts and host_prefix:
                    event.key_fields["host"] = self.add_host_prefix(
                        host_prefix, event_hosts
                    )
                LOGGER.debug(
                    "Generating Key field test with the following params:"
                    "\nevent=%s\nidentifier_key=%s\nhosts=%s",
                    event,
                    identifier_key,
                    hosts,
                )
                yield from self.generate_params(event, identifier_key, hosts)

            # Generate test only if time_values
            elif (
                test_type == "_time"
                and tokenized_event.metadata.get("timestamp_type") == "event"
            ):
                LOGGER.debug(
                    "Generating time field test with the following params:"
                    "\ntokenized_event=%s\nidentifier_key=%s\nhosts=%s",
                    tokenized_event,
                    identifier_key,
                    hosts,
                )
                yield from self.generate_params(
                    tokenized_event, identifier_key, hosts
                )

    def generate_line_breaker_tests(self, tokenized_events):
        """
        Generates test case for testing line breaker

        Events are grouped by ``sample_name``. For every sample the
        sourcetype and the expected event count are taken from the first
        event that provides a truthy value, while the hosts of all events
        of that sample are accumulated into a set.

        Args:
            tokenized_events (list): List of tokenized events

        Yields:
            pytest.params for the test templates
        """
        line_breaker_params = {}

        for event in tokenized_events:
            # Reset for every event: an unparsable value in one stanza must
            # fall back to the default of 1 instead of silently reusing the
            # counts parsed for a previously processed stanza.
            sample_count = 1
            expected_count = 1
            try:
                sample_count = int(event.metadata.get("sample_count", 1))
                expected_count = int(
                    event.metadata.get("expected_event_count", 1)
                )
            except ValueError as e:
                raise_warning("Invalid value {}".format(e))
            else:
                LOGGER.info("Sample Count: %s", sample_count)
                LOGGER.info("Expected Count: %s", expected_count)

            params = line_breaker_params.setdefault(event.sample_name, {})

            if not params.get("sourcetype"):
                params["sourcetype"] = self.get_sourcetype(event)

            if not params.get("expected_event_count"):
                # The per-sample count is multiplied by sample_count for
                # every input type except modinput and windows_input.
                if event.metadata.get("input_type") not in (
                    "modinput",
                    "windows_input",
                ):
                    expected_count = expected_count * sample_count
                params["expected_event_count"] = expected_count

            sample_hosts = params.setdefault("host", set())
            event_host = self.get_hosts(event)
            if event_host:
                sample_hosts |= set(event_host)

        for sample_name, params in line_breaker_params.items():
            LOGGER.debug(
                "Generating Line Breaker test with the following params:"
                "\nhost:%s\nsourcetype:%s\nexpected_event_count:%s",
                params["host"],
                params["sourcetype"],
                params["expected_event_count"],
            )
            yield pytest.param(
                {
                    "host": params["host"],
                    "sourcetype": params["sourcetype"],
                    "expected_event_count": params["expected_event_count"],
                },
                id="{}::{}".format(
                    params["sourcetype"].replace(" ", "-"), sample_name
                ),
            )

    def get_hosts(self, tokenized_event):
        """
        Returns value of host for event

        The host is read from ``metadata["host"]`` when ``host_type`` is
        ``"plugin"`` or unset, and from ``key_fields["host"]`` when it is
        ``"event"``. Any other ``host_type`` is logged as an error and
        produces no host. A single string host is wrapped in a list, and
        ``host_prefix`` (when set) is prepended to every host.

        Args:
            tokenized_event (SampleEvent): Instance containing event info

        Returns:
            Value of host for event: a list of hosts, or None when no host
            could be determined
        """
        host_type = tokenized_event.metadata.get("host_type")
        if host_type in ("plugin", None):
            hosts = tokenized_event.metadata.get("host")
        elif host_type == "event":
            hosts = tokenized_event.key_fields.get("host")
        else:
            hosts = None
            LOGGER.error(
                "Invalid 'host_type' for stanza {}".format(
                    tokenized_event.sample_name
                )
            )

        if isinstance(hosts, str):
            hosts = [hosts]

        host_prefix = tokenized_event.metadata.get("host_prefix")
        # Only prefix when there is something to prefix: hosts is None when
        # the stanza has no host or an invalid host_type, and iterating it
        # would raise TypeError.
        if host_prefix and hosts:
            hosts = self.add_host_prefix(str(host_prefix), hosts)

        LOGGER.info(
            "Returning host with value %s for stanza %s",
            hosts,
            tokenized_event.sample_name,
        )
        return hosts

    def add_host_prefix(self, host_prefix, hosts):
        """
        Returns value of host with prefix

        Args:
            host_prefix (str): Prefix value to be added in host
            hosts (list): List of host. A single string is treated as a
                one-element list instead of being split into characters.

        Returns:
            Value of host with prefix
        """
        if isinstance(hosts, str):
            hosts = [hosts]
        prefix = str(host_prefix)
        return [prefix + str(host) for host in hosts]

    def get_sourcetype(self, sample_event):
        """
        Returns value of sourcetype for event

        ``sourcetype_to_search`` takes precedence over ``sourcetype``;
        ``"*"`` is used when neither is present in the metadata.

        Args:
            sample_event (SampleEvent): Instance containing event info

        Returns:
            Value of sourcetype for event
        """
        return sample_event.metadata.get(
            "sourcetype_to_search",
            sample_event.metadata.get("sourcetype", "*"),
        )

    def get_source(self, sample_event):
        """
        Returns value of source for event

        ``source_to_search`` takes precedence over ``source``; ``"*"`` is
        used when neither is present in the metadata.

        Args:
            sample_event (SampleEvent): Instance containing event info

        Returns:
            Value of source for event
        """
        return sample_event.metadata.get(
            "source_to_search", sample_event.metadata.get("source", "*")
        )

    def generate_params(self, tokenized_event, identifier_key, hosts):
        """
        Generates test case based on parameters

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            identifier_key (str): Identifier Key if mention in conf file
            hosts (list): List of host for event

        Yields:
            pytest.params for the test templates
        """
        # An identifier, when configured, takes precedence over the hosts.
        if identifier_key:
            yield from self.generate_identifier_params(
                tokenized_event, identifier_key
            )
        else:
            yield from self.generate_hosts_params(tokenized_event, hosts)

    def generate_identifier_params(self, tokenized_event, identifier_key):
        """
        Generates test case based on Identifier key mentioned in conf file

        One parameter set is produced for every value stored under
        ``identifier_key`` in the event's key_fields.

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            identifier_key (str): Identifier Key if mention in conf file

        Yields:
            pytest.params for the test templates
        """
        identifier_val = tokenized_event.key_fields.get(identifier_key)
        sourcetype = self.get_sourcetype(tokenized_event)
        source = self.get_source(tokenized_event)

        for identifier in identifier_val:
            yield pytest.param(
                {
                    # str.format keeps this working for non-string values.
                    "identifier": "{}={}".format(identifier_key, identifier),
                    "sourcetype": sourcetype,
                    "source": source,
                    "tokenized_event": tokenized_event,
                },
                id="{}::{}:{}".format(sourcetype, identifier_key, identifier),
            )

    def generate_hosts_params(self, tokenized_event, hosts):
        """
        Generates test case based on host value of the event

        The test id is built from the sourcetype and the host: the single
        host, ``<first>_to_<last>`` for several hosts, or the sample name
        when there is no host.

        Args:
            tokenized_event (SampleEvent): Instance containing event info
            hosts (list): List of hosts for event

        Yields:
            pytest.params for the test templates
        """
        id_host = tokenized_event.sample_name
        if hosts:
            if len(hosts) == 1:
                id_host = hosts[0]
            else:
                id_host = hosts[0] + "_to_" + hosts[-1]

        sourcetype = self.get_sourcetype(tokenized_event)
        yield pytest.param(
            {
                "hosts": hosts,
                "sourcetype": sourcetype,
                "source": self.get_source(tokenized_event),
                "tokenized_event": tokenized_event,
            },
            id="{}::{}".format(sourcetype, id_host),
        )