Source code for standard_lib.event_ingestors.hec_event_ingestor

#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from .base_event_ingestor import EventIngestor
import requests
from time import time, mktime
import concurrent.futures
import logging
import os

# Turn off urllib3 warnings at import time. The HEC requests made in this
# module are sent with verify=False; presumably this call is here to silence
# the resulting insecure-request warnings (NOTE(review): confirm intent).
requests.urllib3.disable_warnings()

# Module-level logger, registered under the "pytest-splunk-addon" name.
LOGGER = logging.getLogger("pytest-splunk-addon")


class HECEventIngestor(EventIngestor):
    """
    Class to ingest event via HEC Event

    The format for required_configs is::

        {
            splunk_hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector,
            session_headers(dict):
            {
                "Authorization": f"Splunk <hec-token>",
            }
        }

    Args:
        required_configs (dict): Dictionary containing splunk_hec_uri and session headers
    """

    # Maximum number of events sent in a single HEC request.
    _BATCH_SIZE = 100
    # Worker count used when the caller does not supply a usable thread_count.
    _DEFAULT_THREAD_COUNT = 20

    def __init__(self, required_configs):
        self.hec_uri = required_configs.get("splunk_hec_uri")
        self.session_headers = required_configs.get("session_headers")

    def ingest(self, events, thread_count):
        """
        Ingests event and metric data into splunk using HEC token via event endpoint.

        For batch ingestion of events in a single request at event endpoint provide
        a list of event dict to be ingested.

        The format of dictionary for ingesting a single event::

            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
                "event": "event_str"
            }

        The format of dictionary for ingesting a batch of events::

            [
                {
                    "sourcetype": "sample_HEC",
                    "source": "sample_source",
                    "host": "sample_host",
                    "event": "event_str1"
                },
                {
                    "sourcetype": "sample_HEC",
                    "source": "sample_source",
                    "host": "sample_host",
                    "event": "event_str2"
                },
            ]

        Args:
            events (list): List of events (SampleEvent) to be ingested
            thread_count (int): Number of worker threads used to post the batches.
                A missing, non-numeric or non-positive value falls back to 20.
        """
        data = [self._build_event_dict(event) for event in events]

        batch_size = self._BATCH_SIZE
        batch_event_list = [
            data[start : start + batch_size]
            for start in range(0, len(data), batch_size)
        ]

        workers = self._resolve_worker_count(thread_count)
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            # Consume the iterator so an exception raised by any batch propagates.
            _ = list(executor.map(self.__ingest, batch_event_list))

    @classmethod
    def _resolve_worker_count(cls, thread_count):
        """Return a positive worker count, falling back to the default when unusable."""
        try:
            workers = int(thread_count)
        except (TypeError, ValueError):
            return cls._DEFAULT_THREAD_COUNT
        return workers if workers > 0 else cls._DEFAULT_THREAD_COUNT

    @staticmethod
    def _build_event_dict(event):
        """Build the HEC payload dictionary for a single sample event."""
        metadata = event.metadata
        event_dict = {
            "sourcetype": metadata.get("sourcetype", "pytest_splunk_addon"),
            "source": metadata.get("source", "pytest_splunk_addon:hec:event"),
            "event": event.event,
            "index": metadata.get("index", "main"),
        }

        if metadata.get("host_type") in ("plugin", None):
            host = metadata.get("host")
        else:
            # The key field may be missing or empty; treat that as "no host"
            # instead of failing on the [0] lookup.
            hosts = event.key_fields.get("host")
            host = hosts[0] if hosts else None
        if host:
            event_dict["host"] = host

        # A missing timestamp_type is treated as "not event" rather than
        # raising AttributeError on None.lower().
        timestamp_type = metadata.get("timestamp_type") or ""
        if timestamp_type.lower() == "event" and event.time_values:
            event_dict["time"] = event.time_values[0]

        return event_dict

    def _loggable_headers(self):
        """Return the session headers with the Authorization value masked for logging."""
        headers = self.session_headers
        if not hasattr(headers, "items"):
            return headers
        return {
            key: "<redacted>" if str(key).lower() == "authorization" else value
            for key, value in headers.items()
        }

    def __ingest(self, data):
        """
        Post one batch of event dictionaries to the HEC event endpoint.

        Args:
            data (list): Batch of event dictionaries to send as the JSON body

        Raises:
            Exception: Any error raised while sending, or a non-200/201 response,
                is logged and re-raised as the same exception type with a
                prefixed message.
        """
        try:
            # The Authorization header carries the HEC token, so it is masked
            # before being written to the log.
            LOGGER.info(
                "Making a HEC event request with the following params:\nhec_uri:%s\nheaders:%s",
                self.hec_uri,
                self._loggable_headers(),
            )
            LOGGER.debug(
                "Creating the following sample event to be ingested via HEC event endoipnt:%s",
                data,
            )
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                "{}/{}".format(self.hec_uri, "event"),
                auth=None,
                json=data,
                headers=self.session_headers,
                verify=False,
            )
            LOGGER.debug("Status code: %s", response.status_code)
            if response.status_code not in (200, 201):
                raise Exception(
                    "\nStatus code: {} \nReason: {} \ntext:{}".format(
                        response.status_code, response.reason, response.text
                    )
                )
        except Exception as e:
            LOGGER.error("\n\nAn error occurred while data ingestion.%s", e)
            message = "An error occurred while data ingestion.{}".format(e)
            try:
                wrapped = type(e)(message)
            except Exception:
                # Some exception types cannot be built from a single message.
                wrapped = None
            if wrapped is None:
                # Re-raise the original error unchanged rather than masking it.
                raise
            raise wrapped from e