# Source code for standard_lib.cim_tests.field_test_adapter

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from ..addon_parser import Field


class FieldTestAdapter(Field):
    """
    Field adapter to include the testing related properties on top of Field

    Properties:

    * valid_field (str): New field generated which can only have the valid values
    * invalid_field (str): New field generated which can only have the invalid values
    * validity_query (str): The query which extracts the valid_field out of the field
    """

    # Name templates; "{}" is filled in with the field (or its name).
    VALID_FIELD = "{}_valid"
    INVALID_FIELD = "{}_invalid"
    FIELD_COUNT = "{}_count"
    VALID_FIELD_COUNT = "{}_valid_count"
    INVALID_FIELD_VALUES = "{}_invalid_values"

    def __init__(self, field):
        """
        Wrap an existing field and add the testing related properties.

        Args:
            field (Field): Field whose attributes are copied onto this adapter
        """
        # Take over a shallow copy of the wrapped field's attributes rather
        # than calling the parent constructor.
        self.__dict__ = field.__dict__.copy()
        # The templates are formatted with the field object itself, i.e. with
        # whatever its string conversion yields.
        self.valid_field = self.VALID_FIELD.format(field)
        self.invalid_field = self.INVALID_FIELD.format(field)
        # Filled lazily by gen_validity_query(); None means "not built yet".
        self.validity_query = None

    @staticmethod
    def get_query_from_values(values):
        r"""
        List of values into SPL list

        Example::

            ["a", "b"] to \"a\", \"b\"

        The quotes are backslash-escaped because the result is embedded in a
        double-quoted ``searchmatch("...")`` string by gen_validity_query().

        Args:
            values (list): List of str values

        Returns:
            str: SPL query list
        """
        query = '\\", \\"'.join(values)
        return f'\\"{query}\\"'

    def gen_validity_query(self):
        """
        Generate validation search query::

            | eval valid_field = <validity>
            | eval valid_field = if(searchmatch(valid_field in <expected_values>), valid_field, null())
            | eval valid_field = if(searchmatch(valid_field in <negative_values>), null(), valid_field)
            | eval invalid_field=if(isnull(valid_field),field, null())

        A ``| nomv <field>`` clause is emitted first when the field is
        multi-valued. The query is built once and cached in
        ``validity_query``; later calls return the cached string.

        Returns:
            str: The validation query, each clause preceded by a newline
        """
        if self.validity_query is not None:
            return self.validity_query

        valid = self.valid_field
        clauses = []

        if self.multi_value:
            clauses.append(f"| nomv {self.name}")

        clauses.append(f"| eval {valid}={self.validity}")

        if self.expected_values:
            # Keep the value only when it matches one of the expected values.
            expected = self.get_query_from_values(self.expected_values)
            clauses.append(
                f'| eval {valid}=if(searchmatch("{valid} IN ({expected})"), {valid}, null())'
            )

        if self.negative_values:
            # Drop the value when it matches one of the negative values.
            negative = self.get_query_from_values(self.negative_values)
            clauses.append(
                f'| eval {valid}=if(searchmatch("{valid} IN ({negative})"), null(), {valid})'
            )

        clauses.append(
            f"| eval {self.invalid_field}=if(isnull({valid}), {self.name}, null())"
        )

        # Assign the cache only after the whole query has been built, so a
        # failure part-way through never leaves a truncated query cached.
        self.validity_query = "".join(f"\n{clause}" for clause in clauses)
        return self.validity_query

    def get_stats_query(self):
        """
        Generate stats search query::

            count(field) as field_count,
            count(valid_field) as valid_field_count,
            values(invalid_field) as invalid_values

        Returns:
            str: Stats clauses, each one prefixed with ", "
        """
        name = self.name
        query = f", count({name}) as {self.FIELD_COUNT.format(name)}"
        if self.gen_validity_query():
            query += f", count({self.valid_field}) as {self.VALID_FIELD_COUNT.format(name)}"
            query += f", values({self.invalid_field}) as {self.INVALID_FIELD_VALUES.format(name)}"
        return query

    @classmethod
    def get_test_fields(cls, fields):
        """
        Wrap every field of an iterable in an adapter.

        Args:
            fields (list): Fields to adapt

        Returns:
            list: One adapter instance per input field, in the same order
        """
        return [cls(each_field) for each_field in fields]