#
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from .base_event_ingestor import EventIngestor
import requests
from time import time, mktime
import concurrent.futures
import logging
import os
# Named logger used for all HEC ingestion logging in this module.
LOGGER = logging.getLogger(name="pytest-splunk-addon")

# HEC requests in this module are sent with verify=False; disable urllib3
# warnings (globally, at import time) so each request does not emit one.
requests.urllib3.disable_warnings()
class HECEventIngestor(EventIngestor):
    """
    Class to ingest events via the HEC event endpoint.

    The format for required_configs is::

        {
            "splunk_hec_uri": "{splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector",
            "session_headers": {
                "Authorization": "Splunk <hec-token>",
            },
        }

    Args:
        required_configs (dict): Dictionary containing splunk_hec_uri and session_headers
    """

    # Maximum number of event payloads sent in a single HEC request.
    BATCH_SIZE = 100
    # Worker count used when ``ingest`` is not given a positive integer thread_count.
    DEFAULT_THREAD_COUNT = 20

    def __init__(self, required_configs):
        self.hec_uri = required_configs.get("splunk_hec_uri")
        self.session_headers = required_configs.get("session_headers")

    def ingest(self, events, thread_count):
        """
        Ingests events into Splunk using a HEC token via the event endpoint.

        Each SampleEvent is converted into a HEC payload dict. Payloads are sent
        in batches of at most ``BATCH_SIZE`` per request (a JSON list of dicts),
        and batches are posted concurrently from a thread pool.

        The format of a single payload dict::

            {
                "sourcetype": "sample_HEC",
                "source": "sample_source",
                "host": "sample_host",
                "event": "event_str",
                "index": "main"
            }

        ``host`` is only present when a host value is available, and ``time`` is
        added when the event's timestamp_type is "event" and it has time values.

        The format of a batch sent in one request::

            [
                {"sourcetype": "sample_HEC", "event": "event_str1", ...},
                {"sourcetype": "sample_HEC", "event": "event_str2", ...},
            ]

        Args:
            events (list): List of events (SampleEvent) to be ingested
            thread_count (int): Number of worker threads used to post batches;
                DEFAULT_THREAD_COUNT is used when it is not a positive int.
        """
        data = [self._build_event_dict(event) for event in events]
        batch_event_list = [
            data[start : start + self.BATCH_SIZE]
            for start in range(0, len(data), self.BATCH_SIZE)
        ]
        if not batch_event_list:
            # Nothing to send; avoid spinning up a thread pool.
            return

        # bool is a subclass of int; exclude it so True/False are not treated as counts.
        if (
            isinstance(thread_count, int)
            and not isinstance(thread_count, bool)
            and thread_count > 0
        ):
            max_workers = thread_count
        else:
            max_workers = self.DEFAULT_THREAD_COUNT

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Draining the map iterator re-raises any exception from a worker here.
            _ = list(executor.map(self.__ingest, batch_event_list))

    @staticmethod
    def _build_event_dict(event):
        """
        Convert one SampleEvent into a HEC event payload dict.

        Args:
            event (SampleEvent): Event providing ``metadata``, ``event``,
                ``key_fields`` and ``time_values``.

        Returns:
            dict: Payload with sourcetype, source, event, index and, when
            available, host and time.
        """
        metadata = event.metadata
        event_dict = {
            "sourcetype": metadata.get("sourcetype", "pytest_splunk_addon"),
            "source": metadata.get("source", "pytest_splunk_addon:hec:event"),
            "event": event.event,
            "index": metadata.get("index", "main"),
        }

        if metadata.get("host_type") in ("plugin", None):
            host = metadata.get("host")
        else:
            # key_fields["host"] may be missing or empty; omit host in that case
            # instead of failing the whole ingestion run.
            hosts = event.key_fields.get("host")
            host = hosts[0] if hosts else None
        if host:
            event_dict["host"] = host

        # timestamp_type may be absent from metadata; treat that as "not event time".
        timestamp_type = metadata.get("timestamp_type") or ""
        if timestamp_type.lower() == "event" and event.time_values:
            event_dict["time"] = event.time_values[0]
        return event_dict

    def _redacted_headers(self):
        """Return session_headers with any Authorization value masked, for logging."""
        if not self.session_headers:
            return self.session_headers
        return {
            key: ("<redacted>" if str(key).lower() == "authorization" else value)
            for key, value in self.session_headers.items()
        }

    def __ingest(self, data):
        """
        POST one batch of HEC event payloads to ``{hec_uri}/event``.

        Args:
            data (list): List of HEC payload dicts, sent as the JSON request body.

        Raises:
            Exception: Re-raised as the same type as the underlying failure with
                an "An error occurred while data ingestion." prefix and chained to
                the original. A response status other than 200/201 surfaces as a
                plain ``Exception``. If the failure's type cannot be rebuilt from a
                single message, the original exception is re-raised unchanged.
        """
        try:
            LOGGER.info(
                "Making a HEC event request with the following params:\nhec_uri:%s\nheaders:%s",
                self.hec_uri,
                self._redacted_headers(),
            )
            LOGGER.debug(
                "Creating the following sample event to be ingested via HEC event endpoint:%s",
                data,
            )
            response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
                "{}/{}".format(self.hec_uri, "event"),
                auth=None,
                json=data,
                headers=self.session_headers,
                verify=False,
            )
            LOGGER.debug("Status code: %s", response.status_code)
            if response.status_code not in (200, 201):
                raise Exception(
                    "\nStatus code: {} \nReason: {} \ntext:{}".format(
                        response.status_code, response.reason, response.text
                    )
                )
        except Exception as e:
            LOGGER.error("\n\nAn error occurred while data ingestion.{}".format(e))
            try:
                wrapped = type(e)("An error occurred while data ingestion.{}".format(e))
            except Exception:
                # Some exception types need more than one constructor argument.
                wrapped = None
            if wrapped is None:
                # Back in the outer handler: re-raises the original exception `e`.
                raise
            raise wrapped from e