"""
Publish to Splunk using a ``multiprocessing.Process`` worker.
This module includes a handler derived from the original repository:
https://github.com/zach-taylor/splunk_handler
Please note this will not work with Celery. Use
``spylunking.splunk_publisher.SplunkPublisher`` if you want
to see logs inside a Celery task.
Supported environment variables:
::
export SPLUNK_HOST="<splunk host>"
export SPLUNK_PORT="<splunk port: 8088>"
export SPLUNK_API_PORT="<splunk port: 8089>"
export SPLUNK_ADDRESS="<splunk address host:port>"
export SPLUNK_API_ADDRESS="<splunk api address host:port>"
export SPLUNK_TOKEN="<splunk token>"
export SPLUNK_INDEX="<splunk index>"
export SPLUNK_SOURCE="<splunk source>"
export SPLUNK_SOURCETYPE="<splunk sourcetype>"
export SPLUNK_VERIFY="<verify certs on HTTP POST>"
export SPLUNK_TIMEOUT="<timeout in seconds>"
export SPLUNK_QUEUE_SIZE="<num msgs allowed in queue - 0=infinite>"
export SPLUNK_SLEEP_INTERVAL="<sleep in seconds per batch>"
export SPLUNK_RETRY_COUNT="<attempts per log to retry publishing>"
export SPLUNK_RETRY_BACKOFF="<cooldown in seconds per failed POST>"
export SPLUNK_DEBUG="<1 enable debug|0 off>"
"""
import json
import logging
import time
import traceback
import requests
import signal
import multiprocessing
import spylunking.send_to_splunk as send_to_splunk
from spylunking.rnow import rnow
from spylunking.ppj import ppj
from spylunking.consts import SPLUNK_HOST
from spylunking.consts import SPLUNK_PORT
from spylunking.consts import SPLUNK_TOKEN
from spylunking.consts import SPLUNK_INDEX
from spylunking.consts import SPLUNK_SOURCE
from spylunking.consts import SPLUNK_SOURCETYPE
from spylunking.consts import SPLUNK_VERIFY
from spylunking.consts import SPLUNK_TIMEOUT
from spylunking.consts import SPLUNK_SLEEP_INTERVAL
from spylunking.consts import SPLUNK_RETRY_COUNT
from spylunking.consts import SPLUNK_RETRY_BACKOFF
from spylunking.consts import SPLUNK_QUEUE_SIZE
from spylunking.consts import SPLUNK_DEBUG
from spylunking.consts import SPLUNK_HOSTNAME
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
class MPSplunkPublisher(logging.Handler):
"""
A logging handler to send logs to a Splunk Enterprise instance
running the Splunk HTTP Event Collector.
Originally inspired by the repository below but written for
Python's multiprocessing framework:
https://github.com/zach-taylor/splunk_handler
This class allows multiple processes to reliably publish
logs to Splunk (for logging inside a Celery task, use
``spylunking.splunk_publisher.SplunkPublisher`` instead)
"""
def __init__(
self,
host=None,
port=None,
address=None,
token=None,
index=None,
hostname=None,
source=None,
sourcetype='text',
verify=True,
timeout=60,
sleep_interval=None,
queue_size=0,
debug=False,
retry_count=20,
run_once=False,
retry_backoff=2.0):
"""__init__
Initialize the MPSplunkPublisher with support for
multiprocessing
:param host: Splunk fqdn
:param port: Splunk HEC Port 8088
:param address: Splunk fqdn:8088 - overrides host and port
:param token: Pre-existing Splunk token
:param index: Splunk index
:param hostname: hostname value to attach to log records
:param source: source for log records
:param sourcetype: sourcetype for log records (default: ``text``)
:param verify: verify certs on the HTTP POST
:param timeout: HTTP request timeout in seconds
:param sleep_interval: seconds to sleep between publishing
batches of queued logs
:param queue_size: maximum number of queued log messages
before new logs are dropped - 0 allows an unlimited
number of messages
:param debug: debug the publisher
:param retry_count: number of publish retries per log record
:param retry_backoff: cooldown timer in seconds
:param run_once: flag used by tests for publishing once
and shutting down
"""
logging.Handler.__init__(self)
self.host = host
if self.host is None:
self.host = SPLUNK_HOST
self.port = port
if self.port is None:
self.port = SPLUNK_PORT
if address:
address_split = address.split(':')
self.host = address_split[0]
self.port = int(address_split[1])
self.token = token
if self.token is None:
self.token = SPLUNK_TOKEN
self.index = index
if self.index is None:
self.index = SPLUNK_INDEX
self.source = source
if self.source is None:
self.source = SPLUNK_SOURCE
self.sourcetype = sourcetype
if self.sourcetype is None:
self.sourcetype = SPLUNK_SOURCETYPE
self.verify = verify
if self.verify is None:
self.verify = SPLUNK_VERIFY
self.timeout = timeout
if self.timeout is None:
self.timeout = SPLUNK_TIMEOUT
self.sleep_interval = sleep_interval
if self.sleep_interval is None:
self.sleep_interval = SPLUNK_SLEEP_INTERVAL
self.retry_count = retry_count
if self.retry_count is None:
self.retry_count = SPLUNK_RETRY_COUNT
self.retry_backoff = retry_backoff
if self.retry_backoff is None:
self.retry_backoff = SPLUNK_RETRY_BACKOFF
self.queue_size = queue_size
if self.queue_size is None:
self.queue_size = SPLUNK_QUEUE_SIZE
self.log_payload = ''
self.session = requests.Session()
self.num_sent = 0
# Multiprocessing entities
self.run_once = run_once
self.processes = []
self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue(maxsize=self.queue_size)
self.shutdown_event = multiprocessing.Event()
self.shutdown_ack = multiprocessing.Event()
self.already_done = multiprocessing.Event()
self.debug = debug
if SPLUNK_DEBUG:
self.debug = True
# True if application requested exit
self.shutdown_now = False
self.debug_log('starting debug mode')
self.hostname = hostname
if hostname is None:
self.hostname = SPLUNK_HOSTNAME
self.debug_log('preparing to override loggers')
# prevent infinite recursion by silencing requests and urllib3 loggers
logging.getLogger('requests').propagate = False
logging.getLogger('urllib3').propagate = False
# and do the same for ourselves
logging.getLogger(__name__).propagate = False
# disable all warnings from urllib3 package
if not self.verify:
requests.packages.urllib3.disable_warnings()
# Set up automatic retry with back-off
self.debug_log('preparing to create a Requests session')
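# note (editor's assumption about library versions): the
# ``method_whitelist`` argument used below was renamed to
# ``allowed_methods`` in urllib3 1.26 and removed in urllib3 2.0,
# so this retry setup assumes an older urllib3 API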
retry = Retry(
total=self.retry_count,
backoff_factor=self.retry_backoff,
method_whitelist=False, # Retry for any HTTP verb
status_forcelist=[500, 502, 503, 504])
self.session.mount('https://', HTTPAdapter(max_retries=retry))
self.start_worker()
self.debug_log('class initialize complete')
# end of __init__
def emit(
self,
record):
"""emit
Queue the log message for the worker process to send
to Splunk every ``self.sleep_interval`` seconds
:param record: LogRecord to send to Splunk
https://docs.python.org/3/library/logging.html
"""
self.debug_log('emit - start')
try:
record = self.format_record(
record)
except Exception as e:
self.write_log(
'exception in Splunk logging handler: %s' % str(e))
self.write_log(
traceback.format_exc())
return
if not self.is_shutting_down(shutdown_event=self.shutdown_event) \
and self.sleep_interval > 0.1:
try:
self.debug_log(
'writing to queue={}'.format(
self.queue))
# Put log message into the queue; the worker process will pick it up
self.queue.put(
record)
except Exception:
self.write_log(
'log queue full; log data will be dropped.')
else:
# Publish immediately because there is no worker
self.publish_to_splunk(
payload=record)
self.debug_log('emit - done')
# end of emit
def start_worker(
self):
"""start_worker
Start the helper worker process to package queued messages
and send them to Splunk
"""
# Start a worker process responsible for sending logs
if not self.is_shutting_down(shutdown_event=self.shutdown_event) \
and self.sleep_interval > 0.1:
self.processes = []
self.debug_log(
'start_worker - start multiprocessing.Process')
p = multiprocessing.Process(
target=self.perform_work,
args=(
self.queue,
self.shutdown_event,
self.shutdown_ack,
self.already_done))
p.daemon = True
p.start()
self.processes.append({
'process': p,
'shutdown_event': self.shutdown_event,
'shutdown_ack_event': self.shutdown_ack,
'already_done_event': self.already_done
})
self.debug_log(
'start_worker - done')
# end of start_worker
def write_log(
self,
log_message):
"""write_log
:param log_message: message to log
"""
print(('{} mpsplunkpub {}'.format(
rnow(),
log_message)))
# end of write_log
def debug_log(
self,
log_message):
"""debug_log
Write logs that only show up in debug mode.
To turn on debugging with environment variables
please set this environment variable:
::
export SPLUNK_DEBUG="1"
:param log_message: message to log
"""
if self.debug:
print(('{} mpsplunkpub DEBUG {}'.format(
rnow(),
log_message)))
# end of debug_log
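# The body of ``format_record`` was not included in this section.
# The sketch below is a hedged reconstruction based on the upstream
# splunk_handler project and on the attributes referenced elsewhere
# in this class (hostname, index, source, sourcetype); the original
# implementation may differ.
def format_record(
        self,
        record):
    """format_record
    Convert a ``logging.LogRecord`` into the JSON document
    expected by the Splunk HTTP Event Collector
    :param record: LogRecord to convert
    """
    self.debug_log('format_record - start')
    # use the record's file path when no source was configured
    if self.source is None:
        source = record.pathname
    else:
        source = self.source
    params = {
        'time': time.time(),
        'host': self.hostname,
        'index': self.index,
        'source': source,
        'sourcetype': self.sourcetype,
        # apply this handler's formatter to build the event body
        'event': self.format(record)
    }
    formatted_record = json.dumps(params, sort_keys=True)
    self.debug_log('format_record - done')
    return formatted_record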
# end of format_record
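# The body of ``perform_work`` was not included in this section.
# The sketch below is a hedged reconstruction inferred from how the
# method is invoked by ``start_worker``, ``force_flush`` and
# ``shutdown``; the original implementation may differ.
def perform_work(
        self,
        use_queue,
        shutdown_event,
        shutdown_ack_event,
        already_done_event):
    """perform_work
    Worker loop run inside the child ``multiprocessing.Process``:
    drain the queue into ``self.log_payload``, publish the payload
    to Splunk, and repeat until a shutdown is requested
    :param use_queue: queue holding the log messages
    :param shutdown_event: multiprocessing.Event - shutdown event
    :param shutdown_ack_event: multiprocessing.Event -
        acknowledge shutdown is in progress
    :param already_done_event: multiprocessing.Event -
        already shutting down
    """
    self.debug_log('perform_work - start')
    try:
        not_done = True
        while not_done:
            # build one batch; this blocks on the queue for up to
            # self.sleep_interval seconds per read
            self.build_payload_from_queued_messages(
                use_queue=use_queue,
                shutdown_event=shutdown_event)
            self.publish_to_splunk(
                shutdown_event=shutdown_event,
                shutdown_ack_event=shutdown_ack_event,
                already_done_event=already_done_event)
            if self.is_shutting_down(shutdown_event=shutdown_event):
                not_done = False
        # end of while not_done
    except Exception as e:
        self.write_log(
            'perform_work - hit ex={}'.format(e))
    # let the parent know this worker has finished
    shutdown_ack_event.set()
    already_done_event.set()
    self.debug_log('perform_work - done')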
# end of perform_work
def publish_to_splunk(
self,
payload=None,
shutdown_event=None,
shutdown_ack_event=None,
already_done_event=None):
"""publish_to_splunk
Publish the queued messages to Splunk
:param payload: optional string log message to send to Splunk
:param shutdown_event: multiprocessing.Event - shutdown event
:param shutdown_ack_event: multiprocessing.Event -
acknowledge shutdown is in progress
:param already_done_event: multiprocessing.Event -
already shutting down
"""
self.debug_log((
'publish_to_splunk - start'))
use_payload = payload
if not use_payload:
use_payload = self.log_payload
if use_payload:
self.debug_log(
'payload available for sending')
url = 'https://{}:{}/services/collector'.format(
self.host,
self.port)
self.debug_log(
'destination URL={}'.format(
url))
try:
if self.debug:
try:
msg_dict = json.loads(use_payload)
event_data = json.loads(msg_dict['event'])
msg_dict['event'] = event_data
self.debug_log((
'sending payload: {}').format(
ppj(msg_dict)))
except Exception:
self.debug_log((
'sending data payload: {}').format(
use_payload))
if self.num_sent > 100000:
self.num_sent = 0
else:
self.num_sent += 1
send_to_splunk.send_to_splunk(
session=self.session,
url=url,
data=use_payload,
headers={'Authorization': 'Splunk {}'.format(
self.token)},
verify=self.verify,
timeout=self.timeout)
self.debug_log(
'payload sent successfully')
except Exception as e:
try:
self.write_log(
'Exception in Splunk logging handler: {}'.format(
e))
self.write_log(traceback.format_exc())
except Exception:
self.debug_log(
'Exception encountered, '
'but traceback could not be formatted')
self.log_payload = ''
# end of publish handling
self.debug_log((
'publish_to_splunk - done - '
'self.is_shutting_down={} self.shutdown_now={}').format(
self.is_shutting_down(shutdown_event=self.shutdown_event),
self.shutdown_now))
# end of publish_to_splunk
def build_payload_from_queued_messages(
self,
use_queue,
shutdown_event):
"""build_payload_from_queued_messages
Drain the queued messages into a single, large ``self.log_payload``
:param use_queue: queue holding the messages
:param shutdown_event: shutdown event
"""
not_done = True
while not_done:
if self.is_shutting_down(
shutdown_event=shutdown_event):
self.debug_log(
'build_payload shutting down')
return True
self.debug_log('reading from queue={}'.format(
str(use_queue)))
try:
msg = use_queue.get(
block=True,
timeout=self.sleep_interval)
self.log_payload = self.log_payload + msg
if self.debug:
self.debug_log('got queued message={}'.format(
msg))
not_done = not self.queue_empty(
use_queue=use_queue)
except Exception as e:
if self.is_shutting_down(
shutdown_event=shutdown_event):
self.debug_log(
'helper was shut down - '
'msgs in the queue may not all '
'have been sent')
else:
self.debug_log((
'helper hit an ex={} - shutting down; '
'msgs in the queue may not all '
'have been sent').format(
e))
not_done = True
self.shutdown_now = True
return True
# end of getting log msgs from the queue
self.debug_log('done reading from queue')
if self.is_shutting_down(
shutdown_event=shutdown_event):
self.debug_log(
'build_payload - already shutting down')
return True
# If the payload is getting very long,
# stop reading and send immediately.
# Current limit is 524288 characters (512 KB)
if self.is_shutting_down(shutdown_event=shutdown_event) \
or len(self.log_payload) >= 524288:
self.debug_log(
'payload maximum size exceeded, sending immediately')
return False
self.debug_log('build_payload - done')
return True
# end of build_payload_from_queued_messages
def queue_empty(
self,
use_queue):
"""queue_empty
:param use_queue: queue to test
"""
if hasattr(use_queue, 'empty'):
return use_queue.empty()
else:
return use_queue.qsize() == 0
# end of queue_empty
def force_flush(
self):
"""force_flush
Flush the queue and publish everything to Splunk
"""
self.debug_log('force flush requested')
self.perform_work(
self.queue,
self.shutdown_event,
self.shutdown_ack,
self.already_done)
# end of force_flush
def is_shutting_down(
self,
shutdown_event):
"""is_shutting_down
Determine if the parent is shutting down or this handler
was told to shut down
:param shutdown_event: shutdown event
"""
return bool(
shutdown_event.is_set()
or self.shutdown_now
or self.run_once)
# end of is_shutting_down
def close(self):
"""close"""
self.debug_log(
'close - start shutdown')
self.shutdown()
logging.Handler.close(self)
self.debug_log(
'close - handler join on processes={}'.format(
len(self.processes)))
self.debug_log(
'close - done')
# end of close
def shutdown(
self):
"""shutdown
"""
# Only initiate shutdown once
if self.is_shutting_down(
shutdown_event=self.shutdown_event):
self.debug_log('shutdown - still shutting down')
return
else:
self.debug_log('shutdown - start - setting instance shutdown')
self.shutdown_now = True
self.shutdown_event.set()
# if/else already shutting down
self.debug_log(
'shutdown - trying to publish remaining msgs')
# Send the remaining items that might be sitting in queue.
self.perform_work(
self.queue,
self.shutdown_event,
self.shutdown_ack,
self.already_done)
self.debug_log('shutdown - joining')
for p in self.processes:
p['shutdown_event'].set()
self.debug_log('shutdown - done')
# end of shutdown
# end of MPSplunkPublisher
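# A minimal usage sketch (not part of the original module): attach the
# handler to a logger and publish one test message. The address, token
# and index values below are placeholders and must be replaced with
# values for a reachable Splunk HTTP Event Collector.
if __name__ == '__main__':
    example_log = logging.getLogger('mp-splunk-example')
    example_log.setLevel(logging.INFO)
    example_log.addHandler(MPSplunkPublisher(
        address='splunkenterprise:8088',
        token='REPLACE_WITH_HEC_TOKEN',
        index='REPLACE_WITH_INDEX',
        sourcetype='json',
        debug=True))
    example_log.info('testing MPSplunkPublisher')
    # give the worker process a moment to publish, then close the
    # handler, which drains any remaining queued messages
    time.sleep(2.0)
    for example_handler in list(example_log.handlers):
        example_handler.close()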