Source code for standard_lib.index_tests.test_generator

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import pytest

from ..sample_generation import SampleXdistGenerator
from ..sample_generation.rule import raise_warning
from ..sample_generation.sample_event import SampleEvent

LOGGER = logging.getLogger("pytest-splunk-addon")


[docs]class IndexTimeTestGenerator(object): """ Generates test cases to test the index time extraction of an Add-on. * Provides the pytest parameters to the test templates. * Supports key_fields: List of fields which should be tested for the Add-on. """
[docs] def generate_tests(self, store_events, app_path, config_path, test_type): """ Generates the test cases based on test_type Args: app_path (str): Path of the app package config_path (str): Path of package which contains pytest-splunk-addon-data.conf test_type (str): Type of test case Yields: pytest.params for the test templates """ sample_generator = SampleXdistGenerator(app_path, config_path) store_sample = sample_generator.get_samples(store_events) tokenized_events = store_sample.get("tokenized_events") if not store_sample.get("conf_name") == "psa-data-gen": msg = ( "Index time tests cannot be executed without " "pytest-splunk-addon-data.conf" ) LOGGER.warning(msg) return msg if test_type == "line_breaker": LOGGER.info("Generating line breaker test") yield from self.generate_line_breaker_tests(tokenized_events) else: for tokenized_event in tokenized_events: identifier_key = tokenized_event.metadata.get("identifier") hosts = self.get_hosts(tokenized_event) # Generate test params only if key_fields if test_type == "key_fields" and tokenized_event.key_fields: event = SampleEvent.copy(tokenized_event) if tokenized_event.key_fields.get( "host" ) and tokenized_event.metadata.get("host_prefix"): host_prefix = tokenized_event.metadata.get("host_prefix") event.key_fields["host"] = self.add_host_prefix( host_prefix, tokenized_event.key_fields.get("host") ) LOGGER.debug( "Generating Key field test with the following params:\nevent={e}\nidentifier_key={k}\nhosts={h}".format( e=event, k=identifier_key, h=hosts ) ) yield from self.generate_params(event, identifier_key, hosts) # Generate test only if time_values elif ( test_type == "_time" and tokenized_event.metadata.get("timestamp_type") == "event" and not ( int(tokenized_event.metadata.get("requirement_test_sample", 0)) > 0 and tokenized_event.time_values == [] ) ): LOGGER.debug( "Generating time field test with the following params:\ntokenized_event={e}\nidentifier_key={k}\nhosts={h}".format( e=tokenized_event, k=identifier_key, h=hosts ) ) yield from self.generate_params( tokenized_event, identifier_key, hosts )
[docs] def generate_line_breaker_tests(self, tokenized_events): """ Generates test case for testing line breaker Yields: pytest.params for the test templates """ line_breaker_params = {} sample_count = 1 expected_count = 1 # As all the sample events would have same properties except Host # Assigning those values outside the loop for event in tokenized_events: try: sample_count = int(event.metadata.get("sample_count", 1)) expected_count = int(event.metadata.get("expected_event_count", 1)) LOGGER.info( "Sample Count: {}".format( int(event.metadata.get("sample_count", 1)) ) ) LOGGER.info( "Expected Count: {}".format( int(event.metadata.get("expected_event_count", 1)) ) ) except ValueError as e: raise_warning("Invalid value {}".format(e)) if event.sample_name not in line_breaker_params: line_breaker_params[event.sample_name] = {} if not line_breaker_params[event.sample_name].get("sourcetype"): line_breaker_params[event.sample_name][ "sourcetype" ] = self.get_sourcetype(event) if not line_breaker_params[event.sample_name].get("expected_event_count"): if event.metadata.get("input_type") not in [ "modinput", "windows_input", ]: expected_count = expected_count * sample_count line_breaker_params[event.sample_name][ "expected_event_count" ] = expected_count if not line_breaker_params[event.sample_name].get("host"): line_breaker_params[event.sample_name]["host"] = set() event_host = self.get_hosts(event) if event_host: line_breaker_params[event.sample_name]["host"] |= set(event_host) for sample_name, params in line_breaker_params.items(): LOGGER.debug( "Generating Line Breaker test with the following params:\nhost:{h}\nsourcetype:{s}\nexpected_event_count{e}".format( h=params["host"], s=params["sourcetype"], e=params["expected_event_count"], ) ) yield pytest.param( { "host": params["host"], "sourcetype": params["sourcetype"], "expected_event_count": params["expected_event_count"], }, id="{}::{}".format(params["sourcetype"].replace(" ", "-"), sample_name), )
[docs] def get_hosts(self, tokenized_event): """ Returns value of host for event Args: tokenized_event (SampleEvent): Instance containing event info Returns: Value of host for event """ if tokenized_event.metadata.get("host_type") in ("plugin", None): hosts = tokenized_event.metadata.get("host") elif tokenized_event.metadata.get("host_type") == "event": hosts = tokenized_event.key_fields.get("host") else: hosts = None LOGGER.error( "Invalid 'host_type' for stanza {}".format(tokenized_event.sample_name) ) if isinstance(hosts, str): hosts = [hosts] if tokenized_event.metadata.get("host_prefix"): host_prefix = str(tokenized_event.metadata.get("host_prefix")) hosts = self.add_host_prefix(host_prefix, hosts) LOGGER.info( "Returning host with value {} for stanza {}".format( hosts, tokenized_event.sample_name ) ) return hosts
[docs] def add_host_prefix(self, host_prefix, hosts): """ Returns value of host with prefix Args: host_prefix (str): Prefix value to be added in host hosts (list): List of host Returns: Value of host with prefix """ hosts = [host_prefix + str(host) for host in hosts] return hosts
[docs] def get_sourcetype(self, sample_event): """ Returns value of sourcetype for event Args: sample_event (SampleEvent): Instance containing event info Returns: Value of sourcetype for event """ return sample_event.metadata.get( "sourcetype_to_search", sample_event.metadata.get("sourcetype", "*"), )
[docs] def get_source(self, sample_event): """ Returns value of source for event Args: sample_event (SampleEvent): Instance containing event info Returns: Value of source for event """ return sample_event.metadata.get( "source_to_search", sample_event.metadata.get("source", "*") )
[docs] def generate_params(self, tokenized_event, identifier_key, hosts): """ Generates test case based on parameters Args: tokenized_event (SampleEvent): Instance containing event info identifier_key (str): Identifier Key if mention in conf file hosts (list): List of host for event Yields: pytest.params for the test templates """ if identifier_key: yield from self.generate_identifier_params(tokenized_event, identifier_key) else: yield from self.generate_hosts_params(tokenized_event, hosts)
[docs] def generate_identifier_params(self, tokenized_event, identifier_key): """ Generates test case based on Identifier key mentioned in conf file Args: tokenized_event (SampleEvent): Instance containing event info identifier_key (str): Identifier Key if mention in conf file Yields: pytest.params for the test templates """ identifier_val = tokenized_event.key_fields.get(identifier_key) for identifier in identifier_val: yield pytest.param( { "identifier": identifier_key + "=" + identifier, "sourcetype": self.get_sourcetype(tokenized_event), "source": self.get_source(tokenized_event), "tokenized_event": tokenized_event, }, id="{}::{}:{}".format( self.get_sourcetype(tokenized_event), identifier_key, identifier, ), )
[docs] def generate_hosts_params(self, tokenized_event, hosts): """ Generates test case based on host value of the event Args: tokenized_event (SampleEvent): Instance containing event info hosts (list): List of hosts for event Yields: pytest.params for the test templates """ id_host = tokenized_event.sample_name if hosts: if len(hosts) == 1: id_host = hosts[0] else: id_host = hosts[0] + "_to_" + hosts[-1] yield pytest.param( { "hosts": hosts, "sourcetype": self.get_sourcetype(tokenized_event), "source": self.get_source(tokenized_event), "tokenized_event": tokenized_event, }, id="{}::{}".format(self.get_sourcetype(tokenized_event), id_host), )