Source code for standard_lib.sample_generation.pytest_splunk_addon_data_parser

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import logging
from .rule import raise_warning
from . import SampleStanza

import addonfactory_splunk_conf_parser_lib as conf_parser
from xmlschema import XMLSchema, XMLSchemaValidationError

LOGGER = logging.getLogger("pytest-splunk-addon")


PSA_DATA_CONFIG_FILE = "pytest-splunk-addon-data.conf"
SCHEMA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "schema.xsd")


[docs]class PytestSplunkAddonDataParser: """ This class parses pytest-splunk-addon-data.conf file. Args: addon_path: Path to the Splunk App """ conf_name = " " def __init__(self, addon_path: str, config_path: str): self._conf_parser = conf_parser.TABConfigParser() self.config_path = config_path self._psa_data = None self.addon_path = addon_path self.match_stanzas = set() self._path_to_samples = self._get_path_to_samples() def _get_path_to_samples(self): if os.path.exists(os.path.join(self.config_path, "samples")): LOGGER.info( "Samples path is: {}".format(os.path.join(self.config_path, "samples")) ) return os.path.join(self.config_path, "samples") elif os.path.exists( os.path.join( os.path.abspath(os.path.join(self.config_path, os.pardir)), "samples" ) ): LOGGER.info( "Samples path is: {}".format( os.path.join( os.path.abspath(os.path.join(self.config_path, os.pardir)), "samples", ) ) ) return os.path.join( os.path.abspath(os.path.join(self.config_path, os.pardir)), "samples" ) else: LOGGER.info( "Samples path is: {}".format(os.path.join(self.addon_path, "samples")) ) return os.path.join(self.addon_path, "samples") @property def psa_data(self): psa_data_path = os.path.join(self.config_path, PSA_DATA_CONFIG_FILE) if os.path.exists(psa_data_path): self._conf_parser.read(psa_data_path) self.conf_name = "psa-data-gen" self._psa_data = self._conf_parser.item_dict() return self._psa_data else: LOGGER.warning(f"{PSA_DATA_CONFIG_FILE} not found") raise FileNotFoundError(f"{PSA_DATA_CONFIG_FILE} not found")
[docs] def get_sample_stanzas(self): """ Converts a stanza in pytest-splunk-addon-data.conf to an object of SampleStanza. Returns: List of SampleStanza objects. """ _psa_data = self._get_psa_data_stanzas() self._check_samples() results = [] for sample_name, stanza_params in sorted(_psa_data.items()): sample_path = os.path.join(self._path_to_samples, sample_name) results.append(SampleStanza(sample_path, stanza_params)) return results
def _get_psa_data_stanzas(self): """ Parses the pytest-splunk-addon-data.conf file and converts it into a dictionary. Format:: { "sample_file_name": # Not Stanza name { "input_type": "str", "tokens": { 1: { token: #One# replacementType: random replacement: static } } } } Return: Dictionary representing pytest-splunk-addon-data.conf in the above format. """ psa_data_dict = {} schema = XMLSchema(SCHEMA_PATH) if os.path.exists(self._path_to_samples): for sample_file in os.listdir(self._path_to_samples): for stanza, fields in sorted(self.psa_data.items()): stanza_match_obj = re.search(stanza, sample_file) if stanza_match_obj and stanza_match_obj.group(0) == sample_file: self.match_stanzas.add(stanza) if ( "requirement_test_sample" in self.psa_data[stanza].keys() and int(self.psa_data[stanza]["requirement_test_sample"]) > 0 ): filename = os.path.join(self._path_to_samples, sample_file) schema.validate(filename) test_unicode_char(filename) psa_data_dict.setdefault(sample_file, {"tokens": {}}) for key, value in fields.items(): if key.startswith("token"): _, token_id, token_param = key.split(".") token_key = f"{stanza}_{token_id}" if ( not token_key in psa_data_dict[sample_file]["tokens"].keys() ): psa_data_dict[sample_file]["tokens"][token_key] = {} psa_data_dict[sample_file]["tokens"][token_key][ token_param ] = value else: psa_data_dict[sample_file][key] = value return psa_data_dict def _check_samples(self): """ Gives a user warning when sample file is not found for the stanza present in the configuration file. """ if os.path.exists(self._path_to_samples): for stanza in self.psa_data.keys(): if stanza not in self.match_stanzas: raise_warning(f"No sample file found for stanza : {stanza}") LOGGER.info(f"Sample file found for stanza : {stanza}")
def test_unicode_char(filename): invalid = False # pattern = re.compile("[^\x00-\x7F]") #do ot want to replace printable chars like €¢ etc pattern = re.compile( "[\u200B-\u200E\uFEFF\u202c\u202D\u2063\u2062]" ) # zero width characters error_message = "" for i, line in enumerate(open(filename)): for match in re.finditer(pattern, line): err = f"Unicode char in FILE {filename} Line {i+1}: {match.group().encode('utf-8')}" error_message += f"{err}\n" LOGGER.debug(err) invalid = True if invalid: raise ValueError(error_message)