#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import pytest
from ..sample_generation import SampleXdistGenerator
from ..sample_generation.rule import raise_warning
from ..sample_generation.sample_event import SampleEvent
LOGGER = logging.getLogger("pytest-splunk-addon")
[docs]class IndexTimeTestGenerator(object):
"""
Generates test cases to test the index time extraction of an Add-on.
* Provides the pytest parameters to the test templates.
* Supports key_fields: List of fields which should be tested
for the Add-on.
"""
[docs] def generate_tests(self, store_events, app_path, config_path, test_type):
"""
Generates the test cases based on test_type
Args:
app_path (str): Path of the app package
config_path (str): Path of package which contains pytest-splunk-addon-data.conf
test_type (str): Type of test case
Yields:
pytest.params for the test templates
"""
sample_generator = SampleXdistGenerator(app_path, config_path)
store_sample = sample_generator.get_samples(store_events)
tokenized_events = store_sample.get("tokenized_events")
if not store_sample.get("conf_name") == "psa-data-gen":
msg = (
"Index time tests cannot be executed without "
"pytest-splunk-addon-data.conf"
)
LOGGER.warning(msg)
return msg
if test_type == "line_breaker":
LOGGER.info("Generating line breaker test")
yield from self.generate_line_breaker_tests(tokenized_events)
else:
for tokenized_event in tokenized_events:
identifier_key = tokenized_event.metadata.get("identifier")
hosts = self.get_hosts(tokenized_event)
# Generate test params only if key_fields
if test_type == "key_fields" and tokenized_event.key_fields:
event = SampleEvent.copy(tokenized_event)
if tokenized_event.key_fields.get(
"host"
) and tokenized_event.metadata.get("host_prefix"):
host_prefix = tokenized_event.metadata.get("host_prefix")
event.key_fields["host"] = self.add_host_prefix(
host_prefix, tokenized_event.key_fields.get("host")
)
LOGGER.debug(
"Generating Key field test with the following params:\nevent={e}\nidentifier_key={k}\nhosts={h}".format(
e=event, k=identifier_key, h=hosts
)
)
yield from self.generate_params(event, identifier_key, hosts)
# Generate test only if time_values
elif (
test_type == "_time"
and tokenized_event.metadata.get("timestamp_type") == "event"
and not (
int(tokenized_event.metadata.get("requirement_test_sample", 0))
> 0
and tokenized_event.time_values == []
)
):
LOGGER.debug(
"Generating time field test with the following params:\ntokenized_event={e}\nidentifier_key={k}\nhosts={h}".format(
e=tokenized_event, k=identifier_key, h=hosts
)
)
yield from self.generate_params(
tokenized_event, identifier_key, hosts
)
[docs] def generate_line_breaker_tests(self, tokenized_events):
"""
Generates test case for testing line breaker
Yields:
pytest.params for the test templates
"""
line_breaker_params = {}
sample_count = 1
expected_count = 1
# As all the sample events would have same properties except Host
# Assigning those values outside the loop
for event in tokenized_events:
try:
sample_count = int(event.metadata.get("sample_count", 1))
expected_count = int(event.metadata.get("expected_event_count", 1))
LOGGER.info(
"Sample Count: {}".format(
int(event.metadata.get("sample_count", 1))
)
)
LOGGER.info(
"Expected Count: {}".format(
int(event.metadata.get("expected_event_count", 1))
)
)
except ValueError as e:
raise_warning("Invalid value {}".format(e))
if event.sample_name not in line_breaker_params:
line_breaker_params[event.sample_name] = {}
if not line_breaker_params[event.sample_name].get("sourcetype"):
line_breaker_params[event.sample_name][
"sourcetype"
] = self.get_sourcetype(event)
if not line_breaker_params[event.sample_name].get("expected_event_count"):
if event.metadata.get("input_type") not in [
"modinput",
"windows_input",
]:
expected_count = expected_count * sample_count
line_breaker_params[event.sample_name][
"expected_event_count"
] = expected_count
if not line_breaker_params[event.sample_name].get("host"):
line_breaker_params[event.sample_name]["host"] = set()
event_host = self.get_hosts(event)
if event_host:
line_breaker_params[event.sample_name]["host"] |= set(event_host)
for sample_name, params in line_breaker_params.items():
LOGGER.debug(
"Generating Line Breaker test with the following params:\nhost:{h}\nsourcetype:{s}\nexpected_event_count{e}".format(
h=params["host"],
s=params["sourcetype"],
e=params["expected_event_count"],
)
)
yield pytest.param(
{
"host": params["host"],
"sourcetype": params["sourcetype"],
"expected_event_count": params["expected_event_count"],
},
id="{}::{}".format(params["sourcetype"].replace(" ", "-"), sample_name),
)
[docs] def get_hosts(self, tokenized_event):
"""
Returns value of host for event
Args:
tokenized_event (SampleEvent): Instance containing event info
Returns:
Value of host for event
"""
if tokenized_event.metadata.get("host_type") in ("plugin", None):
hosts = tokenized_event.metadata.get("host")
elif tokenized_event.metadata.get("host_type") == "event":
hosts = tokenized_event.key_fields.get("host")
else:
hosts = None
LOGGER.error(
"Invalid 'host_type' for stanza {}".format(tokenized_event.sample_name)
)
if isinstance(hosts, str):
hosts = [hosts]
if tokenized_event.metadata.get("host_prefix"):
host_prefix = str(tokenized_event.metadata.get("host_prefix"))
hosts = self.add_host_prefix(host_prefix, hosts)
LOGGER.info(
"Returning host with value {} for stanza {}".format(
hosts, tokenized_event.sample_name
)
)
return hosts
[docs] def add_host_prefix(self, host_prefix, hosts):
"""
Returns value of host with prefix
Args:
host_prefix (str): Prefix value to be added in host
hosts (list): List of host
Returns:
Value of host with prefix
"""
hosts = [host_prefix + str(host) for host in hosts]
return hosts
[docs] def get_sourcetype(self, sample_event):
"""
Returns value of sourcetype for event
Args:
sample_event (SampleEvent): Instance containing event info
Returns:
Value of sourcetype for event
"""
return sample_event.metadata.get(
"sourcetype_to_search",
sample_event.metadata.get("sourcetype", "*"),
)
[docs] def get_source(self, sample_event):
"""
Returns value of source for event
Args:
sample_event (SampleEvent): Instance containing event info
Returns:
Value of source for event
"""
return sample_event.metadata.get(
"source_to_search", sample_event.metadata.get("source", "*")
)
[docs] def generate_params(self, tokenized_event, identifier_key, hosts):
"""
Generates test case based on parameters
Args:
tokenized_event (SampleEvent): Instance containing event info
identifier_key (str): Identifier Key if mention in conf file
hosts (list): List of host for event
Yields:
pytest.params for the test templates
"""
if identifier_key:
yield from self.generate_identifier_params(tokenized_event, identifier_key)
else:
yield from self.generate_hosts_params(tokenized_event, hosts)
[docs] def generate_identifier_params(self, tokenized_event, identifier_key):
"""
Generates test case based on Identifier key mentioned in conf file
Args:
tokenized_event (SampleEvent): Instance containing event info
identifier_key (str): Identifier Key if mention in conf file
Yields:
pytest.params for the test templates
"""
identifier_val = tokenized_event.key_fields.get(identifier_key)
for identifier in identifier_val:
yield pytest.param(
{
"identifier": identifier_key + "=" + identifier,
"sourcetype": self.get_sourcetype(tokenized_event),
"source": self.get_source(tokenized_event),
"tokenized_event": tokenized_event,
},
id="{}::{}:{}".format(
self.get_sourcetype(tokenized_event),
identifier_key,
identifier,
),
)
[docs] def generate_hosts_params(self, tokenized_event, hosts):
"""
Generates test case based on host value of the event
Args:
tokenized_event (SampleEvent): Instance containing event info
hosts (list): List of hosts for event
Yields:
pytest.params for the test templates
"""
id_host = tokenized_event.sample_name
if hosts:
if len(hosts) == 1:
id_host = hosts[0]
else:
id_host = hosts[0] + "_to_" + hosts[-1]
yield pytest.param(
{
"hosts": hosts,
"sourcetype": self.get_sourcetype(tokenized_event),
"source": self.get_source(tokenized_event),
"tokenized_event": tokenized_event,
},
id="{}::{}".format(self.get_sourcetype(tokenized_event), id_host),
)