import os
import re
import logging
from splunk_appinspect import App
from .rule import Rule, raise_warning
from . import SampleStanza
LOGGER = logging.getLogger("pytest-splunk-addon")
import warnings


class EventgenParser:
    """
    Represents the data-generation configuration of an add-on
    (``pytest-splunk-addon-data.conf`` or ``eventgen.conf``) and parses its
    stanzas and token rules.

    Args:
        addon_path (str): Path to the Splunk App.
        config_path (str): Optional directory that is searched for the conf
            file and for a ``samples`` directory. When it is ``None`` the
            add-on level defaults are used instead.
    """

    # Short name of the conf flavour in use; overwritten by ``eventgen``.
    conf_name = " "

    def __init__(self, addon_path, config_path=None):
        self._app = App(addon_path, python_analyzer_enable=False)
        self.config_path = config_path
        self._eventgen = None
        self.addon_path = addon_path
        # Stanza names (regex patterns) that matched at least one sample file.
        self.match_stanzas = set()

    @property
    def path_to_samples(self):
        """
        Locate the directory holding the sample files.

        Lookup order: ``<config_path>/samples``, then ``samples`` next to
        ``config_path`` (one level up), then ``<addon_path>/samples``. The
        last candidate is returned even if it does not exist.

        Returns:
            str: Path of the samples directory.
        """
        candidates = []
        # config_path is optional; joining None would raise TypeError, so the
        # config-relative candidates are only considered when it is set.
        if self.config_path is not None:
            candidates.append(os.path.join(self.config_path, "samples"))
            candidates.append(
                os.path.join(
                    os.path.abspath(os.path.join(self.config_path, os.pardir)),
                    "samples",
                )
            )
        for candidate in candidates:
            if os.path.exists(candidate):
                samples_path = candidate
                break
        else:
            samples_path = os.path.join(self.addon_path, "samples")
        LOGGER.info("Samples path is: {}".format(samples_path))
        return samples_path

    @property
    def eventgen(self):
        """
        Load the configuration object for the add-on.

        ``pytest-splunk-addon-data.conf`` in ``config_path`` is preferred,
        then ``eventgen.conf`` in ``config_path``; otherwise ``eventgen.conf``
        is requested from the app without an explicit directory. Also updates
        ``conf_name`` to ``"psa-data-gen"`` or ``"eventgen"``.

        Returns:
            The configuration object returned by ``App.get_config``.

        Raises:
            FileNotFoundError: If loading the configuration raises ``OSError``.
        """
        try:
            conf_file, conf_name, conf_dir = "eventgen.conf", "eventgen", None
            if self.config_path is not None:
                relative_path = os.path.relpath(self.config_path, self.addon_path)
                for candidate, label in (
                    ("pytest-splunk-addon-data.conf", "psa-data-gen"),
                    ("eventgen.conf", "eventgen"),
                ):
                    if os.path.exists(os.path.join(self.config_path, candidate)):
                        conf_file, conf_name, conf_dir = (
                            candidate,
                            label,
                            relative_path,
                        )
                        break
            if conf_dir is None:
                # No usable config_path: fall back to the app's own lookup.
                self._eventgen = self._app.get_config(conf_file)
                self.conf_name = conf_name
                path = self._app.get_filename("default", conf_file)
            else:
                self._eventgen = self._app.get_config(conf_file, dir=conf_dir)
                self.conf_name = conf_name
                path = self._app.get_filename(conf_dir, conf_file)
            LOGGER.info(
                "Using Eventgen path: {e}\nUsing Conf file name: {c}".format(
                    e=path, c=self.conf_name
                )
            )
            return self._eventgen
        except OSError as err:
            LOGGER.warning("pytest-splunk-addon-data.conf/eventgen.conf not Found")
            raise FileNotFoundError(
                "pytest-splunk-addon-data.conf/eventgen.conf not Found"
            ) from err

    def get_sample_stanzas(self):
        """
        Converts a stanza in eventgen.conf to an object of SampleStanza.

        Yields:
            SampleStanza Object
        """
        eventgen_dict = self.get_eventgen_stanzas()
        self.check_samples()
        for sample_name, stanza_params in sorted(eventgen_dict.items()):
            sample_path = os.path.join(self.path_to_samples, sample_name)
            yield SampleStanza(sample_path, stanza_params)

    def get_eventgen_stanzas(self):
        """
        Parses the eventgen.conf file and converts it into a dictionary.

        Every stanza name is treated as a regular expression; a stanza applies
        to a sample file when the regex match covers the whole file name.
        Matching stanza names are recorded in ``match_stanzas``.

        Format::

            {
                "sample_file_name": # Not Stanza name
                {
                    "input_type": "str",
                    "tokens":
                    {
                        "<stanza>_<token_id>":
                        {
                            token: #One#
                            replacementType: random
                            replacement: static
                        }
                    }
                }
            }

        Return:
            Dictionary representing eventgen.conf in the above format.
        """
        eventgen_dict = {}
        samples_path = self.path_to_samples
        if not os.path.exists(samples_path):
            return eventgen_dict
        sample_files = os.listdir(samples_path)
        if not sample_files:
            # Nothing to match, so the configuration is not loaded at all.
            return eventgen_dict

        # Reading the property re-loads the conf file, so do it once rather
        # than once per (sample file, stanza) pair.
        sections = self.eventgen.sects
        stanzas = sorted(sections)

        for sample_file in sample_files:
            for stanza in stanzas:
                stanza_match_obj = re.search(stanza, sample_file)
                if not stanza_match_obj or stanza_match_obj.group(0) != sample_file:
                    continue
                self.match_stanzas.add(stanza)
                sample_entry = eventgen_dict.setdefault(sample_file, {"tokens": {}})
                options = sections[stanza].options
                for stanza_param in options:
                    eventgen_property = options[stanza_param]
                    if eventgen_property.name.startswith("token"):
                        # Token settings are named "token.<id>.<param>".
                        _, token_id, token_param = eventgen_property.name.split(".")
                        token_key = "{}_{}".format(stanza, token_id)
                        sample_entry["tokens"].setdefault(token_key, {})[
                            token_param
                        ] = eventgen_property.value
                    else:
                        sample_entry[eventgen_property.name] = eventgen_property.value
        return eventgen_dict

    def check_samples(self):
        """
        Gives a user warning when sample file is not found for the stanza present in the configuration file.
        """
        if not os.path.exists(self.path_to_samples):
            return
        for stanza in self.eventgen.sects:
            if stanza not in self.match_stanzas:
                raise_warning("No sample file found for stanza : {}".format(stanza))
            else:
                # Only report success for stanzas that really matched a file.
                LOGGER.info("Sample file found for stanza : {}".format(stanza))