# -*- coding: utf-8 -*-
"""
Provides props.conf parsing mechanism
"""
import logging
import re
from itertools import product
from . import convert_to_fields, Field
from . import TransformsParser
LOGGER = logging.getLogger("pytest-splunk-addon")
[docs]class PropsParser(object):
"""
Parses props.conf and extracts the fields.
Args:
splunk_app_path (str): Path of the Splunk app
app (splunk_appinspect.App): Object of Splunk app
"""
def __init__(self, splunk_app_path, app):
self.app = app
self.splunk_app_path = splunk_app_path
self._props = None
self.transforms_parser = TransformsParser(self.splunk_app_path, self.app)
@property
def props(self):
try:
if not self._props:
LOGGER.info("Parsing props.conf")
self._props = self.app.props_conf()
return self._props
except OSError:
LOGGER.warning("props.conf not found.")
return None
[docs] def get_props_fields(self):
"""
Parse the props.conf and yield all supported fields
Yields:
generator of all the supported fields
"""
for stanza_type, stanza_name, stanza in self.get_props_stanzas():
for classname in stanza.options:
LOGGER.info(
"Parsing parameter=%s of stanza=%s", classname, stanza_name,
)
props_property = stanza.options[classname]
if not re.match("REPORT", classname, re.IGNORECASE):
LOGGER.info("Trying to parse classname=%s", classname)
parsing_method = self.get_props_method(classname)
if parsing_method:
field_list = list(parsing_method(props_property))
if field_list:
yield {
"stanza": stanza_name,
"stanza_type": stanza_type,
"classname": classname,
"fields": field_list,
}
else:
for transform_stanza, fields in self.get_report_fields(
props_property
):
field_list = list(fields)
if field_list:
yield {
"stanza": stanza_name,
"stanza_type": stanza_type,
"classname": f"{classname}::{transform_stanza}",
"fields": field_list,
}
[docs] def get_props_method(self, class_name):
"""
Get the parsing method depending on classname
Args:
class_name (str): class name of the props property
Returns:
instance method to parse the property
"""
method_mapping = {
"EXTRACT": self.get_extract_fields,
"EVAL": self.get_eval_fields,
"FIELDALIAS": self.get_fieldalias_fields,
"LOOKUP": self.get_lookup_fields,
}
for each_type in method_mapping:
if re.match(each_type, class_name, re.IGNORECASE):
LOGGER.info("Matched method of type=%s", each_type)
return method_mapping[each_type]
else:
LOGGER.warning("No parser available for %s. Skipping...", class_name)
[docs] def get_props_stanzas(self):
"""
Parse the props.conf of the App & yield stanzas.
For source with | (OR), it will return all combinations
Yields:
generator of stanzas from the props
"""
if not self.props:
return
for stanza_name in self.props.sects:
stanza = self.props.sects[stanza_name]
if stanza.name.startswith("host::"):
LOGGER.warning("Host stanza is not supported. Skipping..")
continue
if stanza.name.startswith("source::"):
LOGGER.info("Parsing Source based stanza: %s", stanza.name)
for each_source in self.get_list_of_sources(stanza_name):
yield "source", each_source, stanza
else:
LOGGER.info("Parsing Sourcetype based stanza: %s", stanza.name)
yield "sourcetype", stanza.name, stanza
[docs] @staticmethod
def get_list_of_sources(source):
"""
For source with | (OR), it will return all combinations.
Uses itertools.product to list the combinations
Example::
input "(preA|preB)str(postX|postY)"
output [
preAstrpostX
preBstrpostX
preAstrpostY
preBstrpostY
]
Args:
source (str): Source name
Yields:
generator of source name
"""
LOGGER.debug("Finding combinations of a source..")
match_obj = re.search(r"source::(.*)", source)
value = match_obj.group(1).replace("...", "*")
sub_groups = re.findall(r"\([^\)]+\)", value)
sub_group_list = []
for each_group in sub_groups:
sub_group_list.append(each_group.strip("()").split("|"))
template = re.sub(r"\([^\)]+\)", "{}", value)
count = 0
for each_permutation in product(*sub_group_list):
count += 1
yield template.format(*each_permutation)
LOGGER.debug("Found %d combinations", count)
[docs] def get_sourcetype_assignments(self, props_property):
"""
Get the sourcetype assigned for the source
Example::
[source::/splunk/var/log/splunkd.log]
sourcetype = splunkd
Args:
props_property (splunk_appinspect.configuration_file.ConfigurationSetting):
The configuration setting object of REPORT.
properties used:
* name : key in the configuration settings
* value : value of the respective name in the configuration
Yields:
the sourcetype field with possible value
"""
yield Field(
{"name": props_property.name, "expected_values": [props_property.value]}
)
[docs] @convert_to_fields
def get_eval_fields(self, props_property):
"""
Return the fields parsed from EVAL
Example::
EVAL-action = if(isnull(action), "unknown", action)
Args:
props_property (splunk_appinspect.configuration_file.ConfigurationSetting):
The configuration setting object of eval
properties used:
* name : key in the configuration settings
* value : value of the respective name in the configuration
Yields:
generator of fields
"""
regex = r"EVAL-(?P<FIELD>.*)"
if not props_property.value == "null()":
yield from re.findall(regex, props_property.name, re.IGNORECASE)
[docs] @convert_to_fields
def get_fieldalias_fields(self, props_property):
"""
Return the fields parsed from FIELDALIAS
Example::
FIELDALIAS-class = source AS dest, sc2 AS dest2
Args:
props_property (splunk_appinspect.configuration_file.ConfigurationSetting):
The configuration setting object of FIELDALIAS
properties used:
* name : key in the configuration settings
* value : value of the respective name in the configuration
Regex:
Description:
* Find all field alias group separated by space or comma
Examples:
* field_source AS field_destination
* "Field Source" as "Field Destination"
* field_source ASNEW 'Field Destination'
* field_source asnew field_destination
Yields:
generator of fields
"""
regex = (
r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
r"\s+(?i)(?:as(?:new)?)\s+"
r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)"
)
fields_tuples = re.findall(regex, props_property.value, re.IGNORECASE)
# Convert list of tuples into list
return list(set([item for t in fields_tuples for item in t]))
[docs] def get_report_fields(self, props_property):
"""
Returns the fields parsed from REPORT
In order to parse the fields REPORT, the method parses the
transforms.conf and returns the list
Args:
props_property (splunk_appinspect.configuration_file.ConfigurationSetting):
The configuration setting object of REPORT.
properties used:
* name : key in the configuration settings
* value : value of the respective name in the configuration
Yields:
generator of (transform_stanza ,fields) parsed from transforms.conf
"""
transforms_itr = (
each_stanza.strip() for each_stanza in props_property.value.split(",")
)
for transforms_section in transforms_itr:
yield (
transforms_section,
self.transforms_parser.get_transform_fields(transforms_section),
)
[docs] @convert_to_fields
def get_lookup_fields(self, props_property):
"""
Extracts the lookup fields
Args:
props_property (splunk_appinspect.configuration_file.ConfigurationSetting):
The configuration setting object of eval
properties used:
* name : key in the configuration settings
* value : value of the respective name in the configuration
Returns:
List of lookup fields
"""
parsed_fields = self.parse_lookup_str(props_property.value)
lookup_field_list = (
parsed_fields["input_fields"] + parsed_fields["output_fields"]
)
# If the OUTPUT or OUTPUTNEW argument is never used, then get the fields from the csv file
if not parsed_fields["output_fields"]:
LOGGER.info(
"OUTPUT fields not found classname=%s. Parsing the lookup csv file",
props_property.name,
)
lookup_field_list += list(
self.transforms_parser.get_lookup_csv_fields(
parsed_fields["lookup_stanza"]
)
)
return list(set(lookup_field_list))
[docs] def parse_lookup_str(self, lookup_str):
"""
Get list of lookup fields by parsing the lookup string.
If a field is aliased to another field, take the aliased field into consideration
Example::
LOOKUP-class = lookup_stanza input_field OUTPUT output_field
Args:
lookup_str (str): Lookup string from props.conf
Regex:
Parse the fields from the lookup string. Examples,
* field1 AS field2, field3 field4 as field5
Returns:
(dict):
lookup_stanza (str): The stanza name for the lookup in question in transforms.conf
input_fields (list): The fields in the input of the lookup
output_fields (list): The fields in the output of the lookup
"""
input_output_field_list = []
lookup_stanza = lookup_str.split(" ")[0]
lookup_str = " ".join(lookup_str.split(" ")[1:])
# 0: Take the left side of the OUTPUT as input fields
# -1: Take the right side of the OUTPUT as output fields
for input_output_index in [0, -1]:
if "OUTPUT" not in lookup_str:
lookup_str += " OUTPUT "
# Take input fields or output fields depending on the input_output_index
input_output_str = lookup_str.split("OUTPUTNEW")[input_output_index].split(
"OUTPUT"
)[input_output_index]
field_parser = r"(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+)\s*(?:[aA][sS]\s+(\"(?:\\\"|[^\"])*\"|\'(?:\\\'|[^\'])*\'|[^\s,]+))?"
# field_groups: Group of max 2 fields - (source, destination) for "source as destination"
field_groups = re.findall(field_parser, input_output_str)
field_list = []
# Take the last non-empty field from a field group.
# Taking last non-empty field ensures that the aliased value will have
# higher priority
for each_group in field_groups:
field_list.append(
[each_field for each_field in reversed(each_group) if each_field][0]
)
input_output_field_list.append(field_list)
return {
"input_fields": input_output_field_list[0],
"output_fields": input_output_field_list[1],
"lookup_stanza": lookup_stanza,
}