# Copyright 2021 Splunk Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
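# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.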
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides props.conf parsing mechanism
"""
from typing import Dict
from typing import Generator
from typing import Optional
import logging
import os
import re
from itertools import product

import addonfactory_splunk_conf_parser_lib as conf_parser

from .fields import convert_to_fields
from .transforms_parser import TransformsParser

LOGGER = logging.getLogger("pytest-splunk-addon")

class PropsParser(object):
    """
    Parses props.conf and extracts the fields.

    Args:
        splunk_app_path (str): Path of the Splunk app
    """

    def __init__(self, splunk_app_path: str):
        self._conf_parser = conf_parser.TABConfigParser()
        self.splunk_app_path = splunk_app_path
        # Parsed props.conf content; filled in lazily by the ``props`` property.
        self._props = None
        self.transforms_parser = TransformsParser(self.splunk_app_path)

    @property
    def props(self) -> Optional[Dict]:
        """
        Content of ``<splunk_app_path>/default/props.conf``.

        The file is parsed on first access and the result is cached on the
        instance for later accesses.

        Returns:
            The mapping produced by the conf parser's ``item_dict()``, or
            None when that mapping is empty.
        """
        if self._props is None:
            props_conf_path = os.path.join(
                self.splunk_app_path, "default", "props.conf"
            )
            LOGGER.info("Parsing props.conf")
            self._conf_parser.read(props_conf_path)
            self._props = self._conf_parser.item_dict()
        # An empty mapping is reported as None on every access, not only on
        # the first one.
        return self._props or None

    def get_props_fields(self):
        """
        Parse the props.conf and yield all supported fields

        Yields:
            generator of all the supported fields. Each item is a dict with
            the keys ``stanza``, ``stanza_type``, ``classname`` and
            ``fields``. Entries whose field list is empty are skipped.
        """
        for stanza_type, stanza_name, stanza_values in self._get_props_stanzas():
            for key, value in stanza_values.items():
                LOGGER.info(f"Parsing parameter={key} of stanza={stanza_name}")

                if re.match("REPORT", key, re.IGNORECASE):
                    # REPORT references one or more transforms.conf stanzas;
                    # each referenced stanza gets its own entry.
                    for transform_stanza, fields in self._get_report_fields(
                        key, value
                    ):
                        field_list = list(fields)
                        if field_list:
                            yield {
                                "stanza": stanza_name,
                                "stanza_type": stanza_type,
                                "classname": f"{key}::{transform_stanza}",
                                "fields": field_list,
                            }
                    continue

                LOGGER.info(f"Trying to parse classname={key}")
                parsing_method = self._get_props_method(key)
                if not parsing_method:
                    continue
                field_list = list(parsing_method(key, value))
                if field_list:
                    yield {
                        "stanza": stanza_name,
                        "stanza_type": stanza_type,
                        "classname": key,
                        "fields": field_list,
                    }

    def _get_props_method(self, class_name: str):
        """
        Get the parsing method depending on classname

        Args:
            class_name (str): class name of the props property

        Returns:
            instance method to parse the property, or None (after logging a
            warning) when no parser is registered for the class name
        """
        method_mapping = {
            "EXTRACT": self._get_extract_fields,
            "EVAL": self._get_eval_fields,
            "FIELDALIAS": self._get_fieldalias_fields,
            "LOOKUP": self._get_lookup_fields,
        }
        for each_type, method in method_mapping.items():
            # Prefix match, case-insensitive: "extract-foo" selects EXTRACT.
            if re.match(each_type, class_name, re.IGNORECASE):
                LOGGER.info(f"Matched method of type={each_type}")
                return method
        LOGGER.warning(f"No parser available for {class_name}. Skipping...")
        return None

    def _get_props_stanzas(self) -> Optional[Generator]:
        """
        Parse the props.conf of the App & yield stanzas.
        For source with | (OR), it will return all combinations

        Yields:
            generator of stanzas from the props, as tuples of
            (stanza_type, stanza_name, stanza_values) where stanza_type is
            "source" or "sourcetype". ``host::`` stanzas are skipped.
        """
        if not self.props:
            return
        for stanza_name, stanza_values in self.props.items():
            if stanza_name.startswith("host::"):
                LOGGER.warning("Host stanza is not supported. Skipping..")
                continue
            if stanza_name.startswith("source::"):
                LOGGER.info(f"Parsing Source based stanza: {stanza_name}")
                for each_source in self.get_list_of_sources(stanza_name):
                    yield "source", each_source, stanza_values
            else:
                LOGGER.info(f"Parsing Sourcetype based stanza: {stanza_name}")
                yield "sourcetype", stanza_name, stanza_values

    @staticmethod
    def get_list_of_sources(source: str) -> Generator:
        """
        For source with | (OR), it will return all combinations.
        Uses itertools.product to list the combinations

        Example::

            input "(preA|preB)str(postX|postY)"
            output [
                preAstrpostX
                preBstrpostX
                preAstrpostY
                preBstrpostY
            ]

        Args:
            source (str): Source name, including its ``source::`` prefix

        Yields:
            generator of source name
        """
        LOGGER.debug("Finding combinations of a source..")
        match_obj = re.search(r"source::(.*)", source)
        # The "..." wildcard is rewritten to "*" in the generated names.
        value = match_obj.group(1).replace("...", "*")

        # Every "(a|b|c)" group contributes one list of alternatives.
        group_pattern = r"\([^\)]+\)"
        sub_group_list = [
            each_group.strip("()").split("|")
            for each_group in re.findall(group_pattern, value)
        ]
        # Each group becomes a "{}" placeholder that product() fills in.
        template = re.sub(group_pattern, "{}", value)

        count = 0
        for each_permutation in product(*sub_group_list):
            count += 1
            yield template.format(*each_permutation)
        LOGGER.debug("Found %d combinations", count)

    @convert_to_fields
    def _get_extract_fields(self, name: str, value: str):
        """
        Returns the fields parsed from EXTRACT

        Example::

            EXTRACT-one = regex with (?<capturing_group>.*)

        Args:
            name: key in the configuration settings
            value: value of the respective name in the configuration

        Regex:
            Parse the fields from a regex. Examples,

            * (?<name>regex)
            * (?'name'regex)
            * (?P<name>regex)

        Yields:
            generator of fields
        """
        regex = r"\(\?P?(?:[<'])([^\>'\s]+)[\>']"
        for field in re.findall(regex, value):
            # _KEY_* / _VAL_* capture groups are not reported as fields.
            if not field.startswith(("_KEY_", "_VAL_")):
                yield field

        # If SOURCE_KEY is used in EXTRACT, generate the test for the same.
        # Case-insensitivity is passed as a flag: an inline "(?i)" that is not
        # at the very start of the pattern is an error on Python 3.11+.
        regex_for_source_key = r"(?:in\s+(\w+))\s*$"
        extract_source_key = re.search(
            regex_for_source_key, value, re.MULTILINE | re.IGNORECASE
        )
        if extract_source_key:
            LOGGER.info(f"Found a source key in {name}")
            yield extract_source_key.group(1)

    @convert_to_fields
    def _get_eval_fields(self, name, value):
        """
        Return the fields parsed from EVAL

        Example::

            EVAL-action = if(isnull(action), "unknown", action)

        Args:
            name: key in the configuration settings
            value: value of the respective name in the configuration

        Yields:
            generator of fields; nothing is yielded when the value is
            exactly ``null()``
        """
        regex = r"EVAL-(?P<FIELD>.*)"
        if value != "null()":
            yield from re.findall(regex, name, re.IGNORECASE)

    @convert_to_fields
    def _get_fieldalias_fields(self, name: str, value: str):
        """
        Return the fields parsed from FIELDALIAS

        Example::

            FIELDALIAS-class = source AS dest, sc2 AS dest2

        Args:
            name: key in the configuration settings
            value: value of the respective name in the configuration

        Regex:
            Description:

            * Find all field alias group separated by space or comma

            Examples:

            * field_source AS field_destination
            * "Field Source" as "Field Destination"
            * field_source ASNEW 'Field Destination'
            * field_source asnew field_destination

        Returns:
            de-duplicated list of fields (both sides of every alias)
        """
        # Case-insensitivity comes from re.IGNORECASE below; the pattern holds
        # no inline "(?i)", which Python 3.11+ rejects when it is not at the
        # start of the expression.
        regex = (
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
            r"\s+(?:as(?:new)?)\s+"
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
        )
        fields_tuples = re.findall(regex, value, re.IGNORECASE)
        return list({item for each_tuple in fields_tuples for item in each_tuple})

    def _get_report_fields(self, name: str, value: str):
        """
        Returns the fields parsed from REPORT

        In order to parse the fields REPORT, the method parses the
        transforms.conf and returns the list

        Args:
            name: key in the configuration settings
            value: value of the respective name in the configuration
                (comma separated transforms stanza names)

        Yields:
            generator of (transform_stanza, fields) parsed from transforms.conf
        """
        for each_stanza in value.split(","):
            transforms_section = each_stanza.strip()
            yield (
                transforms_section,
                self.transforms_parser.get_transform_fields(transforms_section),
            )

    @convert_to_fields
    def _get_lookup_fields(self, name: str, value: str):
        """
        Extracts the lookup fields

        Args:
            name: key in the configuration settings
            value: value of the respective name in the configuration

        Returns:
            List of lookup fields
        """
        parsed_fields = self._parse_lookup(value)
        lookup_field_list = (
            parsed_fields["input_fields"] + parsed_fields["output_fields"]
        )

        # If the OUTPUT or OUTPUTNEW argument is never used, then get the
        # fields from the csv file
        if not parsed_fields["output_fields"]:
            LOGGER.info(
                "OUTPUT fields not found classname=%s. Parsing the lookup csv file",
                name,
            )
            lookup_field_list += list(
                self.transforms_parser.get_lookup_csv_fields(
                    parsed_fields["lookup_stanza"]
                )
            )
        return list(set(lookup_field_list))

    def _parse_lookup(self, lookup: str):
        """
        Get list of lookup fields by parsing the lookup string.
        If a field is aliased to another field, take the aliased field into
        consideration

        Example::

            LOOKUP-class = lookup_stanza input_field OUTPUT output_field

        Args:
            lookup (str): Lookup string from props.conf

        Regex:
            Parse the fields from the lookup string. Examples,

            * field1 AS field2, field3 field4 as field5

        Returns:
            (dict):
                lookup_stanza (str): The stanza name for the lookup in
                    question in transforms.conf
                input_fields (list): The fields in the input of the lookup
                output_fields (list): The fields in the output of the lookup
        """
        lookup_stanza, _, lookup_str = lookup.partition(" ")

        # Guarantee a separator so both the input and the output side can be
        # obtained by splitting, even when no OUTPUT/OUTPUTNEW was given.
        if "OUTPUT" not in lookup_str:
            lookup_str += " OUTPUT "

        # One or two fields per match: (source, destination) for
        # "source as destination"; destination is "" when there is no alias.
        field_parser = (
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
            r"\s*(?:[aA][sS]\s+"
            r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
            r")?"
        )

        input_output_field_list = []
        # 0: Take the left side of the OUTPUT as input fields
        # -1: Take the right side of the OUTPUT as output fields
        for input_output_index in (0, -1):
            input_output_str = lookup_str.split("OUTPUTNEW")[
                input_output_index
            ].split("OUTPUT")[input_output_index]
            field_groups = re.findall(field_parser, input_output_str)
            # Prefer the aliased (destination) name; fall back to the source
            # name when the field is not aliased.
            input_output_field_list.append(
                [destination or source for source, destination in field_groups]
            )

        return {
            "input_fields": input_output_field_list[0],
            "output_fields": input_output_field_list[1],
            "lookup_stanza": lookup_stanza,
        }