#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Includes the test scenarios to check the index time properties of an Add-on.
"""
import logging
import pytest
import copy
from math import ceil
from ..cim_tests import FieldTestHelper
from ..utilities.log_helper import format_search_query_log
from ..utilities.log_helper import get_table_output
MAX_TIME_DIFFERENCE = 45
LOGGER = logging.getLogger("pytest-splunk-addon")
class IndexTimeTestTemplate(object):
    """
    Test templates to test the index time fields of an App.

    Each test ingests data via the ``splunk_ingest_data`` fixture and then
    searches Splunk to verify that index-time properties (key fields,
    ``_time`` values, and event counts) match the values expected from the
    tokenized events.
    """

    logger = logging.getLogger("pytest-splunk-addon-tests")

    @staticmethod
    def _build_index_filter(splunk_search_util):
        """Return an ``(index=a OR index=b ...)`` SPL clause.

        Args:
            splunk_search_util (SearchUtil): provides ``search_index`` as a
                comma-separated string of index names.

        Returns:
            str: parenthesized OR-clause covering every configured index.
        """
        return (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )

    @staticmethod
    def _build_event_filter(test_props):
        """Return the SPL filter that narrows a search to the ingested events.

        Prefers the explicit ``identifier`` value; otherwise falls back to a
        ``host IN (...)`` clause built from the de-duplicated host list.

        Args:
            test_props (dict): fixture data containing ``identifier`` or
                ``hosts``.

        Returns:
            str: SPL fragment selecting only the events under test.
        """
        identifier = test_props.get("identifier")
        if identifier:
            return identifier
        return 'host IN ("' + '","'.join(set(test_props.get("hosts"))) + '")'

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_key_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_key_fields,
        record_property,
        caplog,
    ):
        """
        This test case checks that a key_field has the expected values.

        The key fields are as follows:
            * src
            * src_port
            * dest
            * dest_port
            * dvc
            * host
            * user
            * url

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Performs add-on specific setup before the test.
            splunk_indextime_key_fields (fixture): Test for key fields
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        index_list = self._build_index_filter(splunk_search_util)
        assert splunk_indextime_key_fields.get(
            "identifier"
        ) or splunk_indextime_key_fields.get(
            "hosts"
        ), "Host or identifier fields cannot be determined from the config file.."
        extra_filter = self._build_event_filter(splunk_indextime_key_fields)

        # Deep-copy the expected key fields so the set() conversion below does
        # not mutate the shared tokenized event.
        fields_to_check = copy.deepcopy(
            splunk_indextime_key_fields["tokenized_event"].key_fields
        )
        query = "sourcetype={} {} | table {}".format(
            splunk_indextime_key_fields.get("sourcetype"),
            extra_filter,
            ",".join(fields_to_check),
        )
        search = "search {} {}".format(index_list, query)

        record_property("Query", search)
        LOGGER.debug("Base search for indextime key field test: %s", search)
        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        LOGGER.debug("Results:%s", results)
        assert results, (
            f"\nNo Events found for query.{format_search_query_log(search)}"
        )

        # Collect every value observed for each field across all result rows.
        result_fields = {}
        for result in results:
            for key, val in result.items():
                result_fields.setdefault(key, []).append(val)

        # Compare as sets: a token may be replaced once but assigned to n
        # events. Example syslog: all the headers are only tokenized once,
        # hence
        #   key_fields  = {'host': ['dummy_host']}
        #   result_dict = {'host': ['dummy_host']*n}
        # must still compare equal.
        result_fields = {key: set(value) for key, value in result_fields.items()}
        fields_to_check = {key: set(value) for key, value in fields_to_check.items()}

        if result_fields != fields_to_check:
            value_list, missing_keys = [], []
            for each_field, expected_values in fields_to_check.items():
                if each_field in result_fields:
                    if expected_values != result_fields[each_field]:
                        value_list.append(
                            [
                                each_field,
                                expected_values,
                                result_fields[each_field],
                            ]
                        )
                else:
                    missing_keys.append([each_field, expected_values])
            final_str = ""
            if value_list:
                result_str = get_table_output(
                    headers=["Key_field", "Expected_values", "Actual_values"],
                    value_list=[
                        [
                            each_value[0],
                            str(each_value[1]),
                            str(each_value[2]),
                        ]
                        for each_value in value_list
                    ],
                )
                final_str += f"Some values for the following key fields are missing\n\n{result_str}"
            if missing_keys:
                missing_keys_result_str = get_table_output(
                    headers=["Key_field", "Expected_values"],
                    value_list=[
                        [
                            each_key[0],
                            str(each_key[1]),
                        ]
                        for each_key in missing_keys
                    ],
                )
                final_str += f"\n\nSome key fields are not found in search results\n\n{missing_keys_result_str}"
            LOGGER.info(final_str)
            # Extra fields present only in the results are deliberately not a
            # failure; only missing keys/values fail the test.
            assert (
                not value_list and not missing_keys
            ), f"\nSearch query: {search}{final_str}\n"

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_time(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_time,
        record_property,
        caplog,
    ):
        """
        This test case checks that _time value in the events has the expected values.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Performs add-on specific setup before the test.
            splunk_indextime_time (fixture): Test for _time field
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        index_list = self._build_index_filter(splunk_search_util)
        assert splunk_indextime_time.get("identifier") or splunk_indextime_time.get(
            "hosts"
        ), "Host or identifier fields cannot be determined from the config file.."
        assert splunk_indextime_time[
            "tokenized_event"
        ].time_values, "_time field cannot be determined from the config file."
        # time_values is guaranteed non-empty by the assert above, so the
        # e_time evaluation is always appended (the original's extra `if` was
        # dead code).
        extra_filter = (
            self._build_event_filter(splunk_indextime_time) + " | eval e_time=_time"
        )
        query = "sourcetype={} {} | table {}".format(
            splunk_indextime_time.get("sourcetype"),
            extra_filter,
            "e_time",
        )
        search = "search {} {}".format(index_list, query)
        record_property("Query", search)
        LOGGER.debug("Base search for indextime time field test: %s", search)
        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        LOGGER.debug("Results:%s", results)
        assert results, (
            f"\nNo Events found for query.{format_search_query_log(search)}"
        )
        # Round timestamps up to whole seconds before comparing, so sub-second
        # differences between ingestion and search do not fail the test.
        result_fields = {
            key: [ceil(float(item[key])) for item in results]
            for key in results[0].keys()
        }
        key_time = sorted(
            ceil(t) for t in splunk_indextime_time["tokenized_event"].time_values
        )
        result_fields["e_time"].sort()
        record_property("time_values", key_time)
        record_property("result_time", result_fields)
        assert all(
            timestamp in result_fields["e_time"] for timestamp in key_time
        ), "Actual time {} :: Time in result {}".format(
            key_time, result_fields["e_time"]
        )

    @pytest.mark.first
    @pytest.mark.splunk_indextime
    def test_indextime_line_breaker(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_indextime_line_breaker,
        record_property,
        caplog,
    ):
        """
        This test case checks that number of events is as expected.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_ingest_data (fixture): To ingest data into splunk.
            splunk_setup (fixture): Performs add-on specific setup before the test.
            splunk_indextime_line_breaker (fixture): Test for event count
            record_property (fixture): Document facts of test cases.
            caplog (fixture): fixture to capture logs.
        """
        expected_events_count = int(
            splunk_indextime_line_breaker["expected_event_count"]
        )
        index_list = self._build_index_filter(splunk_search_util)
        host = '("' + '","'.join(splunk_indextime_line_breaker.get("host")) + '")'
        query = "search {} sourcetype={} host IN {} | stats count".format(
            index_list, splunk_indextime_line_breaker.get("sourcetype"), host
        )
        record_property("Query", query)
        # NOTE: original message said "key field test" here — fixed to name
        # the line breaker test.
        LOGGER.debug("Base search for indextime line breaker test: %s", query)
        results = list(
            splunk_search_util.getFieldValuesList(
                query,
                interval=splunk_search_util.search_interval,
                retries=splunk_search_util.search_retry,
            )
        )
        # Guard the empty result set so a failed search produces a readable
        # assertion message instead of an IndexError on results[0].
        assert results, (
            f"\nNo Events found for query.{format_search_query_log(query)}"
        )
        count_from_results = int(results[0].get("count"))
        LOGGER.debug("Resulting count:%s", count_from_results)
        assert count_from_results == expected_events_count, (
            f"{format_search_query_log(query)}"
            f"\nExpected count: {expected_events_count} Actual Count: {count_from_results}"
        )