# Source code for standard_lib.addon_parser.transforms_parser
# -*- coding: utf-8 -*-
"""
Provides transforms.conf parsing mechanism
"""
import logging
import re
import os
import csv
from urllib.parse import unquote
LOGGER = logging.getLogger("pytest-splunk-addon")
from . import convert_to_fields
class TransformsParser(object):
    """
    Parses transforms.conf and extracts fields

    Args:
        splunk_app_path (str): Path of the Splunk app
        app (splunk_appinspect.App): Object of Splunk app
    """

    # Named capturing groups in a REGEX value: (?<name>...), (?'name'...)
    # and (?P<name>...). The negative lookahead rejects every name that
    # starts with _KEY or _VAL (presumably Splunk's _KEY_n / _VAL_n
    # placeholders -- confirm against the transforms.conf spec).
    _NAMED_GROUP_RE = re.compile(r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']")

    # In a FORMAT value, the run of non-whitespace characters directly to
    # the left of each "::" separator.
    _FORMAT_FIELD_RE = re.compile(r"(\S*)::")

    def __init__(self, splunk_app_path, app):
        self.app = app
        self.splunk_app_path = splunk_app_path
        # Cache for the parsed transforms.conf; filled by ``transforms``.
        self._transforms = None

    @property
    def transforms(self):
        """
        Parsed transforms.conf of the app, loaded on first access.

        Returns:
            The object returned by ``self.app.transforms_conf()``, or None
            if loading it raises OSError.
        """
        try:
            if not self._transforms:
                LOGGER.info("Parsing transforms.conf")
                self._transforms = self.app.transforms_conf()
            return self._transforms
        except OSError:
            LOGGER.warning("transforms.conf not found.")
            return None

    @convert_to_fields
    def get_transform_fields(self, transforms_stanza):
        """
        Parse the transforms.conf of the App & yield fields of
        a specific stanza.

        Supported extractions from transforms.conf are

        * SOURCE_KEY = _raw
        * REGEX = some regex with (capturing_group)
        * FIELDS = one,
        * FORMAT = name::$1

        Args:
            transforms_stanza (str):
                The stanza of which the fields should be extracted

        Regex:
            Parse the fields from a regex. Examples::

                (?<name>regex)
                (?'name'regex)
                (?P<name>regex)

        Yields:
            generator of fields
        """
        transforms = self.transforms
        if not transforms:
            return

        # Only the stanza lookup can legitimately miss, so only it is guarded.
        try:
            transforms_section = transforms.sects[transforms_stanza]
        except KeyError:
            LOGGER.error(
                "The stanza %s does not exists in transforms.conf.",
                transforms_stanza,
            )
            return
        options = transforms_section.options

        if "SOURCE_KEY" in options:
            LOGGER.info("Parsing source_key of %s", transforms_stanza)
            yield options["SOURCE_KEY"].value

        if "REGEX" in options:
            LOGGER.info("Parsing REGEX of %s", transforms_stanza)
            # No extra _KEY_/_VAL_ filtering is needed here: the pattern's
            # lookahead has already rejected those names.
            for each_field in self._NAMED_GROUP_RE.findall(options["REGEX"].value):
                yield each_field

        if "FIELDS" in options:
            LOGGER.info("Parsing FIELDS of %s", transforms_stanza)
            for each_field in options["FIELDS"].value.split(","):
                field_name = each_field.strip()
                # A trailing or doubled comma ("one,") leaves an empty item;
                # an empty string is not a field name.
                if field_name:
                    yield field_name

        if "FORMAT" in options:
            LOGGER.info("Parsing FORMAT of %s", transforms_stanza)
            for each_field in self._FORMAT_FIELD_RE.findall(options["FORMAT"].value):
                # Names containing "$" (e.g. "$1::$2") are references to
                # capture groups rather than literal names; the pattern can
                # also match an empty string when "::" follows whitespace.
                if each_field and "$" not in each_field:
                    yield each_field

    def get_lookup_csv_fields(self, lookup_stanza):
        """
        Parse the fields from a lookup file for a specific lookup_stanza

        The file is looked up as ``<splunk_app_path>/lookups/<filename>``,
        where ``filename`` comes from the stanza. Only the CSV header row
        is read.

        Args:
            lookup_stanza (str): A lookup stanza mentioned in transforms.conf

        Yields:
            string of field names
        """
        transforms = self.transforms
        if not transforms or lookup_stanza not in transforms.sects:
            return
        options = transforms.sects[lookup_stanza].options
        if "filename" not in options:
            return
        lookup_file = options["filename"].value

        # If there is an error, the test should fail with the current fields.
        # This makes sure the test doesn't exit prematurely.
        try:
            location = os.path.join(self.splunk_app_path, "lookups", lookup_file)
            # newline="" lets the csv module do its own newline handling.
            with open(location, "r", newline="") as csv_file:
                fieldnames = csv.DictReader(csv_file).fieldnames
        except (OSError, TypeError) as e:
            LOGGER.error(
                "Could not read the lookup file, skipping test. error=%s",
                str(e),
            )
            return

        # DictReader reports None when the file has no header row at all.
        if fieldnames is None:
            LOGGER.error(
                "Could not read the lookup file, skipping test. error=%s",
                "lookup file has no header row",
            )
            return

        # The file is already closed here, so no handle stays open while the
        # generator is suspended between yields.
        for each_name in fieldnames:
            yield each_name.strip()