Source code for standard_lib.index_tests.test_templates

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Includes the test scenarios to check the index time properties of an Add-on.
"""
import logging
import pytest
import copy

from math import ceil
from ..cim_tests import FieldTestHelper
from ..utilities.log_helper import format_search_query_log
from ..utilities.log_helper import get_table_output

MAX_TIME_DIFFERENCE = 45
LOGGER = logging.getLogger("pytest-splunk-addon")


[docs]class IndexTimeTestTemplate(object): """ Test templates to test the index time fields of an App """ logger = logging.getLogger("pytest-splunk-addon-tests")
[docs] @pytest.mark.first @pytest.mark.splunk_indextime def test_indextime_key_fields( self, splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_key_fields, record_property, caplog, ): """ This test case checks that a key_field has the expected values. The key fields are as follows: * src * src_port * dest * dest_port * dvc * host * user * url Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. splunk_ingest_data (fixture): To ingest data into splunk. splunk_indextime_key_fields (fixture): Test for key fields record_property (fixture): Document facts of test cases. caplog (fixture): fixture to capture logs. """ index_list = ( "(index=" + " OR index=".join(splunk_search_util.search_index.split(",")) + ")" ) assert splunk_indextime_key_fields.get( "identifier" ) or splunk_indextime_key_fields.get( "hosts" ), "Host or identifier fields cannot be determined from the config file.." if splunk_indextime_key_fields.get("identifier"): extra_filter = splunk_indextime_key_fields.get("identifier") else: extra_filter = ( 'host IN ("' + '","'.join(set(splunk_indextime_key_fields.get("hosts"))) + '")' ) fields_to_check = copy.deepcopy( splunk_indextime_key_fields["tokenized_event"].key_fields ) query = "sourcetype={} {} | table {}".format( splunk_indextime_key_fields.get("sourcetype"), extra_filter, ",".join(fields_to_check), ) search = "search {} {}".format(index_list, query) record_property("Query", search) LOGGER.debug("Base search for indextime key field test: {}".format(search)) results = splunk_search_util.getFieldValuesList( search, interval=splunk_search_util.search_interval, retries=splunk_search_util.search_retry, ) results = list(results) LOGGER.debug("Results:{}".format(results)) if not results: assert False, ( f"\nNo Events found for query." f"{format_search_query_log(search)}" ) result_fields = dict() for result in results: for key, val in result.items(): try: result_fields[key].append(val) except KeyError: result_fields[key] = [val] # This logic helps in comparing Results when the token is # only replaced once but the value is assigned to n events # Example syslog: all the headers are only tokenized once hence # key_fields = {'host': ['dummy_host']} # result_dict = {'host': ['dummy_host']*n} result_fields = {key: set(value) for key, value in result_fields.items()} fields_to_check = {key: set(value) for key, value in fields_to_check.items()} if not result_fields == fields_to_check: value_list, missing_keys = [], [] for each_field in fields_to_check.keys(): if each_field in result_fields.keys(): if not fields_to_check.get(each_field) == result_fields.get( each_field ): value_list.append( [ each_field, fields_to_check[each_field], result_fields.get(each_field), ] ) else: missing_keys.append([each_field, fields_to_check[each_field]]) final_str = "" if value_list: result_str = get_table_output( headers=["Key_field", "Expected_values", "Actual_values"], value_list=[ [ each_value[0], str(each_value[1]), str(each_value[2]), ] for each_value in value_list ], ) final_str += f"Some values for the following key fields are missing\n\n{result_str}" if missing_keys: missing_keys_result_str = get_table_output( headers=["Key_field", "Expected_values"], value_list=[ [ each_key[0], str(each_key[1]), ] for each_key in missing_keys ], ) final_str += f"\n\nSome key fields are not found in search results\n\n{missing_keys_result_str}" LOGGER.info(final_str) assert ( int(len(value_list)) == 0 and int(len(missing_keys)) == 0 ), f"\nSearch query: {search}{final_str}\n"
[docs] @pytest.mark.first @pytest.mark.splunk_indextime def test_indextime_time( self, splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_time, record_property, caplog, ): """ This test case checks that _time value in the events has the expected values. Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. splunk_ingest_data (fixture): To ingest data into splunk. splunk_indextime_time (fixture): Test for _time field record_property (fixture): Document facts of test cases. caplog (fixture): fixture to capture logs. """ index_list = ( "(index=" + " OR index=".join(splunk_search_util.search_index.split(",")) + ")" ) assert splunk_indextime_time.get("identifier") or splunk_indextime_time.get( "hosts" ), "Host or identifier fields cannot be determined from the config file.." assert splunk_indextime_time[ "tokenized_event" ].time_values, "_time field cannot be determined from the config file." if splunk_indextime_time.get("identifier"): extra_filter = splunk_indextime_time.get("identifier") else: extra_filter = ( 'host IN ("' + '","'.join(set(splunk_indextime_time.get("hosts"))) + '")' ) if splunk_indextime_time["tokenized_event"].time_values: extra_filter += " | eval e_time=_time" query = "sourcetype={} {} | table {}".format( splunk_indextime_time.get("sourcetype"), extra_filter, "e_time", ) search = "search {} {}".format(index_list, query) record_property("Query", search) LOGGER.debug("Base search for indextime time field test: {}".format(search)) results = splunk_search_util.getFieldValuesList( search, interval=splunk_search_util.search_interval, retries=splunk_search_util.search_retry, ) results = list(results) LOGGER.debug("Results:{}".format(results)) if not results: assert False, ( f"\nNo Events found for query." f"{format_search_query_log(search)}" ) result_fields = { key: [ceil(float(item[key])) for item in results] for key in results[0].keys() } key_time = [ ceil(t) for t in splunk_indextime_time["tokenized_event"].time_values ] result_fields["e_time"].sort() key_time.sort() record_property("time_values", key_time) record_property("result_time", result_fields) assert all( timestamp in result_fields["e_time"] for timestamp in key_time ), "Actual time {} :: Time in result {}".format( key_time, result_fields["e_time"] )
[docs] @pytest.mark.first @pytest.mark.splunk_indextime def test_indextime_line_breaker( self, splunk_search_util, splunk_ingest_data, splunk_setup, splunk_indextime_line_breaker, record_property, caplog, ): """ This test case checks that number of events is as expected. Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. splunk_ingest_data (fixture): To ingest data into splunk. splunk_indextime_line_breaker (fixture): Test for event count record_property (fixture): Document facts of test cases. caplog (fixture): fixture to capture logs. """ expected_events_count = int( splunk_indextime_line_breaker["expected_event_count"] ) index_list = ( "(index=" + " OR index=".join(splunk_search_util.search_index.split(",")) + ")" ) host = '("' + '","'.join(splunk_indextime_line_breaker.get("host")) + '")' query = "search {} sourcetype={} host IN {} | stats count".format( index_list, splunk_indextime_line_breaker.get("sourcetype"), host ) record_property("Query", query) LOGGER.debug("Base search for indextime key field test: {}".format(query)) results = list( splunk_search_util.getFieldValuesList( query, interval=splunk_search_util.search_interval, retries=splunk_search_util.search_retry, ) ) count_from_results = int(results[0].get("count")) LOGGER.debug("Resulting count:{}".format(count_from_results)) assert count_from_results == expected_events_count, ( f"{format_search_query_log(query)}" f"\nExpected count: {expected_events_count} Actual Count: {count_from_results}" )