Source code for standard_lib.cim_tests.test_templates

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
"""
Includes the test scenarios to check the CIM compatibility of an Add-on.
"""
import logging
import pytest
from .field_test_helper import FieldTestHelper
from ..utilities.log_helper import get_table_output
from ..utilities.log_helper import format_search_query_log


class CIMTestTemplates(object):
    """
    Test scenarios to check the CIM compatibility of an Add-on.

    Supported test scenarios:

        - The eventtype should extract all required fields of the data model.
        - One eventtype should not be mapped with more than one data model.
        - Field clusters should be verified (included with the required field test).
        - Verify whether CIM is installed or not.
        - Not-allowed fields should not be extracted.
    """

    logger = logging.getLogger("pytest-splunk-addon-cim-tests")
    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_fields
    def test_cim_required_fields(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_fields,
        record_property,
    ):
        """
        Test that the required fields in the data models are extracted with valid values.
        Supports 3 scenarios. The test order is maintained for a better test report.

            - Check that there is at least 1 event mapped with the data model.
            - Check that each required field is extracted in all of the events
              mapped with the data model.
            - Check that if there are interdependent fields, either all of them
              are extracted or none of them are extracted.
        """
        cim_data_set = splunk_searchtime_cim_fields["data_set"]
        cim_fields = splunk_searchtime_cim_fields["fields"]
        cim_tag_stanza = splunk_searchtime_cim_fields["tag_stanza"]

        cim_single_field = ", ".join(map(str, cim_fields))
        cim_fields_type = ", ".join(map(lambda f: f.get_type(), cim_fields))
        cim_data_model = cim_data_set[-1].data_model
        data_set = str(cim_data_set[-1])
        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )

        # Search query
        base_search = "| search {}".format(index_list)
        for each_set in cim_data_set:
            base_search += " | search {}".format(each_set.search_constraints)
        base_search += " | search {}".format(cim_tag_stanza)

        test_helper = FieldTestHelper(
            splunk_search_util,
            cim_fields,
            interval=splunk_search_util.search_interval,
            retries=splunk_search_util.search_retry,
        )

        record_property("tag_stanza", cim_tag_stanza)
        record_property("data_model", cim_data_model)
        record_property("data_set", data_set)
        record_property("fields", cim_single_field)
        record_property("fields_type", cim_fields_type)

        # Execute the query and get the results
        results = test_helper.test_field(base_search, record_property)

        # All assertions are made in the same test so that the report keeps a
        # clear order of scenarios. With this approach a user can quickly
        # identify what went wrong.
        if len(cim_fields) == 0:
            # If no fields are provided, check that the events are mapped
            # with the data model
            assert results, (
                "\n0 Events mapped with the dataset.\n"
                f"\n{test_helper.format_exc_message()}"
            )
        if len(cim_fields) == 1:
            test_field = cim_fields[0]
            # If the field is required,
            # there should be events mapped with the data model.
            # If the field is conditional,
            # it's fine if no events matched the condition.
            if not test_field.type == "conditional":
                assert results, (
                    "\n0 Events mapped with the dataset.\n"
                    f"\n{test_helper.format_exc_message()}"
                )
            # The field should be extracted if event count > 0
            for each_field in results:
                assert not each_field["field_count"] == 0, (
                    f"\nField {test_field} is not extracted in any events."
                    f"\n{test_helper.format_exc_message()}"
                )
                if each_field["field_count"] > each_field["event_count"]:
                    raise AssertionError(
                        f"\nField {test_field} should not be multi-value."
                        f"\n{test_helper.format_exc_message()}"
                    )
                elif each_field["field_count"] < each_field["event_count"]:
                    # The field should be extracted in all events mapped
                    raise AssertionError(
                        f"\nField {test_field} is not extracted in some events."
                        f"\n{test_helper.format_exc_message()}"
                    )
                assert each_field["field_count"] == each_field["valid_field_count"], (
                    f"\nField {test_field} has invalid values."
                    f"\n{test_helper.format_exc_message()}"
                )
        elif len(cim_fields) > 1:
            # Check that the count for all the fields in the cluster is the same.
            # If none of the fields are extracted in an event, that's a passing
            # scenario: the count of the fields may or may not match the count of events.
            sourcetype_fields = dict()
            for each_result in results:
                sourcetype_fields.setdefault(
                    (each_result["source"], each_result["sourcetype"]), list()
                ).extend(
                    [each_result["field_count"], each_result["valid_field_count"]]
                )
            for each_sourcetype_fields in sourcetype_fields.values():
                assert len(set(each_sourcetype_fields)) == 1, (
                    "All fields from the field-cluster should be extracted with"
                    " valid values if any one field is extracted."
                    f"\n{test_helper.format_exc_message()}"
                )
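
    # Illustrative sketch (not part of the test above): the field-cluster check
    # passes only when every field in the cluster is extracted, with valid values,
    # in the same number of events per (source, sourcetype). The rows below are
    # hypothetical FieldTestHelper-style results, used purely for illustration.
    #
    #   results = [
    #       {"source": "s1", "sourcetype": "st1", "field_count": 10, "valid_field_count": 10},
    #       {"source": "s1", "sourcetype": "st1", "field_count": 10, "valid_field_count": 10},
    #   ]
    #   # Collected counts for ("s1", "st1") are [10, 10, 10, 10] -> set == {10},
    #   # so the assertion passes. If one row instead had valid_field_count=7,
    #   # the set would be {10, 7} and the field-cluster assertion would fail.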
    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_fields_not_allowed_in_props
    def test_cim_fields_not_allowed_in_props(
        self,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_fields_not_allowed_in_props,
        record_property,
    ):
        """
        This test case checks that no extraction is defined in the configuration
        files for CIM fields of type ["not_allowed_in_search_and_props",
        "not_allowed_in_props"].
        """
        result_str = (
            "The field extractions are not allowed in the configuration files."
            "\nThese fields are automatically provided by asset and identity"
            " correlation features of applications like Splunk Enterprise Security."
            "\nDo not define extractions for these fields when writing add-ons.\n\n"
        )

        result_str += get_table_output(
            headers=["Stanza", "Classname", "Fieldname"],
            value_list=[
                [data["stanza"], data["classname"], data["name"]]
                for data in splunk_searchtime_cim_fields_not_allowed_in_props["fields"]
            ],
        )

        assert not splunk_searchtime_cim_fields_not_allowed_in_props[
            "fields"
        ], result_str
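
    # Hypothetical example (not taken from any real add-on): a props.conf stanza
    # like the one sketched below would make this test fail, because a field such
    # as user_bunit is populated by Enterprise Security's asset and identity
    # correlation and must not be extracted by the add-on itself.
    #
    #   [my:custom:sourcetype]
    #   EVAL-user_bunit = "finance"
    #
    # The offending stanza, class name, and field name would then appear in the
    # table rendered by get_table_output in the failure message.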
    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_searchtime_cim_mapped_datamodel
    def test_eventtype_mapped_multiple_cim_datamodel(
        self,
        splunk_search_util,
        splunk_ingest_data,
        splunk_setup,
        splunk_searchtime_cim_mapped_datamodel,
        record_property,
        caplog,
    ):
        """
        This test case checks that an eventtype is not mapped with more than one data model.

        Args:
            splunk_search_util (SearchUtil): Object that helps to search on Splunk.
            splunk_searchtime_cim_mapped_datamodel: Object which contains the eventtype list.
            record_property (fixture): Document facts of test cases.
            caplog (fixture): Fixture to capture logs.
        """

        data_models = [
            {"name": "Alerts", "tags": [["alert"]]},
            {
                "name": "Authentication",
                "tags": [
                    ["authentication"],
                    ["authentication", "default"],
                    ["authentication", "insecure"],
                    ["authentication", "privileged"],
                ],
            },
            {"name": "Certificates", "tags": [["certificate"], ["certificate", "ssl"]]},
            {
                "name": "Change",
                "tags": [
                    ["change"],
                    ["change", "audit"],
                    ["change", "endpoint"],
                    ["change", "network"],
                    ["change", "account"],
                ],
            },
            {
                "name": "Compute_Inventory",
                "tags": [
                    ["inventory", "cpu"],
                    ["inventory", "memory"],
                    ["inventory", "network"],
                    ["inventory", "storage"],
                    ["inventory", "system", "version"],
                    ["inventory", "user"],
                    ["inventory", "user", "default"],
                    ["inventory", "virtual"],
                    ["inventory", "virtual", "snapshot"],
                    ["inventory", "virtual", "tools"],
                ],
            },
            {"name": "DLP", "tags": [["dlp", "incident"]]},
            {
                "name": "Databases",
                "tags": [
                    ["database"],
                    ["database", "instance"],
                    ["database", "instance", "stats"],
                    ["database", "instance", "session"],
                    ["database", "instance", "lock"],
                    ["database", "query"],
                    ["database", "query", "tablespace"],
                    ["database", "query", "stats"],
                ],
            },
            {
                "name": "Email",
                "tags": [
                    ["email"],
                    ["email", "delivery"],
                    ["email", "content"],
                    ["email", "filter"],
                ],
            },
            {
                "name": "Endpoint",
                "tags": [
                    ["listening", "port"],
                    ["process", "report"],
                    ["service", "report"],
                    ["endpoint", "filesystem"],
                    ["endpoint", "registry"],
                ],
            },
            {"name": "Event_Signatures", "tags": [["track_event_signatures"]]},
            {"name": "Interprocess_Messaging", "tags": [["messaging"]]},
            {"name": "Intrusion_Detection", "tags": [["ids", "attack"]]},
            {
                "name": "JVM",
                "tags": [
                    ["jvm"],
                    ["jvm", "threading"],
                    ["jvm", "runtime"],
                    ["jvm", "os"],
                    ["jvm", "compilation"],
                    ["jvm", "classloading"],
                    ["jvm", "memory"],
                ],
            },
            {
                "name": "Malware",
                "tags": [["malware", "attack"], ["malware", "operations"]],
            },
            {"name": "Network_Resolution", "tags": [["network", "resolution", "dns"]]},
            {
                "name": "Network_Sessions",
                "tags": [
                    ["network", "session"],
                    ["network", "session", "start"],
                    ["network", "session", "end"],
                    ["network", "session", "dhcp"],
                    ["network", "session", "vpn"],
                ],
            },
            {"name": "Network_Traffic", "tags": [["network", "communicate"]]},
            {
                "name": "Performance",
                "tags": [
                    ["performance", "cpu"],
                    ["performance", "facilities"],
                    ["performance", "memory"],
                    ["performance", "storage"],
                    ["performance", "network"],
                    ["performance", "os"],
                    ["performance", "os", "time", "synchronize"],
                    ["performance", "os", "uptime"],
                ],
            },
            {
                "name": "Splunk_Audit",
                "tags": [["modaction"], ["modaction", "invocation"]],
            },
            {
                "name": "Ticket_Management",
                "tags": [
                    ["ticketing"],
                    ["ticketing", "change"],
                    ["ticketing", "incident"],
                    ["ticketing", "problem"],
                ],
            },
            {"name": "Updates", "tags": [["update", "status"], ["update", "error"]]},
            {"name": "Vulnerabilities", "tags": [["report", "vulnerability"]]},
            {"name": "Web", "tags": [["web"], ["web", "proxy"]]},
        ]

        index_list = (
            "(index="
            + " OR index=".join(splunk_search_util.search_index.split(","))
            + ")"
        )
        search = "search {} ".format(index_list)
        # search = "search "
        search += " OR ".join(
            "eventtype={} \n".format(eventtype)
            for eventtype in splunk_searchtime_cim_mapped_datamodel["eventtypes"]
        )
        search += " | fields eventtype,tag \n"
        for data_model in data_models:
            search += "| appendpipe [ | search "
            search += " OR ".join(
                "({})".format(" ".join("tag={}".format(tag) for tag in tags_list))
                for tags_list in data_model.get("tags")
            )
            search += f" | eval dm_type=\"{data_model.get('name')}\"]\n"
        search += """| stats delim=", " dc(dm_type) as datamodel_count, values(dm_type) as datamodels by eventtype
        | nomv datamodels
        | where datamodel_count > 1 and NOT eventtype IN ("err0r")
        """
        record_property("search", search)

        results = list(
            splunk_search_util.getFieldValuesList(
                search,
                splunk_search_util.search_interval,
                splunk_search_util.search_retry,
            )
        )
        if results:
            record_property("results", results)
            result_str = get_table_output(
                headers=["Count", "Eventtype", "Datamodels"],
                value_list=[
                    [
                        each_result["datamodel_count"],
                        each_result["eventtype"],
                        each_result["datamodels"],
                    ]
                    for each_result in results
                ],
            )

        assert not results, (
            "Multiple data models are mapped with an eventtype."
            f"\nQuery result greater than 0."
            f"{format_search_query_log(search)}"
            f"\nEventtypes associated with multiple data models:\n{result_str}"
        )
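
    # Rough sketch of the SPL assembled above, for two hypothetical eventtypes and
    # only two of the data models; the real query covers every entry in data_models.
    # The index name ("main") and eventtype names are illustrative only.
    #
    #   search (index=main) eventtype=my_auth_eventtype OR eventtype=my_web_eventtype
    #   | fields eventtype,tag
    #   | appendpipe [ | search (tag=alert) | eval dm_type="Alerts"]
    #   | appendpipe [ | search (tag=web) OR (tag=web tag=proxy) | eval dm_type="Web"]
    #   | stats delim=", " dc(dm_type) as datamodel_count, values(dm_type) as datamodels by eventtype
    #   | nomv datamodels
    #   | where datamodel_count > 1 and NOT eventtype IN ("err0r")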
    @pytest.mark.splunk_searchtime_cim
    @pytest.mark.splunk_requirements
    @pytest.mark.splunk_requirements_unit
    def test_cim_fields_recommended(
        self, splunk_dm_recommended_fields, splunk_searchtime_cim_fields_recommended
    ):
        datamodel = splunk_searchtime_cim_fields_recommended["datamodel"]
        datasets = splunk_searchtime_cim_fields_recommended["datasets"]
        fields = splunk_searchtime_cim_fields_recommended["fields"]
        cim_version = splunk_searchtime_cim_fields_recommended["cim_version"]

        fields_from_model_definition = splunk_dm_recommended_fields(
            datamodel, datasets, cim_version
        )
        self.logger.debug(f"Fields from Splunk: {fields_from_model_definition}")
        model_key = f"{cim_version}:{datamodel}:{':'.join(datasets)}".strip(":")
        model_fields = fields_from_model_definition.get(model_key, [])
        self.logger.debug(f"Fields from CIM definition: {model_fields}")

        missing_fields = []
        for field in set(model_fields):
            if field not in fields:
                missing_fields.append(field)

        assert (
            missing_fields == []
        ), f"Not all fields from the data model were found in the event definition. Missing fields: {', '.join(missing_fields)}"
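

# Minimal, self-contained sketch (not part of pytest-splunk-addon) of the
# recommended-fields comparison performed in test_cim_fields_recommended above.
# The field lists below are made up; only the set-difference logic mirrors the test.
if __name__ == "__main__":
    _example_model_fields = ["action", "app", "src", "dest", "user"]  # hypothetical CIM fields
    _example_event_fields = ["action", "src", "user"]  # hypothetical fields in the event definition
    _missing = [f for f in set(_example_model_fields) if f not in _example_event_fields]
    # Prints ['app', 'dest'] - the condition that would make the test fail.
    print(sorted(_missing))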