Source code for standard_lib.cim_tests.field_test_helper

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
"""
Provides the helper methods to test addon_parser.Field object
"""
import logging
import json
from .field_test_adapter import FieldTestAdapter
from ..utilities.log_helper import get_table_output
from ..utilities.log_helper import format_search_query_log


class FieldTestHelper(object):
    """
    Provides the helper methods to test addon_parser.Field object

    Args:
        search_util (SearchUtil): the util class to search on the Splunk instance
        fields (list addon_parser.Field): The field to be tested
        interval (int): at what interval each retry should be made
        retries (int): number of retries to make if no results found
    """

    logger = logging.getLogger("pytest-splunk-addon-tests")

    def __init__(self, search_util, fields, interval=10, retries=4):
        self.search_util = search_util
        # The objects returned here are the ones queried below for
        # ``condition``, ``gen_validity_query()``, ``get_stats_query()``,
        # ``name`` and ``get_properties()``.
        self.fields = FieldTestAdapter.get_test_fields(fields)
        self.interval = interval
        self.retries = retries

    def test_field(self, base_search, record_property=None):
        """
        Generate a query for the list of fields and return the result

        Format of the query is::

            <condition>
            | eval <validity>
            | eval <expected_values>
            | eval <not negative_values>
            | eval <invalid_fields>
            | stats count as event_count, count(field) as field_count,
                count(valid_field) as valid_field_count,
                values(invalid_field) by sourcetype, source

        Args:
            base_search (str): Base search. Must be a search command.
            record_property (fixture): Document facts of test cases.
                When given, it is called with ``"search"`` and the query
                with its lines joined by single spaces.

        Returns:
            list of dict: the flattened rows built by ``_parse_result``
            (source, sourcetype, event_count and, per field, field,
            field_count and optionally valid_field_count / invalid_values).
            The same list is kept in ``self.parsed_result``; the raw search
            results are kept in ``self.results``.
        """
        self._make_search_query(base_search)
        self.logger.info(f"Executing the search query: {self.search}")
        if record_property:
            # Collapse the multi-line query so it is recorded on one line.
            record_property("search", " ".join(self.search.splitlines()))
        self.results = list(
            self.search_util.getFieldValuesList(
                self.search, self.interval, self.retries
            )
        )
        return self._parse_result(self.results)

    def _make_search_query(self, base_search):
        """
        Make the search query by using the list of fields::

            <base_search> <condition>
            | eval valid_field=<validity>
            | eval valid_field = if(field in <expected_values>)
            | eval valid_field = if(field not in <not negative_values>)
            | eval invalid_field = field if isnull(valid_field)
            | stats count as event_count, count(field) as field_count,
                count(valid_field) as valid_field_count,
                values(invalid_field) by sourcetype, source

        The result is stored in ``self.search``; the event-only part
        (base search plus condition) is stored in ``self.search_event``.

        Args:
            base_search (str): The base search
        """
        self.search = f"{base_search} {self._gen_condition()}"
        # Snapshot taken before any eval/stats clause is appended.
        self.search_event = self.search
        self.search += "".join(
            each_field.gen_validity_query() for each_field in self.fields
        )
        self.search += " \n| stats count as event_count"
        self.search += "".join(
            each_field.get_stats_query() for each_field in self.fields
        )
        self.search += " by sourcetype, source"

    def _parse_result(self, results):
        """
        Flatten the result into the following format::

            [{
                "sourcetype": str,
                "source": str,
                "event_count": int,
                "field": Field,
                "field_count": int,
                "valid_field_count": int
                "invalid_values": list
            }]

        One entry is produced per (result row, field) pair. The keys
        ``valid_field_count`` and ``invalid_values`` are present only for
        fields whose ``gen_validity_query()`` is truthy. When there are no
        fields, one entry per result row is produced with only
        ``sourcetype``, ``source`` and ``event_count``.

        Args:
            results (iterable of dict): rows returned by the search

        Returns:
            list of dict: the flattened rows, also kept in
            ``self.parsed_result``
        """
        self.parsed_result = list()
        for each_result in results:
            sourcetype = each_result.get("sourcetype")
            source = each_result.get("source")
            event_count = int(each_result.get("event_count"))
            for each_field in self.fields:
                field_dict = {
                    "field": each_field,
                    "field_count": int(
                        each_result.get(
                            FieldTestAdapter.FIELD_COUNT.format(each_field.name)
                        )
                    ),
                }
                if each_field.gen_validity_query():
                    field_dict["valid_field_count"] = int(
                        each_result.get(
                            FieldTestAdapter.VALID_FIELD_COUNT.format(
                                each_field.name
                            )
                        )
                    )
                    field_dict["invalid_values"] = each_result.get(
                        FieldTestAdapter.INVALID_FIELD_VALUES.format(
                            each_field.name
                        ),
                        "-",
                    )
                field_dict.update(
                    {
                        "sourcetype": sourcetype,
                        "event_count": event_count,
                        "source": source,
                    }
                )
                self.parsed_result.append(field_dict)
            if not self.fields:
                self.parsed_result.append(
                    {
                        "sourcetype": sourcetype,
                        "event_count": event_count,
                        "source": source,
                    }
                )
        return self.parsed_result

    def _gen_condition(self):
        """
        Join the non-empty ``condition`` of every field with `` OR ``.

        Returns:
            str: the combined condition; empty when no field has one
        """
        return " OR ".join(
            each_field.condition for each_field in self.fields if each_field.condition
        )

    @staticmethod
    def _result_row(each_result):
        """
        Build one table row for a per-field entry of ``parsed_result``.

        Args:
            each_result (dict): an entry with source, sourcetype, field,
                event_count and field_count keys, and optionally
                valid_field_count / invalid_values.

        Returns:
            list: source, sourcetype, field name, event count, field count,
            invalid field count, invalid values (first 200 items/characters,
            or ``"-"`` when absent or empty)
        """
        field_count = each_result["field_count"]
        # Entries for fields without a validity query carry neither
        # "valid_field_count" nor "invalid_values"; treat them as having
        # no invalid values instead of failing with KeyError.
        invalid_values = each_result.get("invalid_values")
        return [
            each_result["source"],
            each_result["sourcetype"],
            each_result["field"].name,
            each_result["event_count"],
            field_count,
            field_count - each_result.get("valid_field_count", field_count),
            invalid_values[:200] if invalid_values else "-",
        ]

    def format_exc_message(self):
        """
        Format the exception message to display

        1) There's no field in the result::

            Source          Sourcetype      Event Count
            -------------------------------------------
            splunkd.log     splunkd         10
            scheduler.log   scheduler       0
            -------------------------------------------
            Search = <search query>

        2) There are multiple fields in the result::

            Source         Sourcetype  Field  Event Count  Field Count  Invalid Field Count  Invalid Values
            ------------------------------------------------------------------------------------------------
            splunkd.log    splunkd     One    10           10           5                    'unknown'
            scheduler.log  scheduler   Two    20           20           7                    '-', 'invalid'
            ------------------------------------------------------------------------------------------------
            Event count = 20
            Search = <search_query>

            Properties for the field :: One
            .
            .
            .

        Reads ``self.parsed_result`` and ``self.search``, which are set by
        ``test_field``.

        Returns:
            str: the table, followed by the formatted search query and the
            properties of every field
        """
        if not self.fields:
            exc_message = get_table_output(
                headers=["Source", "Sourcetype", "Event Count"],
                value_list=[
                    [
                        each_result["source"],
                        each_result["sourcetype"],
                        each_result["event_count"],
                    ]
                    for each_result in self.parsed_result
                ],
            )
        else:
            exc_message = get_table_output(
                headers=[
                    "Source",
                    "Sourcetype",
                    "Field",
                    "Event Count",
                    "Field Count",
                    "Invalid Field Count",
                    "Invalid Values",
                ],
                value_list=[
                    self._result_row(each_result)
                    for each_result in self.parsed_result
                ],
            )
        exc_message += f"{format_search_query_log(self.search)}"
        for each_field in self.fields:
            exc_message += (
                f"\n\nProperties for the field :: {each_field.get_properties()}"
            )
        return exc_message