Source code for standard_lib.addon_parser.props_parser

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides props.conf parsing mechanism
"""
from typing import Dict
from typing import Generator
from typing import Optional
import logging
import os
import re
from itertools import product

import addonfactory_splunk_conf_parser_lib as conf_parser

from .fields import convert_to_fields
from .transforms_parser import TransformsParser

LOGGER = logging.getLogger("pytest-splunk-addon")


[docs]class PropsParser(object): """ Parses props.conf and extracts the fields. Args: splunk_app_path (str): Path of the Splunk app """ def __init__(self, splunk_app_path: str): self._conf_parser = conf_parser.TABConfigParser() self.splunk_app_path = splunk_app_path self._props = None self.transforms_parser = TransformsParser(self.splunk_app_path) @property def props(self) -> Optional[Dict]: if self._props is not None: return self._props props_conf_path = os.path.join(self.splunk_app_path, "default", "props.conf") LOGGER.info("Parsing props.conf") self._conf_parser.read(props_conf_path) self._props = self._conf_parser.item_dict() return self._props if self._props else None
[docs] def get_props_fields(self): """ Parse the props.conf and yield all supported fields Yields: generator of all the supported fields """ for stanza_type, stanza_name, stanza_values in self._get_props_stanzas(): for key, value in stanza_values.items(): LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}") if not re.match("REPORT", key, re.IGNORECASE): LOGGER.info(f"Trying to parse classname={key}") parsing_method = self._get_props_method(key) if parsing_method: field_list = list(parsing_method(key, value)) if field_list: yield { "stanza": stanza_name, "stanza_type": stanza_type, "classname": key, "fields": field_list, } else: for transform_stanza, fields in self._get_report_fields(key, value): field_list = list(fields) if field_list: yield { "stanza": stanza_name, "stanza_type": stanza_type, "classname": f"{key}::{transform_stanza}", "fields": field_list, }
def _get_props_method(self, class_name: str): """ Get the parsing method depending on classname Args: class_name (str): class name of the props property Returns: instance method to parse the property """ method_mapping = { "EXTRACT": self._get_extract_fields, "EVAL": self._get_eval_fields, "FIELDALIAS": self._get_fieldalias_fields, "LOOKUP": self._get_lookup_fields, } for each_type in method_mapping: if re.match(each_type, class_name, re.IGNORECASE): LOGGER.info(f"Matched method of type={each_type}") return method_mapping[each_type] else: LOGGER.warning(f"No parser available for {class_name}. Skipping...") def _get_props_stanzas(self) -> Optional[Generator]: """ Parse the props.conf of the App & yield stanzas. For source with | (OR), it will return all combinations Yields: generator of stanzas from the props """ if not self.props: return for stanza_name, stanza_values in self.props.items(): if stanza_name.startswith("host::"): LOGGER.warning("Host stanza is not supported. Skipping..") continue if stanza_name.startswith("source::"): LOGGER.info(f"Parsing Source based stanza: {stanza_name}") for each_source in self.get_list_of_sources(stanza_name): yield "source", each_source, stanza_values else: LOGGER.info(f"Parsing Sourcetype based stanza: {stanza_name}") yield "sourcetype", stanza_name, stanza_values
[docs] @staticmethod def get_list_of_sources(source: str) -> Generator: """ For source with | (OR), it will return all combinations. Uses itertools.product to list the combinations Example:: input "(preA|preB)str(postX|postY)" output [ preAstrpostX preBstrpostX preAstrpostY preBstrpostY ] Args: source (str): Source name Yields: generator of source name """ LOGGER.debug("Finding combinations of a source..") match_obj = re.search(r"source::(.*)", source) value = match_obj.group(1).replace("...", "*") sub_groups = re.findall(r"\([^\)]+\)", value) sub_group_list = [] for each_group in sub_groups: sub_group_list.append(each_group.strip("()").split("|")) template = re.sub(r"\([^\)]+\)", "{}", value) count = 0 for each_permutation in product(*sub_group_list): count += 1 yield template.format(*each_permutation) LOGGER.debug("Found %d combinations", count)
@convert_to_fields def _get_extract_fields(self, name: str, value: str): """ Returns the fields parsed from EXTRACT Example:: EXTRACT-one = regex with (?<capturing_group>.*) Args: name: key in the configuration settings value: value of the respective name in the configuration Regex: Parse the fields from a regex. Examples, * (?<name>regex) * (?'name'regex) * (?P<name>regex) Yields: generator of fields """ regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']" fields_group = [] for field in re.findall(regex, value): if not field.startswith(("_KEY_", "_VAL_")): fields_group.append(field) yield field # If SOURCE_KEY is used in EXTRACT, generate the test for the same. regex_for_source_key = r"(?:(?i)in\s+(\w+))\s*$" extract_source_key = re.search(regex_for_source_key, value, re.MULTILINE) if extract_source_key: LOGGER.info(f"Found a source key in {name}") yield extract_source_key.group(1) fields_group.insert(0, extract_source_key.group(1)) @convert_to_fields def _get_eval_fields(self, name, value): """ Return the fields parsed from EVAL Example:: EVAL-action = if(isnull(action), "unknown", action) Args: name: key in the configuration settings value: value of the respective name in the configuration Yields: generator of fields """ regex = r"EVAL-(?P<FIELD>.*)" if not value == "null()": yield from re.findall(regex, name, re.IGNORECASE) @convert_to_fields def _get_fieldalias_fields(self, name: str, value: str): """ Return the fields parsed from FIELDALIAS Example:: FIELDALIAS-class = source AS dest, sc2 AS dest2 Args: name: key in the configuration settings value: value of the respective name in the configuration Regex: Description: * Find all field alias group separated by space or comma Examples: * field_source AS field_destination * "Field Source" as "Field Destination" * field_source ASNEW 'Field Destination' * field_source asnew field_destination Yields: generator of fields """ regex = ( r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)" r"\s+(?i)(?:as(?:new)?)\s+" r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)" ) fields_tuples = re.findall(regex, value, re.IGNORECASE) return list(set([item for t in fields_tuples for item in t])) def _get_report_fields(self, name: str, value: str): """ Returns the fields parsed from REPORT In order to parse the fields REPORT, the method parses the transforms.conf and returns the list Args: name: key in the configuration settings value: value of the respective name in the configuration Yields: generator of (transform_stanza ,fields) parsed from transforms.conf """ transforms_itr = (each_stanza.strip() for each_stanza in value.split(",")) for transforms_section in transforms_itr: yield ( transforms_section, self.transforms_parser.get_transform_fields(transforms_section), ) @convert_to_fields def _get_lookup_fields(self, name: str, value: str): """ Extracts the lookup fields Args: name: key in the configuration settings value: value of the respective name in the configuration Returns: List of lookup fields """ parsed_fields = self._parse_lookup(value) lookup_field_list = ( parsed_fields["input_fields"] + parsed_fields["output_fields"] ) # If the OUTPUT or OUTPUTNEW argument is never used, then get the fields from the csv file if not parsed_fields["output_fields"]: LOGGER.info( "OUTPUT fields not found classname=%s. Parsing the lookup csv file", name, ) lookup_field_list += list( self.transforms_parser.get_lookup_csv_fields( parsed_fields["lookup_stanza"] ) ) return list(set(lookup_field_list)) def _parse_lookup(self, lookup: str): """ Get list of lookup fields by parsing the lookup string. If a field is aliased to another field, take the aliased field into consideration Example:: LOOKUP-class = lookup_stanza input_field OUTPUT output_field Args: lookup_str (str): Lookup string from props.conf Regex: Parse the fields from the lookup string. Examples, * field1 AS field2, field3 field4 as field5 Returns: (dict): lookup_stanza (str): The stanza name for the lookup in question in transforms.conf input_fields (list): The fields in the input of the lookup output_fields (list): The fields in the output of the lookup """ input_output_field_list = [] lookup_stanza = lookup.split(" ")[0] lookup_str = " ".join(lookup.split(" ")[1:]) # 0: Take the left side of the OUTPUT as input fields # -1: Take the right side of the OUTPUT as output fields for input_output_index in [0, -1]: if "OUTPUT" not in lookup_str: lookup_str += " OUTPUT " # Take input fields or output fields depending on the input_output_index input_output_str = lookup_str.split("OUTPUTNEW")[input_output_index].split( "OUTPUT" )[input_output_index] field_parser = r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)\s*(?:[aA][sS]\s+(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+))?" # field_groups: Group of max 2 fields - (source, destination) for "source as destination" field_groups = re.findall(field_parser, input_output_str) field_list = [] # Take the last non-empty field from a field group. # Taking last non-empty field ensures that the aliased value will have # higher priority for each_group in field_groups: field_list.append( [each_field for each_field in reversed(each_group) if each_field][0] ) input_output_field_list.append(field_list) return { "input_fields": input_output_field_list[0], "output_fields": input_output_field_list[1], "lookup_stanza": lookup_stanza, }