Source code for standard_lib.addon_parser.transforms_parser
#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides transforms.conf parsing mechanism
"""
from typing import Dict
from typing import Generator
from typing import Optional
import logging
import re
import os
import csv
import addonfactory_splunk_conf_parser_lib as conf_parser
LOGGER = logging.getLogger("pytest-splunk-addon")
from . import convert_to_fields
[docs]class TransformsParser(object):
"""
Parses transforms.conf and extracts fields
Args:
splunk_app_path (str): Path of the Splunk app
"""
def __init__(self, splunk_app_path: str):
self._conf_parser = conf_parser.TABConfigParser()
self.splunk_app_path = splunk_app_path
self._transforms = None
@property
def transforms(self) -> Optional[Dict]:
if self._transforms is not None:
return self._transforms
transforms_conf_path = os.path.join(
self.splunk_app_path, "default", "transforms.conf"
)
LOGGER.info("Parsing transforms.conf")
self._conf_parser.read(transforms_conf_path)
self._transforms = self._conf_parser.item_dict()
return self._transforms if self._transforms else None
[docs] @convert_to_fields
def get_transform_fields(self, transforms_stanza: str) -> Optional[Generator]:
"""
Parse the transforms.conf of the App & yield fields of
a specific stanza.
Supported extractions from transforms.conf are
* SOURCE_KEY = _raw
* REGEX = some regex with (capturing_group)
* FIELDS = one,
Args:
transforms_stanza (str):
The stanza of which the fields should be extracted
Regex:
Parse the fields from a regex. Examples::
(?<name>regex)
(?'name'regex)
(?P<name>regex)
Yields:
generator of fields
"""
try:
if not self.transforms:
return
transforms_values = self.transforms[transforms_stanza]
if "SOURCE_KEY" in transforms_values:
LOGGER.info(f"Parsing source_key of {transforms_stanza}")
yield transforms_values["SOURCE_KEY"]
if "REGEX" in transforms_values:
LOGGER.info(f"Parsing REGEX of {transforms_stanza}")
regex = r"\(\?P?[<'](?!_KEY|_VAL)([A-Za-z0-9_]+)[>']"
match_fields = re.findall(regex, transforms_values["REGEX"])
for each_field in match_fields:
if not each_field.startswith(("_KEY_", "_VAL_")):
yield each_field.strip()
if "FIELDS" in transforms_values:
LOGGER.info(f"Parsing FIELDS of {transforms_stanza}")
fields_values = transforms_values["FIELDS"]
for each_field in fields_values.split(","):
yield each_field.strip()
if "FORMAT" in transforms_values:
LOGGER.info(f"Parsing FORMAT of {transforms_stanza}")
regex = r"(\S*)::"
match_fields = re.findall(regex, transforms_values["FORMAT"])
for each_field in match_fields:
if "$" not in each_field:
yield each_field.strip()
except KeyError:
LOGGER.error(
f"The stanza {transforms_stanza} does not exists in transforms.conf."
)
[docs] def get_lookup_csv_fields(self, lookup_stanza: str) -> Optional[Generator]:
"""
Parse the fields from a lookup file for a specific lookup_stanza
Args:
lookup_stanza (str): A lookup stanza mentioned in transforms.conf
Yields:
string of field names
"""
if not self.transforms:
return
if lookup_stanza in self.transforms.keys():
stanza_values = self.transforms[lookup_stanza]
if "filename" in stanza_values:
lookup_file = stanza_values["filename"]
try:
location = os.path.join(
self.splunk_app_path, "lookups", lookup_file
)
with open(location) as csv_file:
reader = csv.DictReader(csv_file)
fieldnames = reader.fieldnames
for items in fieldnames:
yield items.strip()
# If there is an error. the test should fail with the current fields
# This makes sure the test doesn't exit prematurely
except (OSError, IOError, UnboundLocalError, TypeError) as e:
LOGGER.error(
"Could not read the lookup file, skipping test. error=%s",
str(e),
)